From 5d292a9a7d514c843c60d80b15db92301c853096 Mon Sep 17 00:00:00 2001 From: AnandInguva Date: Tue, 26 Sep 2023 15:40:22 -0400 Subject: [PATCH] Support for custom BigQueryMetricsFetcher --- .../testing/analyzers/perf_analysis.py | 22 +++-- .../testing/analyzers/perf_analysis_test.py | 26 +++--- .../testing/analyzers/perf_analysis_utils.py | 86 +++++++++++-------- .../load_tests/load_test_metrics_utils.py | 11 --- 4 files changed, 81 insertions(+), 64 deletions(-) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py index 7f1ffbb944e9..c2118d1e51e5 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -33,9 +33,10 @@ import pandas as pd from apache_beam.testing.analyzers import constants +from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData +from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert -from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window @@ -43,10 +44,10 @@ from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config from apache_beam.testing.analyzers.perf_analysis_utils import validate_config -from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher -def run_change_point_analysis(params, test_name, big_query_metrics_fetcher): +def run_change_point_analysis( + params, test_name, big_query_metrics_fetcher: MetricsFetcher): """ Args: params: Dict containing parameters to run change point analysis. @@ -74,9 +75,11 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher): if 'num_runs_in_change_point_window' in params: num_runs_in_change_point_window = params['num_runs_in_change_point_window'] - metric_values, timestamps = fetch_metric_data( - params=params, - big_query_metrics_fetcher=big_query_metrics_fetcher + metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data( + project=params['project'], + metrics_dataset=params['metrics_dataset'], + metrics_table=params['metrics_table'], + metric_name=params['metric_name'] ) change_point_index = find_latest_change_point_index( @@ -149,7 +152,9 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher): return is_alert -def run(config_file_path: Optional[str] = None) -> None: +def run( + config_file_path: Optional[str] = None, + big_query_metrics_fetcher: Optional[MetricsFetcher] = None) -> None: """ run is the entry point to run change point analysis on test metric data, which is read from config file, and if there is a performance @@ -169,7 +174,8 @@ def run(config_file_path: Optional[str] = None) -> None: tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path) - big_query_metrics_fetcher = BigQueryMetricsFetcher() + if not big_query_metrics_fetcher: + big_query_metrics_fetcher = BigQueryMetricsFetcher() for test_name, params in tests_config.items(): run_change_point_analysis( diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py index 094cd9c47ec0..3c934c295e72 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -32,6 +32,7 @@ from apache_beam.io.filesystems import FileSystems from apache_beam.testing.analyzers import constants from apache_beam.testing.analyzers import github_issues_utils + from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive @@ -45,14 +46,14 @@ # mock methods. -def get_fake_data_with_no_change_point(**kwargs): +def get_fake_data_with_no_change_point(*args, **kwargs): num_samples = 20 metric_values = [1] * num_samples timestamps = list(range(num_samples)) return metric_values, timestamps -def get_fake_data_with_change_point(**kwargs): +def get_fake_data_with_change_point(*args, **kwargs): # change point will be at index 13. num_samples = 20 metric_values = [0] * 12 + [3] + [4] * 7 @@ -151,18 +152,20 @@ def test_duplicate_change_points_are_not_valid_alerts(self): min_runs_between_change_points=min_runs_between_change_points) self.assertFalse(is_alert) - @mock.patch( - 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + @mock.patch.object( + BigQueryMetricsFetcher, + 'fetch_metric_data', get_fake_data_with_no_change_point) def test_no_alerts_when_no_change_points(self): is_alert = analysis.run_change_point_analysis( params=self.params, test_name=self.test_id, - big_query_metrics_fetcher=None) + big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) - @mock.patch( - 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + @mock.patch.object( + BigQueryMetricsFetcher, + 'fetch_metric_data', get_fake_data_with_change_point) @mock.patch( 'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data', @@ -179,11 +182,12 @@ def test_alert_on_data_with_change_point(self, *args): is_alert = analysis.run_change_point_analysis( params=self.params, test_name=self.test_id, - big_query_metrics_fetcher=None) + big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertTrue(is_alert) - @mock.patch( - 'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data', + @mock.patch.object( + BigQueryMetricsFetcher, + 'fetch_metric_data', get_fake_data_with_change_point) @mock.patch( 'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data', @@ -199,7 +203,7 @@ def test_alert_on_data_with_reported_change_point(self, *args): is_alert = analysis.run_change_point_analysis( params=self.params, test_name=self.test_id, - big_query_metrics_fetcher=None) + big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) def test_change_point_has_anomaly_marker_in_gh_description(self): diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index 0a559fc4beeb..d474ca5c5095 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -14,11 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import abc import logging from dataclasses import asdict from dataclasses import dataclass from statistics import median -from typing import Any from typing import Dict from typing import List from typing import Optional @@ -28,11 +28,11 @@ import pandas as pd import yaml from google.api_core import exceptions +from google.cloud import bigquery from apache_beam.testing.analyzers import constants from apache_beam.testing.analyzers import github_issues_utils from apache_beam.testing.load_tests import load_test_metrics_utils -from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive @@ -59,9 +59,7 @@ def is_change_point_in_valid_window( return num_runs_in_change_point_window > latest_change_point_run -def get_existing_issues_data( - table_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher -) -> Optional[pd.DataFrame]: +def get_existing_issues_data(table_name: str, ) -> Optional[pd.DataFrame]: """ Finds the most recent GitHub issue created for the test_name. If no table found with name=test_name, return (None, None) @@ -73,12 +71,14 @@ def get_existing_issues_data( LIMIT 10 """ try: - df = big_query_metrics_fetcher.fetch(query=query) + client = bigquery.Client() + query_job = client.query(query=query) + existing_issue_data = query_job.result().to_dataframe() except exceptions.NotFound: # If no table found, that means this is first performance regression # on the current test+metric. return None - return df + return existing_issue_data def is_perf_alert( @@ -123,33 +123,6 @@ def validate_config(keys): return constants._PERF_TEST_KEYS.issubset(keys) -def fetch_metric_data( - params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher -) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: - """ - Args: - params: Dict containing keys required to fetch data from a data source. - big_query_metrics_fetcher: A BigQuery metrics fetcher for fetch metrics. - Returns: - Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list - of metric_values and list of timestamps. Both are sorted in ascending - order wrt timestamps. - """ - query = f""" - SELECT * - FROM {params['project']}.{params['metrics_dataset']}.{params['metrics_table']} - WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{params['metric_name']}') - ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC - LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS} - """ - metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query) - metric_data.sort_values( - by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) - return ( - metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), - metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) - - def find_change_points(metric_values: List[Union[float, int]]): return e_divisive(metric_values) @@ -253,3 +226,48 @@ def filter_change_points_by_median_threshold( if relative_change > threshold: valid_change_points.append(idx) return valid_change_points + + +class MetricsFetcher: + @abc.abstractmethod + def fetch_metric_data( + self, project, metrics_dataset, metrics_table, + metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + """ + Define schema and fetch the timestamp values and metric values + from BigQuery. + """ + + +class BigQueryMetricsFetcher: + def __init__(self, query: Optional[str] = None): + self.client = bigquery.Client() + self.query = query + + def fetch_metric_data( + self, *, project, metrics_dataset, metrics_table, + metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + """ + Args: + params: Dict containing keys required to fetch data from a data source. + Returns: + Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list + of metric_values and list of timestamps. Both are sorted in ascending + order wrt timestamps. + """ + if not self.query: + self.query = f""" + SELECT * + FROM {project}.{metrics_dataset}.{metrics_table} + WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}') + ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC + LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS} + """ + # metric_data: pd.DataFrame = self.fetch(query=query) + query_job = self.client.query(query=self.query) + metric_data = query_job.result().to_dataframe() + metric_data.sort_values( + by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) + return ( + metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), + metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py index 92a5f68351fe..01db2c114efb 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py @@ -38,7 +38,6 @@ from typing import Optional from typing import Union -import pandas as pd import requests from requests.auth import HTTPBasicAuth @@ -650,13 +649,3 @@ def __init__(self): def process(self, element): yield self.timestamp_val_fn( element, self.timestamp_fn(micros=int(self.time_fn() * 1000000))) - - -class BigQueryMetricsFetcher: - def __init__(self): - self.client = bigquery.Client() - - def fetch(self, query) -> pd.DataFrame: - query_job = self.client.query(query=query) - result = query_job.result() - return result.to_dataframe()