From c6fe0119d452f090e8e68862c20bb05a90391c20 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 9 Jun 2023 10:12:48 -0400 Subject: [PATCH 1/5] fix: fix beam metrics after migrating to async batcher --- sdks/python/apache_beam/io/gcp/bigtableio.py | 44 +++----------------- 1 file changed, 5 insertions(+), 39 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index d1a1f86f1cce..d3e2baae304c 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -55,41 +55,6 @@ FLUSH_COUNT = 1000 MAX_ROW_BYTES = 5242880 # 5MB - class _MutationsBatcher(MutationsBatcher): - def __init__( - self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES): - super().__init__(table, flush_count, max_row_bytes) - self.rows = [] - - def set_flush_callback(self, callback_fn): - self.callback_fn = callback_fn - - def flush(self): - if len(self.rows) != 0: - status_list = self.table.mutate_rows(self.rows) - self.callback_fn(status_list) - - # If even one request fails we retry everything. BigTable mutations are - # idempotent so this should be correct. - # TODO(https://github.com/apache/beam/issues/21396): make this more - # efficient by retrying only re-triable failed requests. - for status in status_list: - if not status: - # BigTable client may return 'None' instead of a valid status in - # some cases due to - # https://github.com/googleapis/python-bigtable/issues/485 - raise Exception( - 'Failed to write a batch of %r records' % len(self.rows)) - elif status.code != 0: - raise Exception( - 'Failed to write a batch of %r records due to %r' % ( - len(self.rows), - ServiceCallMetric.bigtable_error_code_to_grpc_status_string( - status.code))) - - self.total_mutation_count = 0 - self.total_size = 0 - self.rows = [] except ImportError: _LOGGER.warning( @@ -168,8 +133,8 @@ def start_bundle(self): self.beam_options['project_id'], self.beam_options['instance_id'], self.beam_options['table_id']) - self.batcher = _MutationsBatcher(self.table) - self.batcher.set_flush_callback(self.write_mutate_metrics) + self.batcher = MutationsBatcher(self.table, + batch_completed_callback=self.write_mutate_metrics) def process(self, row): self.written.inc() @@ -184,8 +149,9 @@ def process(self, row): self.batcher.mutate(row) def finish_bundle(self): - self.batcher.flush() - self.batcher = None + if self.batcher: + self.batcher.close() + self.batcher = None def display_data(self): return { From 98e67a72978202a67a62e12e44d2f8d589cf399f Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 9 Jun 2023 10:17:23 -0400 Subject: [PATCH 2/5] update version --- sdks/python/setup.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 1b5c886a5748..c83029f2c08c 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -314,9 +314,7 @@ def get_portability_package_data(): 'google-cloud-bigquery>=2.0.0,<4', 'google-cloud-bigquery-storage>=2.6.3,<3', 'google-cloud-core>=2.0.0,<3', - # TODO(https://github.com/apache/beam/issues/26673) - # 2.18.x breaks unit test - 'google-cloud-bigtable>=2.0.0,<2.18.0', + 'google-cloud-bigtable>=2.0.0,<3', 'google-cloud-spanner>=3.0.0,<4', # GCP Packages required by ML functionality 'google-cloud-dlp>=3.0.0,<4', From 6c9ae46a974f0d6b8fffbee92c22532cbda89cae Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 9 Jun 2023 10:35:48 -0400 Subject: [PATCH 3/5] update bigtable version --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index c83029f2c08c..a62e292085f3 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -314,7 +314,7 @@ def get_portability_package_data(): 'google-cloud-bigquery>=2.0.0,<4', 'google-cloud-bigquery-storage>=2.6.3,<3', 'google-cloud-core>=2.0.0,<3', - 'google-cloud-bigtable>=2.0.0,<3', + 'google-cloud-bigtable>=2.19.0,<3', 'google-cloud-spanner>=3.0.0,<4', # GCP Packages required by ML functionality 'google-cloud-dlp>=3.0.0,<4', From 07e00ba36bf7c56ef12a83f5f2814b0323b36192 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 9 Jun 2023 10:46:45 -0400 Subject: [PATCH 4/5] update format --- sdks/python/apache_beam/io/gcp/bigtableio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index d3e2baae304c..5169ef0a5c93 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -133,8 +133,8 @@ def start_bundle(self): self.beam_options['project_id'], self.beam_options['instance_id'], self.beam_options['table_id']) - self.batcher = MutationsBatcher(self.table, - batch_completed_callback=self.write_mutate_metrics) + self.batcher = MutationsBatcher( + self.table, batch_completed_callback=self.write_mutate_metrics) def process(self, row): self.written.inc() From 77a7f95f34e4aec6b81f184260720204983b0e5d Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 9 Jun 2023 10:57:58 -0400 Subject: [PATCH 5/5] fix format --- sdks/python/apache_beam/io/gcp/bigtableio.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py index 5169ef0a5c93..3f045ee26f86 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio.py @@ -55,7 +55,6 @@ FLUSH_COUNT = 1000 MAX_ROW_BYTES = 5242880 # 5MB - except ImportError: _LOGGER.warning( 'ImportError: from google.cloud.bigtable import Client', exc_info=True) @@ -134,7 +133,7 @@ def start_bundle(self): self.beam_options['instance_id'], self.beam_options['table_id']) self.batcher = MutationsBatcher( - self.table, batch_completed_callback=self.write_mutate_metrics) + self.table, batch_completed_callback=self.write_mutate_metrics) def process(self, row): self.written.inc()