Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion .github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ jobs:
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Base.txt
# The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
- name: get current time
run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
Expand Down Expand Up @@ -189,4 +190,26 @@ jobs:
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
- name: run Table Row Inference Sklearn Batch
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.table_row_inference_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=NONE --metrics_table=result_table_row_inference_batch --influx_measurement=result_table_row_inference_batch --mode=batch --input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl --input_expand_factor=100 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs --job_name=benchmark-tests-table-row-inference-batch-${{env.NOW_UTC}}'
- name: run Table Row Inference Sklearn Stream
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.table_row_inference_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

--project=apache-beam-testing
--region=us-central1
--worker_machine_type=n1-standard-4
--num_workers=10
--disk_size_gb=50
--autoscaling_algorithm=NONE
--staging_location=gs://temp-storage-for-perf-tests/loadtests
--temp_location=gs://temp-storage-for-perf-tests/loadtests
--requirements_file=apache_beam/ml/inference/table_row_inference_requirements.txt
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=result_table_row_inference_batch
--input_options={}
--influx_measurement=result_table_row_inference_batch
--mode=batch
--input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl
# 100k input lines x expand factor 100 = 10M rows; set --input_expand_factor=1000 for 100M rows
--input_expand_factor=100
--model_path=gs://apache-beam-ml/models/sklearn_table_classifier.pkl
--feature_columns=feature1,feature2,feature3,feature4,feature5
--output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs
--runner=DataflowRunner
--experiments=use_runner_v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

--project=apache-beam-testing
--region=us-central1
--worker_machine_type=n1-standard-4
--num_workers=10
--disk_size_gb=50
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=20
--staging_location=gs://temp-storage-for-perf-tests/loadtests
--temp_location=gs://temp-storage-for-perf-tests/loadtests
--requirements_file=apache_beam/ml/inference/table_row_inference_requirements.txt
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=result_table_row_inference_stream
--input_options={}
--influx_measurement=result_table_row_inference_stream
--mode=streaming
--input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark
--window_size_sec=60
--trigger_interval_sec=30
--timeout_ms=1800000
--model_path=gs://apache-beam-ml/models/sklearn_table_classifier.pkl
--feature_columns=feature1,feature2,feature3,feature4,feature5
--output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs
--runner=DataflowRunner
--experiments=use_runner_v2
3 changes: 2 additions & 1 deletion .test-infra/tools/refresh_looker_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@
("82", ["263", "264", "265", "266", "267"]), # PyTorch Sentiment Streaming DistilBERT base uncased
("85", ["268", "269", "270", "271", "272"]), # PyTorch Sentiment Batch DistilBERT base uncased
("86", ["284", "285", "286", "287", "288"]), # VLLM Batch Gemma
("96", ["270", "304", "305", "353", "354"]), # Table Row Inference Sklearn Batch
("106", ["355", "356", "357", "358", "359"]) # Table Row Inference Sklearn Streaming
]


def get_look(id: str) -> models.Look:
look = next(iter(sdk.search_looks(id=id)), None)
if not look:
Expand Down
66 changes: 66 additions & 0 deletions sdks/python/apache_beam/examples/inference/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -968,4 +968,70 @@ and produce the following result in your output file location:
An emperor penguin is an adorable creature that lives in Antarctica.
```

---
## Table row inference

[`table_row_inference.py`](./table_row_inference.py) contains an implementation for a RunInference pipeline that processes structured table rows from a file or Pub/Sub, runs ML inference while preserving the table schema, and writes results to BigQuery. It supports both batch (file input) and streaming (Pub/Sub) modes.

### Prerequisites for table row inference

Install the dependencies directly (or install from `apache_beam/ml/inference/table_row_inference_requirements.txt`, relative to the `sdks/python` directory):

```sh
pip install apache-beam[gcp] scikit-learn google-cloud-pubsub
```

For streaming mode, you need a Pub/Sub topic and subscription, a BigQuery dataset, and a GCS bucket for the model and temp files.

### Model and data for table row inference

1. Create a scikit-learn model and sample data using the provided utilities:

```sh
python -m apache_beam.examples.inference.table_row_inference_utils --action=create_model --output_path=model.pkl --num_features=3
python -m apache_beam.examples.inference.table_row_inference_utils --action=generate_data --output_path=input_data.jsonl --num_rows=1000 --num_features=3
```

2. Input data should be JSONL with an `id` field and feature columns, for example:

```json
{"id": "row_1", "feature1": 1.5, "feature2": 2.3, "feature3": 3.7}
```

### Running `table_row_inference.py` (batch)

To run the table row inference pipeline in batch mode locally:

```sh
python -m apache_beam.examples.inference.table_row_inference \
--mode=batch \
--input_file=input_data.jsonl \
--output_table=PROJECT:DATASET.predictions \
--model_path=model.pkl \
--feature_columns=feature1,feature2,feature3 \
--runner=DirectRunner
```

### Running `table_row_inference.py` (streaming)

For streaming mode, use a Pub/Sub subscription and DataflowRunner. Set up a topic and subscription first, then run:

```sh
python -m apache_beam.examples.inference.table_row_inference \
--mode=streaming \
--input_subscription=projects/PROJECT/subscriptions/SUBSCRIPTION \
--output_table=PROJECT:DATASET.predictions \
--model_path=gs://BUCKET/model.pkl \
--feature_columns=feature1,feature2,feature3 \
--runner=DataflowRunner \
--project=PROJECT \
--region=us-central1 \
--temp_location=gs://BUCKET/temp \
--staging_location=gs://BUCKET/staging
```

See the script for full pipeline options (window size, trigger interval, worker settings, etc.).

Output is written to the BigQuery table with columns such as `row_key`, `prediction`, and the original input feature columns.

---
Loading
Loading