Skip to content

Commit

Permalink
Support for custom BigQueryMetricsFetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandInguva committed Sep 26, 2023
1 parent d52b077 commit 5d292a9
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 64 deletions.
22 changes: 14 additions & 8 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@
import pandas as pd

from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert
from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher


def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
def run_change_point_analysis(
params, test_name, big_query_metrics_fetcher: MetricsFetcher):
"""
Args:
params: Dict containing parameters to run change point analysis.
Expand Down Expand Up @@ -74,9 +75,11 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
if 'num_runs_in_change_point_window' in params:
num_runs_in_change_point_window = params['num_runs_in_change_point_window']

metric_values, timestamps = fetch_metric_data(
params=params,
big_query_metrics_fetcher=big_query_metrics_fetcher
metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data(
project=params['project'],
metrics_dataset=params['metrics_dataset'],
metrics_table=params['metrics_table'],
metric_name=params['metric_name']
)

change_point_index = find_latest_change_point_index(
Expand Down Expand Up @@ -149,7 +152,9 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
return is_alert


def run(config_file_path: Optional[str] = None) -> None:
def run(
config_file_path: Optional[str] = None,
big_query_metrics_fetcher: Optional[MetricsFetcher] = None) -> None:
"""
run is the entry point to run change point analysis on test metric
data, which is read from config file, and if there is a performance
Expand All @@ -169,7 +174,8 @@ def run(config_file_path: Optional[str] = None) -> None:

tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)

big_query_metrics_fetcher = BigQueryMetricsFetcher()
if not big_query_metrics_fetcher:
big_query_metrics_fetcher = BigQueryMetricsFetcher()

for test_name, params in tests_config.items():
run_change_point_analysis(
Expand Down
26 changes: 15 additions & 11 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers import github_issues_utils
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
Expand All @@ -45,14 +46,14 @@


# mock methods.
def get_fake_data_with_no_change_point(**kwargs):
def get_fake_data_with_no_change_point(*args, **kwargs):
  # Mock fetcher output: a perfectly flat metric series, so the change
  # point analysis must not flag anything. Accepts and ignores whatever
  # arguments the patched fetch_metric_data is invoked with.
  sample_count = 20
  values = [1 for _ in range(sample_count)]
  times = [i for i in range(sample_count)]
  return values, times


def get_fake_data_with_change_point(**kwargs):
def get_fake_data_with_change_point(*args, **kwargs):
# change point will be at index 13.
num_samples = 20
metric_values = [0] * 12 + [3] + [4] * 7
Expand Down Expand Up @@ -151,18 +152,20 @@ def test_duplicate_change_points_are_not_valid_alerts(self):
min_runs_between_change_points=min_runs_between_change_points)
self.assertFalse(is_alert)

@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
@mock.patch.object(
BigQueryMetricsFetcher,
'fetch_metric_data',
get_fake_data_with_no_change_point)
def test_no_alerts_when_no_change_points(self):
is_alert = analysis.run_change_point_analysis(
params=self.params,
test_name=self.test_id,
big_query_metrics_fetcher=None)
big_query_metrics_fetcher=BigQueryMetricsFetcher())
self.assertFalse(is_alert)

@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
@mock.patch.object(
BigQueryMetricsFetcher,
'fetch_metric_data',
get_fake_data_with_change_point)
@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data',
Expand All @@ -179,11 +182,12 @@ def test_alert_on_data_with_change_point(self, *args):
is_alert = analysis.run_change_point_analysis(
params=self.params,
test_name=self.test_id,
big_query_metrics_fetcher=None)
big_query_metrics_fetcher=BigQueryMetricsFetcher())
self.assertTrue(is_alert)

@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.fetch_metric_data',
@mock.patch.object(
BigQueryMetricsFetcher,
'fetch_metric_data',
get_fake_data_with_change_point)
@mock.patch(
'apache_beam.testing.analyzers.perf_analysis.get_existing_issues_data',
Expand All @@ -199,7 +203,7 @@ def test_alert_on_data_with_reported_change_point(self, *args):
is_alert = analysis.run_change_point_analysis(
params=self.params,
test_name=self.test_id,
big_query_metrics_fetcher=None)
big_query_metrics_fetcher=BigQueryMetricsFetcher())
self.assertFalse(is_alert)

def test_change_point_has_anomaly_marker_in_gh_description(self):
Expand Down
86 changes: 52 additions & 34 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import logging
from dataclasses import asdict
from dataclasses import dataclass
from statistics import median
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
Expand All @@ -28,11 +28,11 @@
import pandas as pd
import yaml
from google.api_core import exceptions
from google.cloud import bigquery

from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers import github_issues_utils
from apache_beam.testing.load_tests import load_test_metrics_utils
from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher
from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher
from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive

