From cf5aa74b47622085bdd596055cb16b7abd41e365 Mon Sep 17 00:00:00 2001
From: AnandInguva
Date: Tue, 26 Sep 2023 18:29:34 -0400
Subject: [PATCH] Add test_name, test_id

---
 .../testing/analyzers/github_issues_utils.py | 14 ++++++---
 .../testing/analyzers/perf_analysis.py       | 31 ++++++++++++-------
 .../testing/analyzers/perf_analysis_test.py  |  9 +++---
 .../testing/analyzers/perf_analysis_utils.py | 31 +++++++++++++------
 .../testing/analyzers/tests_config.yaml      |  2 +-
 5 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
index 7df4c7f6a167..82758be8f180 100644
--- a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
@@ -34,7 +34,7 @@
     'A Github Personal Access token is required '
     'to create Github Issues.')
 
-_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'AnandInguva')
+_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'apache')
 _GITHUB_REPO_NAME = os.environ.get('REPO_NAME', 'beam')
 # Adding GitHub Rest API version to the header to maintain version stability.
 # For more information, please look at
@@ -140,7 +140,8 @@ def add_awaiting_triage_label(issue_number: int):
 
 
 def get_issue_description(
-    test_name: str,
+    test_id: str,
+    test_name: Optional[str],
     metric_name: str,
     timestamps: List[pd.Timestamp],
     metric_values: List,
@@ -167,10 +168,13 @@ def get_issue_description(
 
   description = []
 
-  description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_name, metric_name))
+  description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name))
 
-  description.append(("`Test description:` " +
-                      f'{test_description}') if test_description else '')
+  if test_name:
+    description.append(f'`test_name:` {test_name}')
+
+  if test_description:
+    description.append(f'`Test description:` {test_description}')
 
   description.append('```')
 
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
index c2118d1e51e5..9642547f1e94 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py
@@ -23,7 +23,6 @@
 import argparse
 import logging
 import os
-import uuid
 from datetime import datetime
 from datetime import timezone
 from typing import Any
@@ -47,7 +46,7 @@
 
 
 def run_change_point_analysis(
-    params, test_name, big_query_metrics_fetcher: MetricsFetcher):
+    params, test_id, big_query_metrics_fetcher: MetricsFetcher):
   """
   Args:
     params: Dict containing parameters to run change point analysis.
@@ -57,14 +56,21 @@ def run_change_point_analysis(
   Returns:
     bool indicating if a change point is observed and alerted on GitHub.
   """
-  logging.info("Running change point analysis for test %s" % test_name)
+  logging.info("Running change point analysis for test ID %s" % test_id)
   if not validate_config(params.keys()):
     raise ValueError(
         f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
-        f"are specified for the {test_name}")
+        f"are specified for the {test_id}")
 
   metric_name = params['metric_name']
 
