Save Job IDs of dataflow load tests (#29693)
* Publish job (#29685)

* Update the schema if the default schema differs from the table schema

* Add job id label to the BQ publisher for dataflow jobs

* Append job id to each metric

* Fix lint
AnandInguva authored Dec 11, 2023
1 parent b91a517 commit f934230
Showing 1 changed file with 28 additions and 0 deletions.
@@ -33,6 +33,8 @@
import logging
import time
import uuid
from typing import Any
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
@@ -45,6 +47,7 @@
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
from apache_beam.runners.runner import PipelineResult
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import Timestamp
@@ -65,6 +68,7 @@
SUBMIT_TIMESTAMP_LABEL = 'timestamp'
METRICS_TYPE_LABEL = 'metric'
VALUE_LABEL = 'value'
JOB_ID_LABEL = 'job_id'

SCHEMA = [{
    'name': ID_LABEL, 'field_type': 'STRING', 'mode': 'REQUIRED'
@@ -80,6 +84,8 @@
    'mode': 'REQUIRED'
}, {
    'name': VALUE_LABEL, 'field_type': 'FLOAT', 'mode': 'REQUIRED'
}, {
    'name': JOB_ID_LABEL, 'field_type': 'STRING', 'mode': 'NULLABLE'
}]

_LOGGER = logging.getLogger(__name__)
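
For illustration only (not part of the diff): a row matching the schema above might look like the sketch below. The literal values are invented, and ID_LABEL and SUBMIT_TIMESTAMP_LABEL are assumed to be label constants defined earlier in the file.

import time
import uuid

# Hypothetical row; JOB_ID_LABEL stays None (the column is NULLABLE)
# when the test does not run on Dataflow.
example_row = {
    ID_LABEL: uuid.uuid4().hex,
    SUBMIT_TIMESTAMP_LABEL: time.time(),
    METRICS_TYPE_LABEL: 'runtime',
    VALUE_LABEL: 123.45,
    JOB_ID_LABEL: None,
}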
@@ -254,13 +260,27 @@ def publish_metrics(
    # Under each key there is a list of objects for each metric type. They
    # must be flattened into a list of dictionaries matching the schema
    # before publishing.

    insert_dicts = self._prepare_all_metrics(metrics, metric_id)

    insert_dicts += self._prepare_extra_metrics(metric_id, extra_metrics)

    # Add the job id to the metrics of Dataflow jobs for easier debugging.
    job_id = None
    if isinstance(result, DataflowPipelineResult):
      job_id = result.job_id()
    self._add_job_id_to_metrics(insert_dicts, job_id)

    if len(insert_dicts) > 0:
      for publisher in self.publishers:
        publisher.publish(insert_dicts)

  def _add_job_id_to_metrics(self, metrics: List[Dict[str, Any]],
                             job_id) -> List[Dict[str, Any]]:
    # Tag every prepared row; job_id is None for non-Dataflow runners.
    for metric in metrics:
      metric[JOB_ID_LABEL] = job_id
    return metrics

  def _prepare_extra_metrics(
      self, metric_id: str, extra_metrics: Optional[dict] = None):
    ts = time.time()
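
A self-contained sketch of the tagging behavior added above; the helper mirrors _add_job_id_to_metrics, and the sample Dataflow job id is made up.

from typing import Any, Dict, List

JOB_ID_LABEL = 'job_id'

def add_job_id_to_metrics(metrics: List[Dict[str, Any]],
                          job_id) -> List[Dict[str, Any]]:
  # Tag every prepared row in place; job_id may be None off Dataflow.
  for metric in metrics:
    metric[JOB_ID_LABEL] = job_id
  return metrics

rows = [{'value': 1.0}, {'value': 2.0}]
add_job_id_to_metrics(rows, '2023-12-11_00_00_00-1234567890')  # invented id
assert all(r[JOB_ID_LABEL] == '2023-12-11_00_00_00-1234567890' for r in rows)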
@@ -479,6 +499,12 @@ def _get_or_create_table(self, bq_schemas, dataset):
      table = bigquery.Table(table_ref, schema=bq_schemas)
      self._bq_table = self._client.create_table(table)

  def _update_schema(self):
    table_schema = self._bq_table.schema
    if self.schema and len(table_schema) != len(self.schema):
      self._bq_table.schema = self._prepare_schema()
      self._bq_table = self._client.update_table(self._bq_table, ["schema"])

  def _get_dataset(self, dataset_name):
    bq_dataset_ref = self._client.dataset(dataset_name)
    try:
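
Note that _update_schema compares only the number of fields, a cheap heuristic that suffices here because the only schema change is one appended column. A hedged sketch of the same additive update done directly against google-cloud-bigquery (the table path is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table('my_project.my_dataset.load_test_metrics')  # placeholder

job_id_field = bigquery.SchemaField('job_id', 'STRING', mode='NULLABLE')
if job_id_field.name not in {field.name for field in table.schema}:
  # update_table(..., ['schema']) accepts additive changes such as a new
  # NULLABLE column; incompatible edits are rejected by the API.
  table.schema = list(table.schema) + [job_id_field]
  table = client.update_table(table, ['schema'])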
@@ -490,6 +516,8 @@ def _get_dataset(self, dataset_name):
    return bq_dataset

  def save(self, results):
    # Update the schema first in case new columns were added.
    self._update_schema()
    return self._client.insert_rows(self._bq_table, results)
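
insert_rows streams the rows and returns a list of per-row errors, empty on success, so callers of save may want to check the result. A minimal sketch, with the logging call as an assumed convention rather than this file's behavior:

import logging

from google.cloud import bigquery

def save_checked(client: bigquery.Client, table: bigquery.Table, rows) -> bool:
  # Streaming inserts return [] on success, else one mapping per failed row.
  errors = client.insert_rows(table, rows)
  if errors:
    logging.error('Failed to insert rows into %s: %s', table.table_id, errors)
  return not errors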