Expand All @@ -59,9 +59,7 @@ def is_change_point_in_valid_window(
return num_runs_in_change_point_window > latest_change_point_run


def get_existing_issues_data(
table_name: str, big_query_metrics_fetcher: BigQueryMetricsFetcher
) -> Optional[pd.DataFrame]:
def get_existing_issues_data(table_name: str, ) -> Optional[pd.DataFrame]:
"""
Finds the most recent GitHub issue created for the test_name.
If no table found with name=test_name, return (None, None)
Expand All @@ -73,12 +71,14 @@ def get_existing_issues_data(
LIMIT 10
"""
try:
df = big_query_metrics_fetcher.fetch(query=query)
client = bigquery.Client()
query_job = client.query(query=query)
existing_issue_data = query_job.result().to_dataframe()
except exceptions.NotFound:
# If no table found, that means this is first performance regression
# on the current test+metric.
return None
return df
return existing_issue_data


def is_perf_alert(
Expand Down Expand Up @@ -123,33 +123,6 @@ def validate_config(keys):
return constants._PERF_TEST_KEYS.issubset(keys)


def fetch_metric_data(
    params: Dict[str, Any], big_query_metrics_fetcher: BigQueryMetricsFetcher
) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
  """
  Fetch metric values and their submit timestamps for one test from BigQuery.

  Args:
    params: Dict containing keys required to fetch data from a data source.
      Reads 'project', 'metrics_dataset', 'metrics_table' and 'metric_name'.
    big_query_metrics_fetcher: A BigQuery metrics fetcher used to fetch
      metrics by running the generated query.
  Returns:
    Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list
    of metric_values and list of timestamps. Both are sorted in ascending
    order wrt timestamps.
  """
  # Newest rows first, capped at the number of data points the change point
  # analysis consumes; the result is re-sorted ascending below.
  query = f"""
  SELECT *
  FROM {params['project']}.{params['metrics_dataset']}.{params['metrics_table']}
  WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{params['metric_name']}')
  ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
  LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
  """
  metric_data: pd.DataFrame = big_query_metrics_fetcher.fetch(query=query)
  # Sort ascending by timestamp so list indexes correspond to run order,
  # which the change point index math downstream relies on.
  metric_data.sort_values(
      by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
  return (
      metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
      metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())


def find_change_points(metric_values: List[Union[float, int]]):
return e_divisive(metric_values)

Expand Down Expand Up @@ -253,3 +226,48 @@ def filter_change_points_by_median_threshold(
if relative_change > threshold:
valid_change_points.append(idx)
return valid_change_points


class MetricsFetcher(metaclass=abc.ABCMeta):
  """Interface for fetching metric data consumed by change point analysis.

  Implementations must return metric values and their submit timestamps,
  both sorted in ascending order by timestamp.
  """
  @abc.abstractmethod
  def fetch_metric_data(
      self, project, metrics_dataset, metrics_table,
      metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
    """
    Define schema and fetch the timestamp values and metric values
    from the backing data source.

    Args:
      project: Project that hosts the metrics data.
      metrics_dataset: Dataset containing the metrics table.
      metrics_table: Table holding one row per test run.
      metric_name: Name of the metric to select rows for.
    Returns:
      Tuple of (metric_values, timestamps), ascending by timestamp.
    """
    # Without ABCMeta the @abc.abstractmethod decorator was a no-op and
    # this "interface" could be instantiated with a silent None-returning
    # method; the metaclass makes subclasses implement it.
    raise NotImplementedError


class BigQueryMetricsFetcher:
  """MetricsFetcher implementation backed by BigQuery.

  Args:
    query: Optional custom SQL. When provided, it is used verbatim for every
      fetch; when None, a default query is generated from the arguments of
      each ``fetch_metric_data`` call.
  """
  def __init__(self, query: Optional[str] = None):
    self.client = bigquery.Client()
    self.query = query

  def fetch_metric_data(
      self, *, project, metrics_dataset, metrics_table,
      metric_name) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]:
    """
    Args:
      project: Project that hosts the metrics table.
      metrics_dataset: Dataset containing the metrics table.
      metrics_table: Table holding one row per test run.
      metric_name: Metric to filter rows on.
    Returns:
      Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list
      of metric_values and list of timestamps. Both are sorted in ascending
      order wrt timestamps.
    """
    # Build the default query locally instead of caching it on self.query:
    # the previous `if not self.query: self.query = ...` stored the first
    # call's query on the instance, so a fetcher reused across multiple
    # tests (as run() does) silently fetched the first test's data for
    # every subsequent test. A user-supplied query still takes precedence.
    query = self.query or f"""
      SELECT *
      FROM {project}.{metrics_dataset}.{metrics_table}
      WHERE CONTAINS_SUBSTR(({load_test_metrics_utils.METRICS_TYPE_LABEL}), '{metric_name}')
      ORDER BY {load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL} DESC
      LIMIT {constants._NUM_DATA_POINTS_TO_RUN_CHANGE_POINT_ANALYSIS}
      """
    query_job = self.client.query(query=query)
    metric_data = query_job.result().to_dataframe()
    # Ascending timestamp order: change point indexes downstream assume
    # list position == run order.
    metric_data.sort_values(
        by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True)
    return (
        metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(),
        metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist())
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from typing import Optional
from typing import Union

import pandas as pd
import requests
from requests.auth import HTTPBasicAuth

Expand Down Expand Up @@ -650,13 +649,3 @@ def __init__(self):
def process(self, element):
yield self.timestamp_val_fn(
element, self.timestamp_fn(micros=int(self.time_fn() * 1000000)))


class BigQueryMetricsFetcher:
  """Thin wrapper around a BigQuery client that executes a query and
  materializes the full result set as a pandas DataFrame."""

  def __init__(self):
    # One client per fetcher instance, reused for every query it runs.
    self.client = bigquery.Client()

  def fetch(self, query) -> pd.DataFrame:
    """Run *query* and return its rows as a DataFrame."""
    return self.client.query(query=query).result().to_dataframe()

0 comments on commit 5d292a9

Please sign in to comment.