Skip to content

Commit

Permalink
style fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Sep 12, 2022
1 parent 50c50e9 commit 1e39863
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ def finish_bundle(self):
TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES,
GlobalWindows.windowed_value(None))


class TriggerLoadJobs(beam.DoFn):
"""Triggers the import jobs to BQ.
Expand Down
14 changes: 8 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,10 @@ def test_wait_for_load_job_completion(self, sleep_mock):
(partition_2[0], job_2.jobReference)]
with TestPipeline('DirectRunner') as p:
partitions = p | beam.Create([partition_1, partition_2])
outputs = (partitions
| beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client),
test_job_prefix))
outputs = (
partitions
| beam.ParDo(
bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix))

assert_that(outputs, equal_to(expected_dest_jobref_list))

Expand Down Expand Up @@ -616,9 +617,10 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock):
with self.assertRaises(Exception):
with TestPipeline('DirectRunner') as p:
partitions = p | beam.Create([partition_1, partition_2])
_ = (partitions
| beam.ParDo(bqfl.TriggerLoadJobs(test_client=bq_client),
test_job_prefix))
_ = (
partitions
| beam.ParDo(
bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix))

sleep_mock.assert_called_once()

Expand Down

0 comments on commit 1e39863

Please sign in to comment.