+  # test_name will be used to query a single test from
+  # multiple tests in a single BQ table. Right now, the default
+  # assumption is that each test has its own BQ table, but this
+  # might not be the case for other tests (such as IO tests, where
+  # a single BQ table stores the data for all tests).
+  test_name = params.get('test_name', None)
+
   min_runs_between_change_points = (
       constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
   if 'min_runs_between_change_points' in params:
@@ -85,7 +91,7 @@
   change_point_index = find_latest_change_point_index(
       metric_values=metric_values)
   if not change_point_index:
-    logging.info("Change point is not detected for the test %s" % test_name)
+    logging.info("Change point is not detected for the test ID %s" % test_id)
     return False
   # since timestamps are ordered in ascending order and
   # num_runs_in_change_point_window refers to the latest runs,
@@ -95,11 +101,11 @@
   if not is_change_point_in_valid_window(num_runs_in_change_point_window,
                                          latest_change_point_run):
     logging.info(
-        'Performance regression/improvement found for the test: %s. '
+        'Performance regression/improvement found for the test ID %s '
        'on metric %s. Since the change point run %s '
        'lies outside the num_runs_in_change_point_window distance: %s, '
        'alert is not raised.' % (
-            test_name,
+            test_id,
            metric_name,
            latest_change_point_run + 1,
            num_runs_in_change_point_window))
@@ -127,20 +133,21 @@
       min_runs_between_change_points=min_runs_between_change_points)
 
   if is_alert:
     issue_number, issue_url = create_performance_alert(
-        metric_name, test_name, timestamps,
+        metric_name, test_id, timestamps,
         metric_values, change_point_index,
         params.get('labels', None),
         last_reported_issue_number,
         test_description=params.get('test_description', None),
+        test_name=test_name
     )
     issue_metadata = GitHubIssueMetaData(
         issue_timestamp=pd.Timestamp(
            datetime.now().replace(tzinfo=timezone.utc)),
        # BQ doesn't allow '.' in table name
-        test_name=test_name.replace('.', '_'),
+        test_id=test_id.replace('.', '_'),
+        test_name=test_name,
         metric_name=metric_name,
-        test_id=uuid.uuid4().hex,
         change_point=metric_values[change_point_index],
         issue_number=issue_number,
         issue_url=issue_url,
@@ -177,10 +184,10 @@ def run(
   if not big_query_metrics_fetcher:
     big_query_metrics_fetcher = BigQueryMetricsFetcher()
 
-  for test_name, params in tests_config.items():
+  for test_id, params in tests_config.items():
     run_change_point_analysis(
         params=params,
-        test_name=test_name,
+        test_id=test_id,
        big_query_metrics_fetcher=big_query_metrics_fetcher)
 
 
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
index 3c934c295e72..35c1d4d9bbb8 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
@@ -159,7 +159,7 @@ def test_duplicate_change_points_are_not_valid_alerts(self):
 
   def test_no_alerts_when_no_change_points(self):
     is_alert = analysis.run_change_point_analysis(
         params=self.params,
-        test_name=self.test_id,
+        test_id=self.test_id,
         big_query_metrics_fetcher=BigQueryMetricsFetcher())
     self.assertFalse(is_alert)
@@ -181,7 +181,7 @@ def test_no_alerts_when_no_change_points(self):
 
   def test_alert_on_data_with_change_point(self, *args):
     is_alert = analysis.run_change_point_analysis(
         params=self.params,
-        test_name=self.test_id,
+        test_id=self.test_id,
         big_query_metrics_fetcher=BigQueryMetricsFetcher())
     self.assertTrue(is_alert)
@@ -202,7 +202,7 @@ def test_alert_on_data_with_change_point(self, *args):
 
   def test_alert_on_data_with_reported_change_point(self, *args):
     is_alert = analysis.run_change_point_analysis(
         params=self.params,
-        test_name=self.test_id,
+        test_id=self.test_id,
         big_query_metrics_fetcher=BigQueryMetricsFetcher())
     self.assertFalse(is_alert)
@@ -212,7 +212,8 @@ def test_change_point_has_anomaly_marker_in_gh_description(self):
     change_point_index = find_latest_change_point_index(metric_values)
 
     description = github_issues_utils.get_issue_description(
-        test_name=self.test_id,
+        test_id=self.test_id,
+        test_name=self.params.get('test_name', None),
         test_description=self.params['test_description'],
         metric_name=self.params['metric_name'],
         metric_values=metric_values,
diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
index d474ca5c5095..3009e4d9889b 100644
--- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
@@ -59,7 +59,7 @@ def is_change_point_in_valid_window(
   return num_runs_in_change_point_window > latest_change_point_run
 
 
-def get_existing_issues_data(table_name: str, ) -> Optional[pd.DataFrame]:
+def get_existing_issues_data(table_name: str) -> Optional[pd.DataFrame]:
   """
   Finds the most recent GitHub issue created for the test_name.
   If no table found with name=test_name, return (None, None)
@@ -148,7 +148,7 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]):
 
 def publish_issue_metadata_to_big_query(issue_metadata, table_name):
   """
-  Published issue_metadata to BigQuery with table name=test_name.
+  Publishes issue_metadata to BigQuery with the given table name.
""" bq_metrics_publisher = BigQueryMetricsPublisher( project_name=constants._BQ_PROJECT_NAME, @@ -163,18 +163,21 @@ def publish_issue_metadata_to_big_query(issue_metadata, table_name): def create_performance_alert( metric_name: str, - test_name: str, + test_id: str, timestamps: List[pd.Timestamp], metric_values: List[Union[int, float]], change_point_index: int, labels: List[str], existing_issue_number: Optional[int], - test_description: Optional[str] = None) -> Tuple[int, str]: + test_description: Optional[str] = None, + test_name: Optional[str] = None, +) -> Tuple[int, str]: """ Creates performance alert on GitHub issues and returns GitHub issue number and issue URL. """ description = github_issues_utils.get_issue_description( + test_id=test_id, test_name=test_name, test_description=test_description, metric_name=metric_name, @@ -186,7 +189,7 @@ def create_performance_alert( issue_number, issue_url = github_issues_utils.report_change_point_on_issues( title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format( - test_name, metric_name + test_id, metric_name ), description=description, labels=labels, @@ -231,8 +234,12 @@ def filter_change_points_by_median_threshold( class MetricsFetcher: @abc.abstractmethod def fetch_metric_data( - self, project, metrics_dataset, metrics_table, - metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + self, + project, + metrics_dataset, + metrics_table, + metric_name, + test_name=None) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: """ Define schema and fetch the timestamp values and metric values from BigQuery. @@ -245,8 +252,14 @@ def __init__(self, query: Optional[str] = None): self.query = query def fetch_metric_data( - self, *, project, metrics_dataset, metrics_table, - metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + self, + *, + project, + metrics_dataset, + metrics_table, + metric_name, + test_name=None, + ) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: """ Args: params: Dict containing keys required to fetch data from a data source. diff --git a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml index bc74f292c487..a3829f93c83c 100644 --- a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml +++ b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml @@ -16,7 +16,7 @@ # # for the unique key to define a test, please use the following format: -# {test_name}-{metric_name} +# {test_id}-{metric_name} pytorch_image_classification_benchmarks-resnet152-mean_inference_batch_latency_micro_secs: test_description: