diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 7e9c1e634748..282e9f34be7f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -43,6 +43,7 @@
 from apache_beam.internal import pickler
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.gcp import bigquery as beam_bq
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery import ReadFromBigQuery
@@ -83,11 +84,13 @@
 try:
   from apitools.base.py.exceptions import HttpError
+  from apitools.base.py.exceptions import HttpForbiddenError
   from google.cloud import bigquery as gcp_bigquery
   from google.api_core import exceptions
 except ImportError:
   gcp_bigquery = None
   HttpError = None
+  HttpForbiddenError = None
   exceptions = None
 # pylint: enable=wrong-import-order, wrong-import-position
@@ -288,7 +291,9 @@ def test_repeatable_field_is_properly_converted(self):
     self.assertEqual(expected_row, actual)
 
 
-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+@unittest.skipIf(
+    HttpError is None or HttpForbiddenError is None,
+    'GCP dependencies are not installed')
 class TestReadFromBigQuery(unittest.TestCase):
   @classmethod
   def setUpClass(cls):
@@ -419,6 +424,200 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
     mock_insert.assert_called()
     self.assertIn(error_message, exc.exception.args[0])
 
+  @parameterized.expand([
+      # first attempt returns a Http 500 blank error and retries
+      # second attempt returns a Http 408 blank error and retries,
+      # third attempt passes
+      param(
+          responses=[
+              HttpForbiddenError(
+                  response={'status': 500}, content="something", url="")
+              if HttpForbiddenError else None,
+              HttpForbiddenError(
+                  response={'status': 408}, content="blank", url="")
+              if HttpForbiddenError else None
+          ],
+          expected_retries=2),
+      # first attempt returns a 403 rateLimitExceeded error
+      # second attempt returns a 429 blank error
+      # third attempt returns a Http 403 rateLimitExceeded error
+      # fourth attempt passes
+      param(
+          responses=[
+              exceptions.Forbidden(
+                  "some message",
+                  errors=({
+                      "message": "transient", "reason": "rateLimitExceeded"
+                  }, )) if exceptions else None,
+              exceptions.ResourceExhausted("some message")
+              if exceptions else None,
+              HttpForbiddenError(
+                  response={'status': 403},
+                  content={
+                      "error": {
+                          "errors": [{
+                              "message": "transient",
+                              "reason": "rateLimitExceeded"
+                          }]
+                      }
+                  },
+                  url="") if HttpForbiddenError else None,
+          ],
+          expected_retries=3),
+  ])
+  def test_get_table_transient_exception(self, responses, expected_retries):
+    class DummyTable:
+      class DummySchema:
+        fields = []
+
+      numBytes = 5
+      schema = DummySchema()
+
+    with mock.patch('time.sleep'), \
+      mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
+                        'Get') as mock_get_table, \
+      mock.patch.object(BigQueryWrapper,
+                        'wait_for_bq_job'), \
+      mock.patch.object(BigQueryWrapper,
+                        'perform_extract_job'), \
+      mock.patch.object(FileSystems,
+                        'match'), \
+      mock.patch.object(FileSystems,
+                        'delete'), \
+      beam.Pipeline() as p:
+      call_counter = 0
+
+      def store_callback(unused_request):
+        nonlocal call_counter
+        if call_counter < len(responses):
+          exception = responses[call_counter]
+          call_counter += 1
+          raise exception
+        else:
+          call_counter += 1
+          return DummyTable()
+
+      mock_get_table.side_effect = store_callback
+      _ = p | beam.io.ReadFromBigQuery(
+          table="project.dataset.table", gcs_location="gs://some_bucket")
+
+    # ReadFromBigQuery export mode calls get_table() twice. Once to get
+    # metadata (numBytes), and once to retrieve the table's schema
+    # Any additional calls are retries
+    self.assertEqual(expected_retries, mock_get_table.call_count - 2)
+
+  @parameterized.expand([
+      # first attempt returns a Http 429 with transient reason and retries
+      # second attempt returns a Http 403 with non-transient reason and fails
+      param(
+          responses=[
+              HttpForbiddenError(
+                  response={'status': 429},
+                  content={
+                      "error": {
+                          "errors": [{
+                              "message": "transient",
+                              "reason": "rateLimitExceeded"
+                          }]
+                      }
+                  },
+                  url="") if HttpForbiddenError else None,
+              HttpForbiddenError(
+                  response={'status': 403},
+                  content={
+                      "error": {
+                          "errors": [{
+                              "message": "transient", "reason": "accessDenied"
+                          }]
+                      }
+                  },
+                  url="") if HttpForbiddenError else None
+          ],
+          expected_retries=1),
+      # first attempt returns a transient 403 error and retries
+      # second attempt returns a 403 error with bad contents and fails
+      param(
+          responses=[
+              HttpForbiddenError(
+                  response={'status': 403},
+                  content={
+                      "error": {
+                          "errors": [{
+                              "message": "transient",
+                              "reason": "rateLimitExceeded"
+                          }]
+                      }
+                  },
+                  url="") if HttpForbiddenError else None,
+              HttpError(
+                  response={'status': 403}, content="bad contents", url="")
+              if HttpError else None
+          ],
+          expected_retries=1),
+      # first attempt returns a transient 403 error and retries
+      # second attempt returns a 429 error and retries
+      # third attempt returns a 403 with non-transient reason and fails
+      param(
+          responses=[
+              exceptions.Forbidden(
+                  "some error",
+                  errors=({
+                      "message": "transient", "reason": "rateLimitExceeded"
+                  }, )) if exceptions else None,
+              exceptions.ResourceExhausted("some transient error")
+              if exceptions else None,
+              exceptions.Forbidden(
+                  "some error",
+                  errors=({
+                      "message": "transient", "reason": "accessDenied"
+                  }, )) if exceptions else None,
+          ],
+          expected_retries=2),
+  ])
+  def test_get_table_non_transient_exception(self, responses, expected_retries):
+    class DummyTable:
+      class DummySchema:
+        fields = []
+
+      numBytes = 5
+      schema = DummySchema()
+
+    with mock.patch('time.sleep'), \
+      mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
+                        'Get') as mock_get_table, \
+      mock.patch.object(BigQueryWrapper,
+                        'wait_for_bq_job'), \
+      mock.patch.object(BigQueryWrapper,
+                        'perform_extract_job'), \
+      mock.patch.object(FileSystems,
+                        'match'), \
+      mock.patch.object(FileSystems,
+                        'delete'), \
+      self.assertRaises(Exception), \
+      beam.Pipeline() as p:
+      call_counter = 0
+
+      def store_callback(unused_request):
+        nonlocal call_counter
+        if call_counter < len(responses):
+          exception = responses[call_counter]
+          call_counter += 1
+          raise exception
+        else:
+          call_counter += 1
+          return DummyTable()
+
+      mock_get_table.side_effect = store_callback
+      _ = p | beam.io.ReadFromBigQuery(
+          table="project.dataset.table", gcs_location="gs://some_bucket")
+
+    # ReadFromBigQuery export mode calls get_table() twice. Once to get
+    # metadata (numBytes), and once to retrieve the table's schema
+    # However, the second call is never reached because this test will always
+    # fail before it does so
+    # After the first call, any additional calls are retries
+    self.assertEqual(expected_retries, mock_get_table.call_count - 1)
+
   @parameterized.expand([
       param(
           exception_type=exceptions.BadRequest if exceptions else None,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2f9420795288..387a99312561 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -747,7 +747,7 @@ def _insert_all_rows(
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+      retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
   def get_table(self, project_id, dataset_id, table_id):
     """Lookup a table's metadata object.
 
diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py
index 6eed2900b9a4..485fc9d627e9 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -28,6 +28,7 @@
 # pytype: skip-file
 
 import functools
+import json
 import logging
 import random
 import sys
@@ -57,6 +58,7 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 _LOGGER = logging.getLogger(__name__)
+_RETRYABLE_REASONS = ["rateLimitExceeded", "internalError", "backendError"]
 
 
 class PermanentException(Exception):
@@ -166,17 +168,38 @@ def retry_on_server_errors_and_timeout_filter(exception):
 
 
 def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
-  """Retry on server, timeout and 403 errors.
+  """Retry on server, timeout, 429, and some 403 errors.
 
-  403 errors can be accessDenied, billingNotEnabled, and also quotaExceeded,
-  rateLimitExceeded."""
+  403 errors from BigQuery include both non-transient (accessDenied,
+  billingNotEnabled) and transient errors (rateLimitExceeded).
+  Only retry transient errors."""
   if HttpError is not None and isinstance(exception, HttpError):
-    if exception.status_code == 403:
+    if exception.status_code == 429:
       return True
+    if exception.status_code == 403:
+      try:
+        # attempt to extract the reason and check if it's retryable
+        content = exception.content
+        if not isinstance(content, dict):
+          content = json.loads(exception.content)
+        return content["error"]["errors"][0]["reason"] in _RETRYABLE_REASONS
+      except (KeyError, IndexError, TypeError) as e:
+        _LOGGER.warning(
+            "Could not determine if HttpError is transient. "
+            "Will not retry: %s",
+            e)
+        return False
   if GoogleAPICallError is not None and isinstance(exception,
                                                    GoogleAPICallError):
-    if exception.code == 403:
+    if exception.code == 429:
       return True
+    if exception.code == 403:
+      if not hasattr(exception, "errors") or len(exception.errors) == 0:
+        # default to not retrying
+        return False
+
+      reason = exception.errors[0]["reason"]
+      return reason in _RETRYABLE_REASONS
   if S3ClientError is not None and isinstance(exception, S3ClientError):
     if exception.code == 403:
       return True
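
Not part of the patch: a minimal sketch of how the updated filter is expected to classify BigQuery errors once the change above is applied, assuming google-api-core is installed. Reasons listed in _RETRYABLE_REASONS (rateLimitExceeded, internalError, backendError) are retried; other 403 reasons are not, and HTTP 429 is always retried. The messages and reasons below are illustrative values, not taken from this PR.

    from google.api_core import exceptions
    from apache_beam.utils import retry

    # 403 with a transient reason: retried.
    transient = exceptions.Forbidden(
        "quota", errors=({"message": "slow down", "reason": "rateLimitExceeded"}, ))
    assert retry.retry_on_server_errors_timeout_or_quota_issues_filter(transient)

    # 403 with a non-transient reason: not retried.
    denied = exceptions.Forbidden(
        "denied", errors=({"message": "no access", "reason": "accessDenied"}, ))
    assert not retry.retry_on_server_errors_timeout_or_quota_issues_filter(denied)

    # 429 (ResourceExhausted): always retried.
    assert retry.retry_on_server_errors_timeout_or_quota_issues_filter(
        exceptions.ResourceExhausted("too many requests"))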