diff --git a/.github/workflows/run_perf_alert_tool.yml b/.github/workflows/run_perf_alert_tool.yml new file mode 100644 index 000000000000..30779e765f12 --- /dev/null +++ b/.github/workflows/run_perf_alert_tool.yml @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# To learn more about GitHub Actions in Apache Beam check the CI.md + +name: Run performance alerting tool on Python load/performance/benchmark tests. + +on: + schedule: + - cron: '5 22 * * *' + +jobs: + python_run_change_point_analysis: + name: Run Change Point Analysis. + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Install python + uses: actions/setup-python@v4 + with: + python-version: 3.8 + - name: Get Apache Beam Build dependencies + working-directory: ./sdks/python + run: pip install pip setuptools --upgrade && pip install -r build-requirements.txt + - name: Install Apache Beam + working-directory: ./sdks/python + run: pip install -e .[gcp,test] + - name: Install signal-processing-algorithms + run: pip install signal-processing-algorithms + - name: Install pandas, yaml, requests + run: pip install pandas PyYAML requests +# - name: Run Change Point Analysis. +# working-directory: ./sdks/python/apache_beam/testing/analyzers +# shell: bash +# run: python analysis.py +# env: +# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Run change point analysis tests. + working-directory: ./sdks/python/apache_beam/testing/analyzers + shell: bash + run: pytest perf_analysis_test.py + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + diff --git a/sdks/python/apache_beam/testing/analyzers/README.md b/sdks/python/apache_beam/testing/analyzers/README.md new file mode 100644 index 000000000000..9f50d9797b59 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/README.md @@ -0,0 +1,97 @@ + + +# Performance alerts for Beam Python performance and load tests + +## Alerts +Performance regressions or improvements detected with [Change Point Analysis](https://en.wikipedia.org/wiki/Change_detection) using the [edivisive](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30) +analyzer are automatically filed as Beam GitHub issues with the label `perf-alert`. + +The GitHub issue description contains information on the affected test and metric, listing the metric values (with timestamps) for N consecutive runs +before and after the observed change point. The observed change point is marked as `Anomaly` in the issue description. + +Example: [sample perf alert GitHub issue](https://github.com/AnandInguva/beam/issues/83).
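+For reference, the detection step itself is small. The sketch below mirrors `find_latest_change_point_index` in `perf_analysis_utils.py`; the helper name `latest_change_point` and the example series are illustrative only, and it assumes the `signal-processing-algorithms` package is installed.
+
+```python
+from typing import List, Optional, Union
+
+from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive
+
+
+def latest_change_point(metric_values: List[Union[int, float]]) -> Optional[int]:
+  # e_divisive returns the indices of likely change points; the right-most
+  # (most recent) one is the candidate that gets reported.
+  change_points = e_divisive(metric_values)
+  if not change_points:
+    return None
+  return sorted(change_points)[-1]
+
+
+# A clear shift between the first and second half of the runs.
+print(latest_change_point([0] * 10 + [1] * 10))  # -> 10
+```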
+ +When a performance alert is raised for a test, a GitHub issue is created and the issue metadata, such as the GitHub issue +URL and issue number, along with the change point value and timestamp, is exported to BigQuery. This data is used when the next change point is observed on the same test, either to +update the already created GitHub issue or to suppress the alert (no new issue is filed) so that duplicate issues are avoided. + +## Config file structure +The config file defines the parameters used to run change point analysis on a given test. To add a test to the config file, +please follow the structure below. + +**NOTE**: Change point analysis currently only supports reading the metric data from BigQuery. + +``` +# test_1 must be a unique id. +test_1: + test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks + source: big_query + metrics_dataset: beam_run_inference + metrics_table: torch_inference_imagenet_results_resnet152 + project: apache-beam-testing + metric_name: mean_load_model_latency_milli_secs + labels: + - run-inference + min_runs_between_change_points: 3 # optional parameter + num_runs_in_change_point_window: 30 # optional parameter +``` + +**NOTE**: `test_name` should be in the format `apache_beam.foo.bar`. It should point to a single test target. + +**NOTE**: If the source is **BigQuery**, the metrics_dataset, metrics_table, project and metric_name should match the values defined for the performance/load tests. +The above example uses this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30) +to fill in the values required to fetch the data from the source. + +### Different ways to avoid false positive change points + +**min_runs_between_change_points**: As metric data accumulates across runs, the change point analysis can place the +change point in a slightly different position; such change points refer to the same regression and are just noise. +When a new change point is found, the tool searches up to `min_runs_between_change_points` runs in both directions from the +current change point. If an existing change point is found within that distance, the current change point is +suppressed. + +**num_runs_in_change_point_window**: This defines how many of the most recent runs are considered to be inside the change point window. +Sometimes the change point found might be far back in time and no longer relevant. For example, if a change point should be +reported only when it was observed within the last 7 runs, +setting `num_runs_in_change_point_window=7` achieves that. + +## Register a test for performance alerts + +If a new test needs to be registered for the performance alerting tool, please add the required test parameters to the +config file. + +## Triage performance alert issues + +All the performance/load test metrics defined at [beam/.test-infra/jenkins](https://github.com/apache/beam/tree/master/.test-infra/jenkins) are imported into [Grafana dashboards](http://104.154.241.245/d/1/getting-started?orgId=1) for visualization. Please +find the dashboard for the alerted test and look for a spike in the metric values.
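+If a dashboard is hard to read or missing, the same numbers can be pulled straight from BigQuery. The sketch below mirrors the query used by the analyzers and by the `analyze_metric_data.ipynb` notebook; the dataset, table, and metric names are the sample values from the config example above, so substitute your test's values (running it requires access to the `apache-beam-testing` BigQuery project).
+
+```python
+import pandas as pd
+
+from apache_beam.testing.load_tests import load_test_metrics_utils
+from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
+
+bq_project = 'apache-beam-testing'
+bq_dataset = 'beam_run_inference'                        # from the config entry
+bq_table = 'torch_inference_imagenet_results_resnet152'  # from the config entry
+metric_name = 'mean_load_model_latency_milli_secs'       # from the config entry
+
+query = f"""
+  SELECT *
+  FROM {bq_project}.{bq_dataset}.{bq_table}
+  WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}')
+  ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
+  LIMIT 30
+  """
+
+metric_data: pd.DataFrame = BigQueryMetricsFetcher().fetch(query=query)
+# Sort ascending so the runs read in chronological order.
+metric_data.sort_values(
+    by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
+print(metric_data[[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL,
+                   load_test_metrics_utils.VALUE_LABEL]])
+```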
+ +For example, for the configuration below, +* test: `apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks` +* metric_name: `mean_load_model_latency_milli_secs` + +The Grafana dashboard can be found at http://104.154.241.245/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1&viewPanel=7 + +If the dashboard for a test is not found, you can use the +notebook `analyze_metric_data.ipynb` to generate a plot for the given test and metric_name. + +If you confirm there is a change in the pattern of the values for a test, find the timestamp when that change happened +and use it to find the possible culprit commit. + +If the performance alert is a false positive, close the issue with `Close as not planned`. diff --git a/sdks/python/apache_beam/testing/analyzers/__init__.py b/sdks/python/apache_beam/testing/analyzers/__init__.py new file mode 100644 index 000000000000..cce3acad34a4 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/sdks/python/apache_beam/testing/analyzers/analyze_metric_data.ipynb b/sdks/python/apache_beam/testing/analyzers/analyze_metric_data.ipynb new file mode 100644 index 000000000000..690226571e4c --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/analyze_metric_data.ipynb @@ -0,0 +1,172 @@ +{ + "nbformat": 4, + "nbformat_minor": 0, + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "name": "python3", + "display_name": "Python 3" + }, + "language_info": { + "name": "python" + } + }, + "cells": [ + { + "cell_type": "code", + "source": [ + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License." + ], + "metadata": { + "id": "fCjymAKWJiTh" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# This notebook is intended for internal testing purposes."
+ ], + "metadata": { + "id": "CCAvj4mQFR5x" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Apache Beam can be installed directly from the main branch of https://github.com/apache/beam or can be installed via `pip install apache_beam>=2.45.0`" + ], + "metadata": { + "id": "IL7coD4DJqzG" + } + }, + { + "cell_type": "code", + "source": [ + "!git clone https://github.com/apache/beam.git\n", + "!pip install -r beam/sdks/python/build-requirements.txt\n", + "!pip install -e beam/sdks/python[gcp]\n", + "!pip install matplotlib\n", + "!pip install pandas" + ], + "metadata": { + "id": "yW4okqmpECqY" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Import necessary dependencies" + ], + "metadata": { + "id": "ZPt3DbZcL-Ki" + } + }, + { + "cell_type": "code", + "source": [ + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "from apache_beam.testing.load_tests import load_test_metrics_utils\n", + "from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher" + ], + "metadata": { + "id": "xYGgc-tpE9qY" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "bq_project = 'apache-beam-testing'\n", + "bq_dataset = '' # sample value: beam_run_inference\n", + "bq_table = '' # sample value: torch_inference_imagenet_results_resnet152\n", + "metric_name = '' # sample value: mean_load_model_latency_milli_secs\n", + "\n", + "query = f\"\"\"\n", + " SELECT *\n", + " FROM {bq_project}.{bq_dataset}.{bq_table}\n", + " WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}')\n", + " ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC\n", + " LIMIT 30\n", + " \"\"\"\n" + ], + "metadata": { + "id": "nyMmUpRrD_zV" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "If the performance/load test store the results in BigQuery using this [schema](https://github.com/apache/beam/blob/83679216cce2d52dbeb7e837f06ca1d57b31d509/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L66),\n", + "then fetch the metric_values for a `metric_name` for the last `30` runs and display a plot using matplotlib.\n" + ], + "metadata": { + "id": "RwlsXCLbVs_2" + } + }, + { + "cell_type": "code", + "source": [ + "big_query_metrics_fetcher = BigQueryMetricsFetcher()\n", + "metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query)" + ], + "metadata": { + "id": "rmOE_odNEBFK" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "# sort the data to view it in chronological order.\n", + "metric_data.sort_values(\n", + " by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)" + ], + "metadata": { + "id": "q-i3qLpGV5Ly" + }, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "code", + "source": [ + "metric_data.plot(x=load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL,\n", + " y=load_test_metrics_utils.VALUE_LABEL)\n", + "plt.show()" + ], + "metadata": { + "id": "vbFoxdxHVvtQ" + }, + "execution_count": null, + "outputs": [] + } + ] +} \ No newline at end of file diff --git a/sdks/python/apache_beam/testing/analyzers/constants.py b/sdks/python/apache_beam/testing/analyzers/constants.py new file mode 100644 index 000000000000..c4bdded77a06 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/constants.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software 
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""The file defines global variables for performance analysis.""" + +_BQ_PROJECT_NAME = 'apache-beam-testing' +_BQ_DATASET = 'beam_perf_storage' + +_UNIQUE_ID = 'test_id' +_ISSUE_CREATION_TIMESTAMP_LABEL = 'issue_timestamp' +_CHANGE_POINT_TIMESTAMP_LABEL = 'change_point_timestamp' +_CHANGE_POINT_LABEL = 'change_point' +_TEST_NAME = 'test_name' +_METRIC_NAME = 'metric_name' +_ISSUE_NUMBER = 'issue_number' +_ISSUE_URL = 'issue_url' +# number of results to display on the issue description +# from change point index in both directions. +_NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION = 10 +_NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS = 100 +# Variables used for finding duplicate change points. +_DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS = 3 +_DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW = 30 + +_PERF_TEST_KEYS = { + 'test_name', 'metrics_dataset', 'metrics_table', 'project', 'metric_name' +} + +_SCHEMA = [{ + 'name': _UNIQUE_ID, 'field_type': 'STRING', 'mode': 'REQUIRED' +}, + { + 'name': _ISSUE_CREATION_TIMESTAMP_LABEL, + 'field_type': 'TIMESTAMP', + 'mode': 'REQUIRED' + }, + { + 'name': _CHANGE_POINT_TIMESTAMP_LABEL, + 'field_type': 'TIMESTAMP', + 'mode': 'REQUIRED' + }, + { + 'name': _CHANGE_POINT_LABEL, + 'field_type': 'FLOAT64', + 'mode': 'REQUIRED' + }, { + 'name': _METRIC_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED' + }, { + 'name': _TEST_NAME, 'field_type': 'STRING', 'mode': 'REQUIRED' + }, { + 'name': _ISSUE_NUMBER, 'field_type': 'INT64', 'mode': 'REQUIRED' + }, { + 'name': _ISSUE_URL, 'field_type': 'STRING', 'mode': 'REQUIRED' + }] diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py new file mode 100644 index 000000000000..398a98e00ced --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py @@ -0,0 +1,196 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import json +import logging +import os +from typing import List +from typing import Optional +from typing import Tuple + +import pandas as pd +import requests + +try: + _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN'] +except KeyError as e: + _GITHUB_TOKEN = None + logging.warning( + 'A Github Personal Access token is required ' + 'to create Github Issues.') + +# TODO: Change the REPO owner name to apache before merging. +_BEAM_GITHUB_REPO_OWNER = 'AnandInguva' +_BEAM_GITHUB_REPO_NAME = 'beam' +# Adding GitHub Rest API version to the header to maintain version stability. +# For more information, please look at +# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long +_HEADERS = { + "Authorization": 'token {}'.format(_GITHUB_TOKEN), + "Accept": "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28" +} + +_ISSUE_TITLE_TEMPLATE = """ + Performance Regression or Improvement: {}:{} +""" + +_ISSUE_DESCRIPTION_TEMPLATE = """ + Performance change found in the + test: `{}` for the metric: `{}`. + + For more information on how to triage the alerts, please look at + `Triage performance alert issues` section of the [README](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/testing/analyzers/README.md#triage-performance-alert-issues). +""" +_METRIC_INFO_TEMPLATE = "timestamp: {}, metric_value: `{}`" +_AWAITING_TRIAGE_LABEL = 'awaiting triage' +_PERF_ALERT_LABEL = 'perf-alert' + + +def create_issue( + title: str, + description: str, + labels: Optional[List[str]] = None, +) -> Tuple[int, str]: + """ + Create an issue with title, description with a label. + + Args: + title: GitHub issue title. + description: GitHub issue description. + labels: Labels used to tag the GitHub issue. + Returns: + Tuple containing GitHub issue number and issue URL. + """ + url = "https://api.github.com/repos/{}/{}/issues".format( + _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME) + data = { + 'owner': _BEAM_GITHUB_REPO_OWNER, + 'repo': _BEAM_GITHUB_REPO_NAME, + 'title': title, + 'body': description, + 'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL] + } + if labels: + data['labels'].extend(labels) # type: ignore + response = requests.post( + url=url, data=json.dumps(data), headers=_HEADERS).json() + return response['number'], response['html_url'] + + +def comment_on_issue(issue_number: int, + comment_description: str) -> Tuple[bool, str]: + """ + This method looks for an issue with provided issue_number. If an open + issue is found, comment on the open issue with provided description else + do nothing. + + Args: + issue_number: A GitHub issue number. + comment_description: If an issue with issue_number is open, + then comment on the issue with the using comment_description. + Returns: + tuple[bool, Optional[str]] indicating if a comment was added to + issue, and the comment URL. 
+ """ + url = 'https://api.github.com/repos/{}/{}/issues/{}'.format( + _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number) + open_issue_response = requests.get( + url, + json.dumps({ + 'owner': _BEAM_GITHUB_REPO_OWNER, + 'repo': _BEAM_GITHUB_REPO_NAME, + 'issue_number': issue_number + }), + headers=_HEADERS).json() + if open_issue_response['state'] == 'open': + data = { + 'owner': _BEAM_GITHUB_REPO_OWNER, + 'repo': _BEAM_GITHUB_REPO_NAME, + 'body': comment_description, + issue_number: issue_number, + } + response = requests.post( + open_issue_response['comments_url'], json.dumps(data), headers=_HEADERS) + return True, response.json()['html_url'] + return False, '' + + +def add_awaiting_triage_label(issue_number: int): + url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format( + _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number) + requests.post( + url, json.dumps({'labels': [_AWAITING_TRIAGE_LABEL]}), headers=_HEADERS) + + +def get_issue_description( + test_name: str, + metric_name: str, + timestamps: List[pd.Timestamp], + metric_values: List, + change_point_index: int, + max_results_to_display: int = 5) -> str: + """ + Args: + metric_name: Metric name used for the Change Point Analysis. + timestamps: Timestamps of the metrics when they were published to the + Database. Timestamps are expected in ascending order. + metric_values: metric values for the previous runs. + change_point_index: Index for the change point. The element in the + index of the metric_values would be the change point. + max_results_to_display: Max number of results to display from the change + point index, in both directions of the change point index. + + Returns: + str: Description used to fill the GitHub issues description. + """ + + # TODO: Add mean and median before and after the changepoint index. + max_timestamp_index = min( + change_point_index + max_results_to_display, len(metric_values) - 1) + min_timestamp_index = max(0, change_point_index - max_results_to_display) + + description = _ISSUE_DESCRIPTION_TEMPLATE.format( + test_name, metric_name) + 2 * '\n' + + runs_to_display = [ + _METRIC_INFO_TEMPLATE.format(timestamps[i].ctime(), metric_values[i]) + for i in reversed(range(min_timestamp_index, max_timestamp_index + 1)) + ] + + runs_to_display[change_point_index - min_timestamp_index] += " <---- Anomaly" + return description + '\n'.join(runs_to_display) + + +def report_change_point_on_issues( + title: str, + description: str, + labels: Optional[List[str]] = None, + existing_issue_number: Optional[int] = None, +) -> Tuple[int, str]: + """ + Comments the description on the existing issue (if provided and still open), + or creates a new issue. + """ + if existing_issue_number is not None: + commented_on_issue, issue_url = comment_on_issue( + issue_number=existing_issue_number, + comment_description=description + ) + if commented_on_issue: + add_awaiting_triage_label(issue_number=existing_issue_number) + return existing_issue_number, issue_url + return create_issue(title=title, description=description, labels=labels) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py new file mode 100644 index 000000000000..870deed770c5 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -0,0 +1,192 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This script is used to run Change Point Analysis using a config file. +# config file holds the parameters required to fetch data, and to run the +# change point analysis. Change Point Analysis is used to find Performance +# regressions for benchmark/load/performance test. + +import argparse +import logging +import os +import uuid +from datetime import datetime +from datetime import timezone +from typing import Any +from typing import Dict +from typing import Optional + +import pandas as pd + +from apache_beam.testing.analyzers import constants +from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData +from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert +from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data +from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index +from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data +from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window +from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert +from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query +from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config +from apache_beam.testing.analyzers.perf_analysis_utils import validate_config +from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher + + +def run_change_point_analysis(params, test_id, big_query_metrics_fetcher): + """ + Args: + params: Dict containing parameters to run change point analysis. + test_id: Test id for the current test. + big_query_metrics_fetcher: BigQuery metrics fetcher used to fetch data for + change point analysis. + Returns: + bool indicating if a change point is observed and alerted on GitHub. 
+ """ + if not validate_config(params.keys()): + raise ValueError( + f"Please make sure all these keys {constants._PERF_TEST_KEYS} " + f"are specified for the {test_id}") + + metric_name = params['metric_name'] + test_name = params['test_name'].replace('.', '_') + f'_{metric_name}' + + min_runs_between_change_points = ( + constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS) + if 'min_runs_between_change_points' in params: + min_runs_between_change_points = params['min_runs_between_change_points'] + + num_runs_in_change_point_window = ( + constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW) + if 'num_runs_in_change_point_window' in params: + num_runs_in_change_point_window = params['num_runs_in_change_point_window'] + + metric_values, timestamps = fetch_metric_data( + params=params, + big_query_metrics_fetcher=big_query_metrics_fetcher + ) + + change_point_index = find_latest_change_point_index( + metric_values=metric_values) + if not change_point_index: + return False + # since timestamps are ordered in ascending order and + # num_runs_in_change_point_window refers to the latest runs, + # latest_change_point_run can help determine if the change point + # index is recent wrt num_runs_in_change_point_window + latest_change_point_run = len(timestamps) - 1 - change_point_index + if not is_change_point_in_valid_window(num_runs_in_change_point_window, + latest_change_point_run): + logging.info( + 'Performance regression/improvement found for the test: %s. ' + 'Since the change point run %s ' + 'lies outside the num_runs_in_change_point_window distance: %s, ' + 'alert is not raised.' % ( + params['test_name'], + latest_change_point_run, + num_runs_in_change_point_window)) + return False + + is_alert = True + last_reported_issue_number = None + existing_issue_data = get_existing_issues_data( + test_name=test_name, big_query_metrics_fetcher=big_query_metrics_fetcher) + + if existing_issue_data is not None: + existing_issue_timestamps = existing_issue_data[ + constants._CHANGE_POINT_TIMESTAMP_LABEL].tolist() + last_reported_issue_number = existing_issue_data[ + constants._ISSUE_NUMBER].tolist()[0] + + is_alert = is_perf_alert( + previous_change_point_timestamps=existing_issue_timestamps, + change_point_index=change_point_index, + timestamps=timestamps, + min_runs_between_change_points=min_runs_between_change_points) + + logging.debug( + "Performance alert is %s for test %s" % (is_alert, params['test_name'])) + if is_alert: + issue_number, issue_url = create_performance_alert( + metric_name, params['test_name'], timestamps, + metric_values, change_point_index, + params.get('labels', None), + last_reported_issue_number) + + issue_metadata = GitHubIssueMetaData( + issue_timestamp=pd.Timestamp( + datetime.now().replace(tzinfo=timezone.utc)), + test_name=test_name, + metric_name=metric_name, + test_id=uuid.uuid4().hex, + change_point=metric_values[change_point_index], + issue_number=issue_number, + issue_url=issue_url, + change_point_timestamp=timestamps[change_point_index]) + + publish_issue_metadata_to_big_query( + issue_metadata=issue_metadata, test_name=test_name) + + return is_alert + + +def run(config_file_path: Optional[str] = None) -> None: + """ + run is the entry point to run change point analysis on test metric + data, which is read from config file, and if there is a performance + regression/improvement observed for a test, an alert + will filed with GitHub Issues. + + If config_file_path is None, then the run method will use default + config file to read the required perf test parameters. 
+ + Please take a look at the README for more information on the parameters + defined in the config file. + + """ + if config_file_path is None: + config_file_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), 'tests_config.yaml') + + tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path) + + big_query_metrics_fetcher = BigQueryMetricsFetcher() + + for test_id, params in tests_config.items(): + run_change_point_analysis(params, test_id, big_query_metrics_fetcher) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + + parser = argparse.ArgumentParser() + parser.add_argument( + '--config_file_path', + default=None, + type=str, + help='Path to the config file that contains data to run the Change Point ' + 'Analysis. The default file used is ' + 'apache_beam/testing/analyzers/tests_config.yaml. ' + 'If you would like to use Change Point Analysis to find ' + 'performance regressions in the tests, ' + 'please provide a .yaml file with the same structure as the ' + 'default file. ') + known_args, unknown_args = parser.parse_known_args() + + if unknown_args: + logging.warning('Discarding unknown arguments: %s' % unknown_args) + + run(known_args.config_file_path) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py new file mode 100644 index 000000000000..000175e6388a --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -0,0 +1,198 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +import logging +import os +import unittest + +import mock +import pandas as pd + +# pylint: disable=ungrouped-imports +try: + import apache_beam.testing.analyzers.perf_analysis as analysis + from apache_beam.testing.analyzers import constants + from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window + from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert + from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive + from apache_beam.testing.analyzers.perf_analysis_utils import validate_config +except ImportError as e: + analysis = None # type: ignore + + +# mock methods. +def get_fake_data_with_no_change_point(**kwargs): + num_samples = 20 + metric_values = [1] * num_samples + timestamps = list(range(num_samples)) + return metric_values, timestamps + + +def get_fake_data_with_change_point(**kwargs): + num_samples = 20 + metric_values = [0] * (num_samples // 2) + [1] * (num_samples // 2) + timestamps = [i for i in range(num_samples)] + return metric_values, timestamps + + +def get_existing_issue_data(**kwargs): + # change point found at index 10.
So passing 10 in the + # existing issue data in mock method. + return pd.DataFrame([{ + constants._CHANGE_POINT_TIMESTAMP_LABEL: 10, constants._ISSUE_NUMBER: 0 + }]) + + +@unittest.skipIf( + analysis is None, + 'Missing dependencies. ' + 'Test dependencies are missing for the Analyzer.') +class TestChangePointAnalysis(unittest.TestCase): + def setUp(self) -> None: + self.single_change_point_series = [0] * 10 + [1] * 10 + self.multiple_change_point_series = self.single_change_point_series + [ + 2 + ] * 20 + self.timestamps = list(range(5)) + self.params = { + 'test_name': 'fake_test', + 'metrics_dataset': 'fake_dataset', + 'metrics_table': 'fake_table', + 'project': 'fake_project', + 'metric_name': 'fake_metric_name' + } + self.test_id = 'fake_id' + + def test_edivisive_means(self): + change_point_indexes = e_divisive(self.single_change_point_series) + self.assertEqual(change_point_indexes, [10]) + change_point_indexes = e_divisive(self.multiple_change_point_series) + self.assertEqual(sorted(change_point_indexes), [10, 20]) + + def test_is_changepoint_in_valid_window(self): + changepoint_to_recent_run_window = 19 + change_point_index = 14 + + is_valid = is_change_point_in_valid_window( + changepoint_to_recent_run_window, change_point_index) + self.assertEqual(is_valid, True) + + def test_change_point_outside_inspection_window_is_not_a_valid_alert(self): + changepoint_to_recent_run_window = 12 + change_point_index = 14 + + is_valid = is_change_point_in_valid_window( + changepoint_to_recent_run_window, change_point_index) + self.assertEqual(is_valid, False) + + def test_validate_config(self): + test_keys = { + 'test_name', + 'metrics_dataset', + 'metrics_table', + 'project', + 'metric_name' + } + self.assertEqual(test_keys, constants._PERF_TEST_KEYS) + self.assertTrue(validate_config(test_keys)) + + def test_duplicate_change_point(self): + change_point_index = 2 + min_runs_between_change_points = 1 + is_alert = is_perf_alert( + previous_change_point_timestamps=[self.timestamps[0]], + timestamps=self.timestamps, + change_point_index=change_point_index, + min_runs_between_change_points=min_runs_between_change_points) + self.assertTrue(is_alert) + + def test_duplicate_change_points_are_not_valid_alerts(self): + change_point_index = 2 + min_runs_between_change_points = 1 + is_alert = is_perf_alert( + previous_change_point_timestamps=[self.timestamps[3]], + timestamps=self.timestamps, + change_point_index=change_point_index, + min_runs_between_change_points=min_runs_between_change_points) + self.assertFalse(is_alert) + + is_alert = is_perf_alert( + previous_change_point_timestamps=[ + self.timestamps[0], self.timestamps[3] + ], + timestamps=self.timestamps, + change_point_index=change_point_index, + min_runs_between_change_points=min_runs_between_change_points) + self.assertFalse(is_alert) + + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + get_fake_data_with_no_change_point) + def test_no_alerts_when_no_change_points(self): + is_alert = analysis.run_change_point_analysis( + params=self.params, + test_id=self.test_id, + big_query_metrics_fetcher=None) + self.assertFalse(is_alert) + + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + get_fake_data_with_change_point) + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data', + return_value=None) + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.' 
+ 'publish_issue_metadata_to_big_query', + return_value=None) + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis' + '.create_performance_alert', + return_value=(0, '')) + def test_alert_on_data_with_change_point(self, *args): + is_alert = analysis.run_change_point_analysis( + params=self.params, + test_id=self.test_id, + big_query_metrics_fetcher=None) + self.assertTrue(is_alert) + + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + get_fake_data_with_change_point) + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data', + get_existing_issue_data) + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.' + 'publish_issue_metadata_to_big_query', + return_value=None) + @mock.patch( + 'apache_beam.testing.analyzers.perf_analysis.create_performance_alert', + return_value=(0, '')) + def test_alert_on_data_with_reported_change_point(self, *args): + is_alert = analysis.run_change_point_analysis( + params=self.params, + test_id=self.test_id, + big_query_metrics_fetcher=None) + self.assertFalse(is_alert) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.DEBUG) + os.environ['GITHUB_TOKEN'] = 'fake_token' + unittest.main() diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py new file mode 100644 index 000000000000..247fe07f4df7 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -0,0 +1,214 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +from dataclasses import asdict +from dataclasses import dataclass +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union + +import pandas as pd +import yaml +from google.api_core import exceptions + +from apache_beam.testing.analyzers import constants +from apache_beam.testing.analyzers import github_issues_utils +from apache_beam.testing.load_tests import load_test_metrics_utils +from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher +from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher +from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive + + +@dataclass +class GitHubIssueMetaData: + """ + This class holds metadata that needs to be published to the + BigQuery when a GitHub issue is created on a performance + alert. 
+ """ + issue_timestamp: pd.Timestamp + change_point_timestamp: pd.Timestamp + test_name: str + metric_name: str + issue_number: int + issue_url: str + test_id: str + change_point: float + + +def is_change_point_in_valid_window( + num_runs_in_change_point_window: int, latest_change_point_run: int) -> bool: + return num_runs_in_change_point_window > latest_change_point_run + + +def get_existing_issues_data( + test_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher +) -> Optional[pd.DataFrame]: + """ + Finds the most recent GitHub issue created for the test_name. + If no table found with name=test_name, return (None, None) + else return latest created issue_number along with + """ + query = f""" + SELECT * FROM {constants._BQ_PROJECT_NAME}.{constants._BQ_DATASET}.{test_name} + ORDER BY {constants._ISSUE_CREATION_TIMESTAMP_LABEL} DESC + LIMIT 10 + """ + try: + df = big_query_metrics_fetcher.fetch(query=query) + except exceptions.NotFound: + # If no table found, that means this is first performance regression + # on the current test+metric. + return None + return df + + +def is_perf_alert( + previous_change_point_timestamps: List[pd.Timestamp], + change_point_index: int, + timestamps: List[pd.Timestamp], + min_runs_between_change_points: int) -> bool: + """ + Search the previous_change_point_timestamps with current observed + change point sibling window and determine if it is a duplicate + change point or not. + timestamps are expected to be in ascending order. + + Return False if the current observed change point is a duplicate of + already reported change points else return True. + """ + sibling_change_point_min_timestamp = timestamps[max( + 0, change_point_index - min_runs_between_change_points)] + sibling_change_point_max_timestamp = timestamps[min( + change_point_index + min_runs_between_change_points, len(timestamps) - 1)] + # Search a list of previous change point timestamps and compare it with + # current change point timestamp. We do this in case, if a current change + # point is already reported in the past. + for previous_change_point_timestamp in previous_change_point_timestamps: + if (sibling_change_point_min_timestamp <= previous_change_point_timestamp <= + sibling_change_point_max_timestamp): + return False + return True + + +def read_test_config(config_file_path: str) -> Dict: + """ + Reads the config file in which the data required to + run the change point analysis is specified. + """ + with open(config_file_path, 'r') as stream: + config = yaml.safe_load(stream) + return config + + +def validate_config(keys): + return constants._PERF_TEST_KEYS.issubset(keys) + + +def fetch_metric_data( + params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher +) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + """ + Args: + params: Dict containing keys required to fetch data from a data source. + big_query_metrics_fetcher: A BigQuery metrics fetcher for fetch metrics. + Returns: + Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list + of metric_values and list of timestamps. Both are sorted in ascending + order wrt timestamps. 
+ """ + query = f""" + SELECT * + FROM {params['project']}.{params['metrics_dataset']}.{params['metrics_table']} + WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{params['metric_name']}') + ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC + LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS} + """ + metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query) + metric_data.sort_values( + by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) + return ( + metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), + metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) + + +def find_latest_change_point_index(metric_values: List[Union[float, int]]): + """ + Args: + metric_values: Metric values used to run change point analysis. + Returns: + int: Right most change point index observed on metric_values. + """ + change_points_idx = e_divisive(metric_values) + if not change_points_idx: + return None + # Consider the latest change point. + change_points_idx.sort() + return change_points_idx[-1] + + +def publish_issue_metadata_to_big_query(issue_metadata, test_name): + """ + Published issue_metadata to BigQuery with table name=test_name. + """ + bq_metrics_publisher = BigQueryMetricsPublisher( + project_name=constants._BQ_PROJECT_NAME, + dataset=constants._BQ_DATASET, + table=test_name, + bq_schema=constants._SCHEMA) + bq_metrics_publisher.publish([asdict(issue_metadata)]) + logging.info( + 'GitHub metadata is published to Big Query Dataset %s' + ', table %s' % (constants._BQ_DATASET, test_name)) + + +def create_performance_alert( + metric_name: str, + test_name: str, + timestamps: List[pd.Timestamp], + metric_values: List[Union[int, float]], + change_point_index: int, + labels: List[str], + existing_issue_number: Optional[int]) -> Tuple[int, str]: + """ + Creates performance alert on GitHub issues and returns GitHub issue + number and issue URL. + """ + description = github_issues_utils.get_issue_description( + test_name=test_name, + metric_name=metric_name, + timestamps=timestamps, + metric_values=metric_values, + change_point_index=change_point_index, + max_results_to_display=( + constants._NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION)) + + issue_number, issue_url = github_issues_utils.report_change_point_on_issues( + title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format( + test_name, metric_name + ), + description=description, + labels=labels, + existing_issue_number=existing_issue_number) + + logging.info( + 'Performance regression is alerted on issue #%s. Link to ' + 'the issue: %s' % (issue_number, issue_url)) + return issue_number, issue_url diff --git a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml new file mode 100644 index 000000000000..9a208ea9e815 --- /dev/null +++ b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +test_1: + test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks_22 + metrics_dataset: beam_run_inference + metrics_table: torch_inference_imagenet_results_resnet152 + project: apache-beam-testing + metric_name: mean_load_model_latency_milli_secs + labels: + - run-inference + # Optional parameters. + min_runs_between_change_points: 3 + num_runs_in_change_point_window: 30 + +test_2: + test_name: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks + metrics_dataset: beam_run_inference + metrics_table: torch_inference_imagenet_results_resnet101 + project: apache-beam-testing + metric_name: mean_load_model_latency_milli_secs + labels: + - run-inference diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py index fbca1cb96e9d..60595ed02e08 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py @@ -38,6 +38,7 @@ from typing import Optional from typing import Union +import pandas as pd import requests from requests.auth import HTTPBasicAuth @@ -408,8 +409,10 @@ def publish(self, results): class BigQueryMetricsPublisher(MetricsPublisher): """A :class:`BigQueryMetricsPublisher` publishes collected metrics to BigQuery output.""" - def __init__(self, project_name, table, dataset): - self.bq = BigQueryClient(project_name, table, dataset) + def __init__(self, project_name, table, dataset, bq_schema=None): + if not bq_schema: + bq_schema = SCHEMA + self.bq = BigQueryClient(project_name, table, dataset, bq_schema) def publish(self, results): outputs = self.bq.save(results) @@ -424,7 +427,8 @@ def publish(self, results): class BigQueryClient(object): """A :class:`BigQueryClient` publishes collected metrics to BigQuery output.""" - def __init__(self, project_name, table, dataset): + def __init__(self, project_name, table, dataset, bq_schema=None): + self.schema = bq_schema self._namespace = table self._client = bigquery.Client(project=project_name) self._schema_names = self._get_schema_names() @@ -432,10 +436,10 @@ def __init__(self, project_name, table, dataset): self._get_or_create_table(schema, dataset) def _get_schema_names(self): - return [schema['name'] for schema in SCHEMA] + return [schema['name'] for schema in self.schema] def _prepare_schema(self): - return [SchemaField(**row) for row in SCHEMA] + return [SchemaField(**row) for row in self.schema] def _get_or_create_table(self, bq_schemas, dataset): if self._namespace == '': @@ -620,3 +624,13 @@ def __init__(self): def process(self, element): yield self.timestamp_val_fn( element, self.timestamp_fn(micros=int(self.time_fn() * 1000000))) + + +class BigQueryMetricsFetcher: + def __init__(self): + self.client = bigquery.Client() + + def fetch(self, query) -> pd.DataFrame: + query_job = self.client.query(query=query) + result = query_job.result() + return result.to_dataframe() diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh index 
ae715bb3a026..376dc123cef5 100755 --- a/sdks/python/scripts/generate_pydoc.sh +++ b/sdks/python/scripts/generate_pydoc.sh @@ -68,6 +68,7 @@ excluded_patterns=( 'apache_beam/testing/benchmarks/inference/' 'apache_beam/testing/benchmarks/data/' 'apache_beam/testing/benchmarks/load_tests/' + 'apache_beam/testing/analyzers' 'apache_beam/testing/.*test.py' 'apache_beam/tools/' 'apache_beam/tools/map_fn_microbenchmark.*'