Support for custom MetricsFetcher in Perf tooling. #28671
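In short, this change decouples the perf analyzers from BigQuery as the only metrics source: `run()` now accepts any `MetricsFetcher` implementation (defaulting to `BigQueryMetricsFetcher`), entries in the test config are keyed by a `test_id` instead of the test name, and an optional `test_name` parameter lets several tests share a single BigQuery table. The relevant hunks of the diff follow.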
```diff
@@ -23,7 +23,6 @@
 import argparse
 import logging
 import os
-import uuid
 from datetime import datetime
 from datetime import timezone
 from typing import Any
```
```diff
@@ -33,20 +32,21 @@
 import pandas as pd

 from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
 from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData
+from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
 from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert
-from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
 from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
 from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
 from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
 from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
 from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
 from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
 from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
-from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher


-def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
+def run_change_point_analysis(
+    params, test_id, big_query_metrics_fetcher: MetricsFetcher):
   """
   Args:
     params: Dict containing parameters to run change point analysis.
```
```diff
@@ -56,14 +56,21 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
   Returns:
     bool indicating if a change point is observed and alerted on GitHub.
   """
-  logging.info("Running change point analysis for test %s" % test_name)
+  logging.info("Running change point analysis for test ID %s" % test_id)
   if not validate_config(params.keys()):
     raise ValueError(
         f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
-        f"are specified for the {test_name}")
+        f"are specified for the {test_id}")

   metric_name = params['metric_name']

+  # test_name is used to query a single test from multiple tests stored
+  # in a single BQ table. Right now, the default assumption is that every
+  # test has an individual BQ table, but this might not be the case for
+  # other tests (such as IO tests, where a single BQ table stores all
+  # the data).
+  test_name = params.get('test_name', None)
+
   min_runs_between_change_points = (
       constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
   if 'min_runs_between_change_points' in params:
```
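Since hunk context alone can be hard to read, here is a hypothetical sketch of one `tests_config` entry after this change. The key names (`project`, `metrics_dataset`, `metrics_table`, `metric_name`, `test_name`, `labels`, `test_description`) all come from parameters the patched code reads; the test ID and every value below are invented for illustration:

```python
# One entry of tests_config, keyed by test_id (all values are invented).
tests_config = {
    'pytorch_image_classification_benchmark': {
        'project': 'my-gcp-project',
        'metrics_dataset': 'benchmark_metrics',
        'metrics_table': 'io_benchmarks',  # a table shared by several tests
        'metric_name': 'mean_load_model_latency_milli_secs',
        # Optional: picks this test's rows out of the shared table; tests
        # with a dedicated table can omit it (it defaults to None).
        'test_name': 'pytorch_image_classification',
        'labels': ['perf-alert'],
        'test_description': 'Image classification benchmark on PyTorch.',
    },
}
```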
```diff
@@ -74,15 +81,18 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
   if 'num_runs_in_change_point_window' in params:
     num_runs_in_change_point_window = params['num_runs_in_change_point_window']

-  metric_values, timestamps = fetch_metric_data(
-      params=params,
-      big_query_metrics_fetcher=big_query_metrics_fetcher
+  metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data(
+      project=params['project'],
+      metrics_dataset=params['metrics_dataset'],
+      metrics_table=params['metrics_table'],
+      metric_name=params['metric_name'],
+      test_name=test_name
   )

   change_point_index = find_latest_change_point_index(
       metric_values=metric_values)
   if not change_point_index:
-    logging.info("Change point is not detected for the test %s" % test_name)
+    logging.info("Change point is not detected for the test ID %s" % test_id)
     return False
   # since timestamps are ordered in ascending order and
   # num_runs_in_change_point_window refers to the latest runs,
```
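The call above spells out the contract a custom fetcher must satisfy. Below is a minimal sketch of one, assuming `MetricsFetcher` is an abstract base class whose `fetch_metric_data` takes the keyword arguments shown in this hunk and returns a `(metric_values, timestamps)` pair ordered by run time (the surrounding code relies on timestamps being ascending). The class name, the DataFrame schema, and the keyword-only signature are illustrative guesses, not part of this PR:

```python
from typing import List, Optional, Tuple

import pandas as pd

from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher


class InMemoryMetricsFetcher(MetricsFetcher):
  """Hypothetical fetcher serving canned rows, e.g. for unit tests."""
  def __init__(self, rows: pd.DataFrame):
    # rows is assumed to have 'test_name', 'metric_name', 'value' and
    # 'timestamp' columns; the real BQ schema may differ.
    self._rows = rows

  def fetch_metric_data(
      self,
      *,
      project: str,
      metrics_dataset: str,
      metrics_table: str,
      metric_name: str,
      test_name: Optional[str] = None,
  ) -> Tuple[List[float], List[pd.Timestamp]]:
    # project/metrics_dataset/metrics_table are unused here; a real
    # fetcher would use them to locate the data.
    data = self._rows[self._rows['metric_name'] == metric_name]
    if test_name is not None:
      # Mirrors the shared-table case: one table holding many tests.
      data = data[data['test_name'] == test_name]
    data = data.sort_values('timestamp')  # analysis expects ascending order
    return list(data['value']), list(data['timestamp'])
```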
```diff
@@ -92,11 +102,11 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
   if not is_change_point_in_valid_window(num_runs_in_change_point_window,
                                          latest_change_point_run):
     logging.info(
-        'Performance regression/improvement found for the test: %s. '
+        'Performance regression/improvement found for the test ID: %s. '
         'on metric %s. Since the change point run %s '
         'lies outside the num_runs_in_change_point_window distance: %s, '
         'alert is not raised.' % (
-            test_name,
+            test_id,
             metric_name,
             latest_change_point_run + 1,
             num_runs_in_change_point_window))
```
```diff
@@ -106,8 +116,7 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
   last_reported_issue_number = None
   issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}'
   existing_issue_data = get_existing_issues_data(
-      table_name=issue_metadata_table_name,
-      big_query_metrics_fetcher=big_query_metrics_fetcher)
+      table_name=issue_metadata_table_name)

   if existing_issue_data is not None:
     existing_issue_timestamps = existing_issue_data[
```
```diff
@@ -124,20 +133,21 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
           min_runs_between_change_points=min_runs_between_change_points)
   if is_alert:
     issue_number, issue_url = create_performance_alert(
-        metric_name, test_name, timestamps,
+        metric_name, test_id, timestamps,
         metric_values, change_point_index,
         params.get('labels', None),
         last_reported_issue_number,
         test_description = params.get('test_description', None),
+        test_name = test_name
     )

     issue_metadata = GitHubIssueMetaData(
         issue_timestamp=pd.Timestamp(
             datetime.now().replace(tzinfo=timezone.utc)),
         # BQ doesn't allow '.' in table name
-        test_name=test_name.replace('.', '_'),
+        test_id=test_id.replace('.', '_'),
+        test_name=test_name,
         metric_name=metric_name,
-        test_id=uuid.uuid4().hex,
         change_point=metric_values[change_point_index],
         issue_number=issue_number,
         issue_url=issue_url,
```
```diff
@@ -149,7 +159,10 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
   return is_alert


-def run(config_file_path: Optional[str] = None) -> None:
+def run(
+    big_query_metrics_fetcher: MetricsFetcher = BigQueryMetricsFetcher(),
+    config_file_path: Optional[str] = None,
+) -> None:
   """
   run is the entry point to run change point analysis on test metric
   data, which is read from config file, and if there is a performance
```
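With the new `run()` signature a caller can inject the fetcher at the entry point. A hypothetical usage sketch (the module path and config path are assumptions based on the imports in this diff; `InMemoryMetricsFetcher` refers to the sketch above):

```python
from apache_beam.testing.analyzers import perf_analysis

# Default behaviour: metrics come from BigQuery.
perf_analysis.run(config_file_path='tests_config.yaml')

# Injecting a custom fetcher, e.g. for tests or a different backend.
# canned_rows: a pd.DataFrame prepared elsewhere.
perf_analysis.run(
    big_query_metrics_fetcher=InMemoryMetricsFetcher(rows=canned_rows),
    config_file_path='tests_config.yaml')
```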
```diff
@@ -169,12 +182,11 @@ def run(config_file_path: Optional[str] = None) -> None:

   tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)

-  big_query_metrics_fetcher = BigQueryMetricsFetcher()
-
-  for test_name, params in tests_config.items():
+  for test_id, params in tests_config.items():
     run_change_point_analysis(
         params=params,
-        test_name=test_name,
+        test_id=test_id,
         big_query_metrics_fetcher=big_query_metrics_fetcher)
```

Review discussion on this hunk (the first two comments are truncated in the source):

> We had spoken about supporting multiple …

> This can be done by adding … I'll defer to your opinion.

> Yes, passing metrics as a list will be implemented in a different PR. I will tag you there once it is ready.
Review comment:

> nit: "only" is repeated, so we can omit it from the end of the sentence.