From 5d292a9a7d514c843c60d80b15db92301c853096 Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Tue, 26 Sep 2023 15:40:22 -0400 Subject: [PATCH 01/13] Support for custom BigQueryMetricsFetcher --- .../testing/analyzers/perf_analysis.py | 22 +++-- .../testing/analyzers/perf_analysis_test.py | 26 +++--- .../testing/analyzers/perf_analysis_utils.py | 86 +++++++++++-------- .../load_tests/load_test_metrics_utils.py | 11 --- 4 files changed, 81 insertions(+), 64 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py index 7f1ffbb944e9..c2118d1e51e5 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -33,9 +33,10 @@ import pandas as pd from apache_beam.testing.analyzers import constants +from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData +from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert -from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window @@ -43,10 +44,10 @@ from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config from apache_beam.testing.analyzers.perf_analysis_utils import validate_config -from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher -def run_change_point_analysis(params, test_name, big_query_metrics_fetcher): +def run_change_point_analysis( + params, test_name, big_query_metrics_fetcher: MetricsFetcher): """ Args: params: Dict containing parameters to run change point analysis. 
@@ -74,9 +75,11 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher): if 'num_runs_in_change_point_window' in params: num_runs_in_change_point_window = params['num_runs_in_change_point_window'] - metric_values, timestamps = fetch_metric_data( - params=params, - big_query_metrics_fetcher=big_query_metrics_fetcher + metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data( + project=params['project'], + metrics_dataset=params['metrics_dataset'], + metrics_table=params['metrics_table'], + metric_name=params['metric_name'] ) change_point_index = find_latest_change_point_index( @@ -149,7 +152,9 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher): return is_alert -def run(config_file_path: Optional[str] = None) -> None: +def run( + config_file_path: Optional[str] = None, + big_query_metrics_fetcher: Optional[MetricsFetcher] = None) -> None: """ run is the entry point to run change point analysis on test metric data, which is read from config file, and if there is a performance @@ -169,7 +174,8 @@ def run(config_file_path: Optional[str] = None) -> None: tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path) - big_query_metrics_fetcher = BigQueryMetricsFetcher() + if not big_query_metrics_fetcher: + big_query_metrics_fetcher = BigQueryMetricsFetcher() for test_name, params in tests_config.items(): run_change_point_analysis( diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py index 094cd9c47ec0..3c934c295e72 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -32,6 +32,7 @@ from apache_beam.io.filesystems import FileSystems from apache_beam.testing.analyzers import constants from apache_beam.testing.analyzers import github_issues_utils + from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive @@ -45,14 +46,14 @@ # mock methods. -def get_fake_data_with_no_change_point(**kwargs): +def get_fake_data_with_no_change_point(*args, **kwargs): num_samples = 20 metric_values = [1] * num_samples timestamps = list(range(num_samples)) return metric_values, timestamps -def get_fake_data_with_change_point(**kwargs): +def get_fake_data_with_change_point(*args, **kwargs): # change point will be at index 13. 
num_samples = 20 metric_values = [0] * 12 + [3] + [4] * 7 @@ -151,18 +152,20 @@ def test_duplicate_change_points_are_not_valid_alerts(self): min_runs_between_change_points=min_runs_between_change_points) self.assertFalse(is_alert) - @mock.patch( - 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + @mock.patch.object( + BigQueryMetricsFetcher, + 'fetch_metric_data', get_fake_data_with_no_change_point) def test_no_alerts_when_no_change_points(self): is_alert = analysis.run_change_point_analysis( params=self.params, test_name=self.test_id, - big_query_metrics_fetcher=None) + big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) - @mock.patch( - 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + @mock.patch.object( + BigQueryMetricsFetcher, + 'fetch_metric_data', get_fake_data_with_change_point) @mock.patch( 'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data', @@ -179,11 +182,12 @@ def test_alert_on_data_with_change_point(self, *args): is_alert = analysis.run_change_point_analysis( params=self.params, test_name=self.test_id, - big_query_metrics_fetcher=None) + big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertTrue(is_alert) - @mock.patch( - 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + @mock.patch.object( + BigQueryMetricsFetcher, + 'fetch_metric_data', get_fake_data_with_change_point) @mock.patch( 'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data', @@ -199,7 +203,7 @@ def test_alert_on_data_with_reported_change_point(self, *args): is_alert = analysis.run_change_point_analysis( params=self.params, test_name=self.test_id, - big_query_metrics_fetcher=None) + big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) def test_change_point_has_anomaly_marker_in_gh_description(self): diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index 0a559fc4beeb..d474ca5c5095 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -14,11 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc import logging from dataclasses import asdict from dataclasses import dataclass from statistics import median -from typing import Any from typing import Dict from typing import List from typing import Optional @@ -28,11 +28,11 @@ import pandas as pd import yaml from google.api_core import exceptions +from google.cloud import bigquery from apache_beam.testing.analyzers import constants from apache_beam.testing.analyzers import github_issues_utils from apache_beam.testing.load_tests import load_test_metrics_utils -from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive @@ -59,9 +59,7 @@ def is_change_point_in_valid_window( return num_runs_in_change_point_window > latest_change_point_run -def get_existing_issues_data( - table_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher -) -> Optional[pd.DataFrame]: +def get_existing_issues_data(table_name: str, ) -> Optional[pd.DataFrame]: """ Finds the most recent GitHub issue created for the test_name. 
If no table found with name=test_name, return (None, None) @@ -73,12 +71,14 @@ def get_existing_issues_data( LIMIT 10 """ try: - df = big_query_metrics_fetcher.fetch(query=query) + client = bigquery.Client() + query_job = client.query(query=query) + existing_issue_data = query_job.result().to_dataframe() except exceptions.NotFound: # If no table found, that means this is first performance regression # on the current test+metric. return None - return df + return existing_issue_data def is_perf_alert( @@ -123,33 +123,6 @@ def validate_config(keys): return constants._PERF_TEST_KEYS.issubset(keys) -def fetch_metric_data( - params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher -) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: - """ - Args: - params: Dict containing keys required to fetch data from a data source. - big_query_metrics_fetcher: A BigQuery metrics fetcher for fetch metrics. - Returns: - Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list - of metric_values and list of timestamps. Both are sorted in ascending - order wrt timestamps. - """ - query = f""" - SELECT * - FROM {params['project']}.{params['metrics_dataset']}.{params['metrics_table']} - WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{params['metric_name']}') - ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC - LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS} - """ - metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query) - metric_data.sort_values( - by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) - return ( - metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), - metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) - - def find_change_points(metric_values: List[Union[float, int]]): return e_divisive(metric_values) @@ -253,3 +226,48 @@ def filter_change_points_by_median_threshold( if relative_change > threshold: valid_change_points.append(idx) return valid_change_points + + +class MetricsFetcher: + @abc.abstractmethod + def fetch_metric_data( + self, project, metrics_dataset, metrics_table, + metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + """ + Define schema and fetch the timestamp values and metric values + from BigQuery. + """ + + +class BigQueryMetricsFetcher: + def __init__(self, query: Optional[str] = None): + self.client = bigquery.Client() + self.query = query + + def fetch_metric_data( + self, *, project, metrics_dataset, metrics_table, + metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + """ + Args: + params: Dict containing keys required to fetch data from a data source. + Returns: + Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list + of metric_values and list of timestamps. Both are sorted in ascending + order wrt timestamps. 
+ """ + if not self.query: + self.query = f""" + SELECT * + FROM {project}.{metrics_dataset}.{metrics_table} + WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}') + ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC + LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS} + """ + # metric_data: pd.DataFrame = self.fetch(query=query) + query_job = self.client.query(query=self.query) + metric_data = query_job.result().to_dataframe() + metric_data.sort_values( + by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) + return ( + metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), + metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py index 92a5f68351fe..01db2c114efb 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py @@ -38,7 +38,6 @@ from typing import Optional from typing import Union -import pandas as pd import requests from requests.auth import HTTPBasicAuth @@ -650,13 +649,3 @@ def __init__(self): def process(self, element): yield self.timestamp_val_fn( element, self.timestamp_fn(micros=int(self.time_fn() * 1000000))) - - -class BigQueryMetricsFetcher: - def __init__(self): - self.client = bigquery.Client() - - def fetch(self, query) -> pd.DataFrame: - query_job = self.client.query(query=query) - result = query_job.result() - return result.to_dataframe() From 5adaf4fc1e33d7c3f84e35fe9b603f32a3bc9dd2 Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Tue, 26 Sep 2023 17:27:33 -0400 Subject: [PATCH 02/13] Read GITHUB repo and owner name from environment variables --- .../testing/analyzers/github_issues_utils.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py index e1f20baa50a6..7df4c7f6a167 100644 --- a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py @@ -34,8 +34,8 @@ 'A Github Personal Access token is required ' 'to create Github Issues.') -_BEAM_GITHUB_REPO_OWNER = 'apache' -_BEAM_GITHUB_REPO_NAME = 'beam' +_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'AnandInguva') +_GITHUB_REPO_NAME = os.environ.get('REPO_NAME', 'beam') # Adding GitHub Rest API version to the header to maintain version stability. # For more information, please look at # https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long @@ -77,10 +77,10 @@ def create_issue( Tuple containing GitHub issue number and issue URL. """ url = "https://api.github.com/repos/{}/{}/issues".format( - _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME) + _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME) data = { - 'owner': _BEAM_GITHUB_REPO_OWNER, - 'repo': _BEAM_GITHUB_REPO_NAME, + 'owner': _GITHUB_REPO_OWNER, + 'repo': _GITHUB_REPO_NAME, 'title': title, 'body': description, 'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL] @@ -108,20 +108,20 @@ def comment_on_issue(issue_number: int, issue, and the comment URL. 
""" url = 'https://api.github.com/repos/{}/{}/issues/{}'.format( - _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number) + _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number) open_issue_response = requests.get( url, json.dumps({ - 'owner': _BEAM_GITHUB_REPO_OWNER, - 'repo': _BEAM_GITHUB_REPO_NAME, + 'owner': _GITHUB_REPO_OWNER, + 'repo': _GITHUB_REPO_NAME, 'issue_number': issue_number }, default=str), headers=_HEADERS).json() if open_issue_response['state'] == 'open': data = { - 'owner': _BEAM_GITHUB_REPO_OWNER, - 'repo': _BEAM_GITHUB_REPO_NAME, + 'owner': _GITHUB_REPO_OWNER, + 'repo': _GITHUB_REPO_NAME, 'body': comment_description, issue_number: issue_number, } @@ -134,7 +134,7 @@ def comment_on_issue(issue_number: int, def add_awaiting_triage_label(issue_number: int): url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format( - _BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number) + _GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number) requests.post( url, json.dumps({'labels': [_AWAITING_TRIAGE_LABEL]}), headers=_HEADERS) From cf5aa74b47622085bdd596055cb16b7abd41e365 Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Tue, 26 Sep 2023 18:29:34 -0400 Subject: [PATCH 03/13] Add test_name, test_id --- .../testing/analyzers/github_issues_utils.py | 14 ++++++--- .../testing/analyzers/perf_analysis.py | 31 ++++++++++++------- .../testing/analyzers/perf_analysis_test.py | 9 +++--- .../testing/analyzers/perf_analysis_utils.py | 31 +++++++++++++------ .../testing/analyzers/tests_config.yaml | 2 +- 5 files changed, 56 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py index 7df4c7f6a167..82758be8f180 100644 --- a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py @@ -34,7 +34,7 @@ 'A Github Personal Access token is required ' 'to create Github Issues.') -_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'AnandInguva') +_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'apache') _GITHUB_REPO_NAME = os.environ.get('REPO_NAME', 'beam') # Adding GitHub Rest API version to the header to maintain version stability. 
# For more information, please look at
@@ -140,7 +140,8 @@ def add_awaiting_triage_label(issue_number: int):
 
 
 def get_issue_description(
-    test_name: str,
+    test_id: str,
+    test_name: Optional[str],
     metric_name: str,
     timestamps: List[pd.Timestamp],
     metric_values: List,
@@ -167,10 +168,13 @@ def get_issue_description(
 
   description = []
 
-  description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_name, metric_name))
+  description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name))
 
-  description.append(("`Test description:` " +
-                      f'{test_description}') if test_description else '')
+  if test_name:
+    description.append(("`test_name:` " + f'{test_name}'))
+
+  if test_description:
+    description.append(("`Test description:` " + f'{test_description}'))
 
   description.append('```')
 
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
index c2118d1e51e5..9642547f1e94 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
@@ -23,7 +23,6 @@
 import argparse
 import logging
 import os
-import uuid
 from datetime import datetime
 from datetime import timezone
 from typing import Any
@@ -47,7 +46,7 @@
 
 
 def run_change_point_analysis(
-    params, test_name, big_query_metrics_fetcher: MetricsFetcher):
+    params, test_id, big_query_metrics_fetcher: MetricsFetcher):
   """
   Args:
     params: Dict containing parameters to run change point analysis.
@@ -57,14 +56,21 @@ def run_change_point_analysis(
   Returns:
     bool indicating if a change point is observed and alerted on GitHub.
   """
-  logging.info("Running change point analysis for test %s" % test_name)
+  logging.info("Running change point analysis for test ID %s" % test_id)
   if not validate_config(params.keys()):
     raise ValueError(
         f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
-        f"are specified for the {test_name}")
+        f"are specified for the {test_id}")
 
   metric_name = params['metric_name']
 
+  # test_name will be used to query a single test from
+  # multiple tests in a single BQ table. Right now, the default
+  # assumption is that all the tests have an individual BQ table,
+  # but this might not be the case for other tests (such as IO tests,
+  # where a single BQ table stores all the data).
+  test_name = params.get('test_name', None)
+
   min_runs_between_change_points = (
       constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
   if 'min_runs_between_change_points' in params:
@@ -85,7 +91,7 @@ def run_change_point_analysis(
   change_point_index = find_latest_change_point_index(
       metric_values=metric_values)
   if not change_point_index:
-    logging.info("Change point is not detected for the test %s" % test_name)
+    logging.info("Change point is not detected for the test ID %s" % test_id)
     return False
   # since timestamps are ordered in ascending order and
   # num_runs_in_change_point_window refers to the latest runs,
@@ -95,11 +101,11 @@ def run_change_point_analysis(
   if not is_change_point_in_valid_window(num_runs_in_change_point_window,
                                          latest_change_point_run):
     logging.info(
-        'Performance regression/improvement found for the test: %s. '
+        'Performance regression/improvement found for the test ID: %s. '
         'on metric %s. Since the change point run %s '
         'lies outside the num_runs_in_change_point_window distance: %s, '
         'alert is not raised.'
% ( - test_name, + test_id, metric_name, latest_change_point_run + 1, num_runs_in_change_point_window)) @@ -127,20 +133,21 @@ def run_change_point_analysis( min_runs_between_change_points=min_runs_between_change_points) if is_alert: issue_number, issue_url = create_performance_alert( - metric_name, test_name, timestamps, + metric_name, test_id, timestamps, metric_values, change_point_index, params.get('labels', None), last_reported_issue_number, test_description = params.get('test_description', None), + test_name = test_name ) issue_metadata = GitHubIssueMetaData( issue_timestamp=pd.Timestamp( datetime.now().replace(tzinfo=timezone.utc)), # BQ doesn't allow '.' in table name - test_name=test_name.replace('.', '_'), + test_id=test_id.replace('.', '_'), + test_name=test_name, metric_name=metric_name, - test_id=uuid.uuid4().hex, change_point=metric_values[change_point_index], issue_number=issue_number, issue_url=issue_url, @@ -177,10 +184,10 @@ def run( if not big_query_metrics_fetcher: big_query_metrics_fetcher = BigQueryMetricsFetcher() - for test_name, params in tests_config.items(): + for test_id, params in tests_config.items(): run_change_point_analysis( params=params, - test_name=test_name, + test_id=test_id, big_query_metrics_fetcher=big_query_metrics_fetcher) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py index 3c934c295e72..35c1d4d9bbb8 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -159,7 +159,7 @@ def test_duplicate_change_points_are_not_valid_alerts(self): def test_no_alerts_when_no_change_points(self): is_alert = analysis.run_change_point_analysis( params=self.params, - test_name=self.test_id, + test_id=self.test_id, big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) @@ -181,7 +181,7 @@ def test_no_alerts_when_no_change_points(self): def test_alert_on_data_with_change_point(self, *args): is_alert = analysis.run_change_point_analysis( params=self.params, - test_name=self.test_id, + test_id=self.test_id, big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertTrue(is_alert) @@ -202,7 +202,7 @@ def test_alert_on_data_with_change_point(self, *args): def test_alert_on_data_with_reported_change_point(self, *args): is_alert = analysis.run_change_point_analysis( params=self.params, - test_name=self.test_id, + test_id=self.test_id, big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) @@ -212,7 +212,8 @@ def test_change_point_has_anomaly_marker_in_gh_description(self): change_point_index = find_latest_change_point_index(metric_values) description = github_issues_utils.get_issue_description( - test_name=self.test_id, + test_id=self.test_id, + test_name=self.params.get('test_name', None), test_description=self.params['test_description'], metric_name=self.params['metric_name'], metric_values=metric_values, diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index d474ca5c5095..3009e4d9889b 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -59,7 +59,7 @@ def is_change_point_in_valid_window( return num_runs_in_change_point_window > latest_change_point_run -def get_existing_issues_data(table_name: str, ) -> Optional[pd.DataFrame]: +def 
get_existing_issues_data(table_name: str) -> Optional[pd.DataFrame]:
   """
   Finds the most recent GitHub issue created for the test_name.
   If no table found with name=test_name, return (None, None)
@@ -148,7 +148,7 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]):
 
 def publish_issue_metadata_to_big_query(issue_metadata, table_name):
   """
-  Published issue_metadata to BigQuery with table name=test_name.
+  Publishes issue_metadata to BigQuery with the given table name.
   """
   bq_metrics_publisher = BigQueryMetricsPublisher(
       project_name=constants._BQ_PROJECT_NAME,
@@ -163,18 +163,21 @@
 def create_performance_alert(
     metric_name: str,
-    test_name: str,
+    test_id: str,
     timestamps: List[pd.Timestamp],
     metric_values: List[Union[int, float]],
     change_point_index: int,
     labels: List[str],
     existing_issue_number: Optional[int],
-    test_description: Optional[str] = None) -> Tuple[int, str]:
+    test_description: Optional[str] = None,
+    test_name: Optional[str] = None,
+) -> Tuple[int, str]:
   """
   Creates performance alert on GitHub issues and returns GitHub issue
   number and issue URL.
   """
   description = github_issues_utils.get_issue_description(
+      test_id=test_id,
       test_name=test_name,
       test_description=test_description,
       metric_name=metric_name,
@@ -186,7 +189,7 @@ def create_performance_alert(
 
   issue_number, issue_url = github_issues_utils.report_change_point_on_issues(
       title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format(
-          test_name, metric_name
+          test_id, metric_name
       ),
       description=description,
       labels=labels,
@@ -231,8 +234,12 @@ def filter_change_points_by_median_threshold(
 class MetricsFetcher:
   @abc.abstractmethod
   def fetch_metric_data(
-      self, project, metrics_dataset, metrics_table,
-      metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+      self,
+      project,
+      metrics_dataset,
+      metrics_table,
+      metric_name,
+      test_name=None) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
     """
     Define schema and fetch the timestamp values and metric values
     from BigQuery.
@@ -245,8 +252,14 @@ def __init__(self, query: Optional[str] = None):
     self.query = query
 
   def fetch_metric_data(
-      self, *, project, metrics_dataset, metrics_table,
-      metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
+      self,
+      *,
+      project,
+      metrics_dataset,
+      metrics_table,
+      metric_name,
+      test_name=None,
+  ) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
     """
     Args:
       params: Dict containing keys required to fetch data from a data source.
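As an aside on the `MetricsFetcher` interface introduced in the hunk above: a downstream project could plug in a non-BigQuery source by subclassing it. The sketch below is illustrative only — `InMemoryMetricsFetcher` and its list-backed rows are hypothetical names, not part of this patch; only the `fetch_metric_data` signature and the ascending-by-timestamp return contract come from the series.

```python
from typing import List, Tuple, Union

import pandas as pd

from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher


class InMemoryMetricsFetcher(MetricsFetcher):
  """Hypothetical fetcher serving pre-loaded rows instead of BigQuery."""
  def __init__(self, rows):
    # rows: iterable of (timestamp_seconds, metric_name, value) tuples.
    self._rows = list(rows)

  def fetch_metric_data(
      self,
      project,
      metrics_dataset,
      metrics_table,
      metric_name,
      test_name=None) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
    # Keep only the requested metric and sort ascending by timestamp,
    # mirroring what the BigQuery-backed fetcher returns.
    rows = sorted(r for r in self._rows if r[1] == metric_name)
    timestamps = [pd.Timestamp(r[0], unit='s') for r in rows]
    return [r[2] for r in rows], timestamps
```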
diff --git a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml index bc74f292c487..a3829f93c83c 100644 --- a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml +++ b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml @@ -16,7 +16,7 @@ # # for the unique key to define a test, please use the following format: -# {test_name}-{metric_name} +# {test_id}-{metric_name} pytorch_image_classification_benchmarks-resnet152-mean_inference_batch_latency_micro_secs: test_description: From ff23d9e2ff10e0366ef456c2bb4dde3ce4c0dc97 Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Wed, 27 Sep 2023 17:44:18 -0400 Subject: [PATCH 04/13] Move client to the fetch method --- .../testing/analyzers/perf_analysis_utils.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index 3009e4d9889b..e9a232c4b812 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -247,10 +247,6 @@ def fetch_metric_data( class BigQueryMetricsFetcher: - def __init__(self, query: Optional[str] = None): - self.client = bigquery.Client() - self.query = query - def fetch_metric_data( self, *, @@ -268,16 +264,15 @@ def fetch_metric_data( of metric_values and list of timestamps. Both are sorted in ascending order wrt timestamps. """ - if not self.query: - self.query = f""" + query = f""" SELECT * FROM {project}.{metrics_dataset}.{metrics_table} WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}') ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS} """ - # metric_data: pd.DataFrame = self.fetch(query=query) - query_job = self.client.query(query=self.query) + client = bigquery.Client() + query_job = client.query(query=query) metric_data = query_job.result().to_dataframe() metric_data.sort_values( by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) From 1f2e2d427dd94f08e1b46a7d427b4b03ed3b849c Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Thu, 28 Sep 2023 12:00:58 -0400 Subject: [PATCH 05/13] Update skip condition --- .../apache_beam/testing/analyzers/perf_analysis_test.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py index 35c1d4d9bbb8..9c7921300d9d 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -42,7 +42,7 @@ from apache_beam.testing.analyzers.perf_analysis_utils import validate_config from apache_beam.testing.load_tests import load_test_metrics_utils except ImportError as e: - analysis = None # type: ignore + raise unittest.SkipTest('Missing dependencies to run perf analysis tests.') # mock methods. @@ -70,10 +70,6 @@ def get_existing_issue_data(**kwargs): }]) -@unittest.skipIf( - analysis is None, - 'Missing dependencies. 
' - 'Test dependencies are missing for the Analyzer.') class TestChangePointAnalysis(unittest.TestCase): def setUp(self) -> None: self.single_change_point_series = [0] * 10 + [1] * 10 From 36ee55f86ff73afc34916ac361b5450e85df2c7e Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Thu, 28 Sep 2023 12:51:04 -0400 Subject: [PATCH 06/13] Run on self hosted runner --- .github/workflows/run_perf_alert_tool.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/run_perf_alert_tool.yml b/.github/workflows/run_perf_alert_tool.yml index 6946011f0617..1bd8d525c2fb 100644 --- a/.github/workflows/run_perf_alert_tool.yml +++ b/.github/workflows/run_perf_alert_tool.yml @@ -30,7 +30,7 @@ on: jobs: python_run_change_point_analysis: name: Run Change Point Analysis. - runs-on: ubuntu-latest + runs-on: [self-hosted, ubuntu-20.04, main] permissions: issues: write steps: From 4226322f87039be5a28922a5af7d5bfb715c2dcb Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 28 Sep 2023 17:17:04 -0400 Subject: [PATCH 07/13] Update readme --- .../apache_beam/testing/analyzers/README.md | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/README.md b/sdks/python/apache_beam/testing/analyzers/README.md index 076f173f9d71..762ea9dcd881 100644 --- a/sdks/python/apache_beam/testing/analyzers/README.md +++ b/sdks/python/apache_beam/testing/analyzers/README.md @@ -55,11 +55,15 @@ test_1: num_runs_in_change_point_window: 30 # optional parameter ``` -**NOTE**: `test_target` is optional. It is used for identifying the test that was causing the regression. +#### Optional Parameters: -**Note**: By default, the tool fetches metrics from BigQuery tables. `metrics_dataset`, `metrics_table`, `project` and `metric_name` should match with the values defined for performance/load tests. -The above example uses this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30) -to fill up the values required to fetch the data from source. +- `test_target`: Identifies the test responsible for the regression. + +- `test_description`: Provides a brief overview of the test's function. + +- `test_name`: Denotes the name of the test as stored in the BigQuery table. + +**Note**: The tool, by default, pulls metrics from BigQuery tables. Ensure that the values for `metrics_dataset`, `metrics_table`, `project`, and `metric_name` align with those defined for performance/load tests. The provided example utilizes this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30) to populate the necessary values for data retrieval. ### Different ways to avoid false positive change points @@ -79,6 +83,22 @@ setting `num_runs_in_change_point_window=7` will achieve it. If a new test needs to be registered for the performance alerting tool, please add the required test parameters to the config file. + +### Integrating the Perf Alert Tool with a Custom BigQuery Schema + +By default, the Perf Alert Tool retrieves metrics from the `apache-beam-testing` BigQuery projects. All performance and load tests within Beam utilize a standard [schema](https://github.com/apache/beam/blob/a7e12db9b5977c4a7b13554605c0300389a3d6da/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L70) for metrics publication. 
The tool inherently recognizes and operates with this schema when extracting metrics from BigQuery tables.
+
+To fetch data from a BigQuery dataset other than the Apache Beam default, one can inherit the `MetricsFetcher` class and implement the abstract method `fetch_metric_data`. This method should return a tuple containing the metric values and the timestamps at which those values were published.
+
+```
+from apache_beam.testing.analyzers import perf_analysis
+config_file_path = '<path_to_config_file>' # placeholder: path to your yaml config
+my_metric_fetcher = MyMetricsFetcher() # inherited from MetricsFetcher
+perf_analysis.run(config_file_path, my_metric_fetcher)
+```
+
+**Note**: The metrics and timestamps should be sorted by timestamp in ascending order.
+
 ## Triage performance alert issues
 
 All the performance/load tests metrics defined at [beam/.test-infra/jenkins](https://github.com/apache/beam/tree/master/.test-infra/jenkins) are imported to [Grafana dashboards](http://metrics.beam.apache.org/d/1/getting-started?orgId=1) for visualization. Please

From 5f28e9712fa9bd79415295424bafbafadd41ffa0 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Thu, 28 Sep 2023 19:45:30 -0400
Subject: [PATCH 08/13] Update README

---
 .../apache_beam/testing/analyzers/README.md   | 28 +++++++++++++++++++++---------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/testing/analyzers/README.md b/sdks/python/apache_beam/testing/analyzers/README.md
index 762ea9dcd881..91b21076f88a 100644
--- a/sdks/python/apache_beam/testing/analyzers/README.md
+++ b/sdks/python/apache_beam/testing/analyzers/README.md
@@ -35,16 +35,13 @@ update already created GitHub issue or ignore performance alert by not creating
 
 ## Config file structure
 
-The config file defines the structure to run change point analysis on a given test. To add a test to the config file,
+The yaml file defines the structure to run change point analysis on a given test. To add a test config to the yaml file,
 please follow the below structure.
 
-**NOTE**: The Change point analysis only supports reading the metric data from Big Query for now.
+**NOTE**: The Change point analysis currently supports reading the metric data from `BigQuery` only.
 
 ```
-# the test_1 must be a unique id.
-test_1:
-  test_description: Pytorch image classification on 50k images of size 224 x 224 with resnet 152
-  test_target: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
+test_1: # a unique id for each test config.
   metrics_dataset: beam_run_inference
   metrics_table: torch_inference_imagenet_results_resnet152
   project: apache-beam-testing
@@ -57,6 +54,8 @@
 
 #### Optional Parameters:
 
+These are the optional parameters that can be added to the test config in addition to the parameters mentioned above.
+
 - `test_target`: Identifies the test responsible for the regression.
 
 - `test_description`: Provides a brief overview of the test's function.
@@ -80,11 +79,13 @@ setting `num_runs_in_change_point_window=7` will achieve it.
 
 ## Register a test for performance alerts
 
-If a new test needs to be registered for the performance alerting tool, please add the required test parameters to the
-config file.
+If a new test needs to be registered for the performance alerting tool,
+
+- You can add it to the config file that is already present.
+- You can define your own yaml file and call the [perf_analysis.run()](https://github.com/apache/beam/blob/a46bc12a256dcaa3ae2cc9e5d6fdcaa82b59738b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py#L152) method.
 
 
-### Integrating the Perf Alert Tool with a Custom BigQuery Schema
+## Integrating the Perf Alert Tool with a Custom BigQuery Schema
 
 By default, the Perf Alert Tool retrieves metrics from the `apache-beam-testing` BigQuery projects. All performance and load tests within Beam utilize a standard [schema](https://github.com/apache/beam/blob/a7e12db9b5977c4a7b13554605c0300389a3d6da/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L70) for metrics publication. The tool inherently recognizes and operates with this schema when extracting metrics from BigQuery tables.
 
 To fetch data from a BigQuery dataset other than the Apache Beam default, one can inherit the `MetricsFetcher` class and implement the abstract method `fetch_metric_data`. This method should return a tuple containing the metric values and the timestamps at which those values were published.
 
 ```
 from apache_beam.testing.analyzers import perf_analysis
 config_file_path = '<path_to_config_file>' # placeholder: path to your yaml config
 my_metric_fetcher = MyMetricsFetcher() # inherited from MetricsFetcher
 perf_analysis.run(config_file_path, my_metric_fetcher)
 ```
 
 **Note**: The metrics and timestamps should be sorted by timestamp in ascending order.
 
+### Configuring GitHub Parameters
+
+Out of the box, the performance alert tool targets the `apache/beam` repository when raising issues. If you wish to utilize this tool for another repository, you'll need to pre-set a couple of environment variables:
+
+- `REPO_OWNER`: The owner of the repository (e.g., `apache`).
+- `REPO_NAME`: The name of the repository itself (e.g., `beam`).
+
+Before initiating the tool, also ensure that the `GITHUB_TOKEN` is set to an authenticated GitHub token. This permits the tool to generate GitHub issues whenever performance alerts arise.
+
 ## Triage performance alert issues
 
 All the performance/load tests metrics defined at [beam/.test-infra/jenkins](https://github.com/apache/beam/tree/master/.test-infra/jenkins) are imported to [Grafana dashboards](http://metrics.beam.apache.org/d/1/getting-started?orgId=1) for visualization.
Please From 85c46c08506649a44f9f7e4455cefd7cfd8fe305 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 28 Sep 2023 19:52:53 -0400 Subject: [PATCH 09/13] Pass test_name to the metrics_fetcher --- sdks/python/apache_beam/testing/analyzers/perf_analysis.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py index 9642547f1e94..ab1523e879cd 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -85,7 +85,8 @@ def run_change_point_analysis( project=params['project'], metrics_dataset=params['metrics_dataset'], metrics_table=params['metrics_table'], - metric_name=params['metric_name'] + metric_name=params['metric_name'], + test_name=test_name ) change_point_index = find_latest_change_point_index( From 80ca6855cd9e80b30d152330c7b958984f3e4d43 Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 28 Sep 2023 20:21:58 -0400 Subject: [PATCH 10/13] Fix linting issues --- .../apache_beam/testing/analyzers/perf_analysis.py | 9 ++++----- .../apache_beam/testing/analyzers/perf_analysis_utils.py | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py index ab1523e879cd..397e40b30d89 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -32,9 +32,9 @@ import pandas as pd from apache_beam.testing.analyzers import constants -from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher -from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher +from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData +from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data @@ -116,8 +116,7 @@ def run_change_point_analysis( last_reported_issue_number = None issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}' existing_issue_data = get_existing_issues_data( - table_name=issue_metadata_table_name, - big_query_metrics_fetcher=big_query_metrics_fetcher) + table_name=issue_metadata_table_name) if existing_issue_data is not None: existing_issue_timestamps = existing_issue_data[ @@ -183,7 +182,7 @@ def run( tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path) if not big_query_metrics_fetcher: - big_query_metrics_fetcher = BigQueryMetricsFetcher() + big_query_metrics_fetcher: MetricsFetcher = BigQueryMetricsFetcher() for test_id, params in tests_config.items(): run_change_point_analysis( diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index e9a232c4b812..3ccf2c14b554 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -241,8 +241,8 @@ def fetch_metric_data( metric_name, test_name=None) -> Tuple[List[Union[int, float]], 
List[pd.Timestamp]]:
     """
-    Define schema and fetch the timestamp values and metric values
-    from BigQuery.
+    Define the SQL query and fetch the timestamp values and metric values
+    from BigQuery tables.
""" + raise NotImplementedError -class BigQueryMetricsFetcher: +class BigQueryMetricsFetcher(MetricsFetcher): def fetch_metric_data( self, *, From 5d35e4e995ceda5acc7e8d0500599e2b2c5e15cb Mon Sep 17 00:00:00 2001 From: Anand Inguva Date: Thu, 5 Oct 2023 10:11:06 -0400 Subject: [PATCH 13/13] fix lint --- sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index e8db128ba9f1..f9604c490fc0 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -235,6 +235,7 @@ class MetricsFetcher(metaclass=abc.ABCMeta): @abc.abstractmethod def fetch_metric_data( self, + *, project, metrics_dataset, metrics_table,