[Python BQ] Retry get_table for quota errors #28820

Merged · 18 commits · Jan 8, 2024
201 changes: 200 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -43,6 +43,7 @@
from apache_beam.internal import pickler
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery as beam_bq
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
@@ -83,11 +84,13 @@

try:
  from apitools.base.py.exceptions import HttpError
  from apitools.base.py.exceptions import HttpForbiddenError
  from google.cloud import bigquery as gcp_bigquery
  from google.api_core import exceptions
except ImportError:
  gcp_bigquery = None
  HttpError = None
  HttpForbiddenError = None
  exceptions = None
# pylint: enable=wrong-import-order, wrong-import-position

@@ -288,7 +291,9 @@ def test_repeatable_field_is_properly_converted(self):
    self.assertEqual(expected_row, actual)


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
@unittest.skipIf(
    HttpError is None or HttpForbiddenError is None,
    'GCP dependencies are not installed')
class TestReadFromBigQuery(unittest.TestCase):
  @classmethod
  def setUpClass(cls):
@@ -419,6 +424,200 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
    mock_insert.assert_called()
    self.assertIn(error_message, exc.exception.args[0])

  @parameterized.expand([
      # first attempt returns a Http 500 blank error and retries,
      # second attempt returns a Http 408 blank error and retries,
      # third attempt passes
      param(
          responses=[
              HttpForbiddenError(
                  response={'status': 500}, content="something", url="")
              if HttpForbiddenError else None,
              HttpForbiddenError(
                  response={'status': 408}, content="blank", url="")
              if HttpForbiddenError else None
          ],
          expected_retries=2),
      # first attempt returns a 403 rateLimitExceeded error,
      # second attempt returns a 429 blank error,
      # third attempt returns a Http 403 rateLimitExceeded error,
      # fourth attempt passes
      param(
          responses=[
              exceptions.Forbidden(
                  "some message",
                  errors=({
                      "message": "transient", "reason": "rateLimitExceeded"
                  }, )) if exceptions else None,
              exceptions.ResourceExhausted("some message")
              if exceptions else None,
              HttpForbiddenError(
                  response={'status': 403},
                  content={
                      "error": {
                          "errors": [{
                              "message": "transient",
                              "reason": "rateLimitExceeded"
                          }]
                      }
                  },
                  url="") if HttpForbiddenError else None,
          ],
          expected_retries=3),
  ])
  def test_get_table_transient_exception(self, responses, expected_retries):
    class DummyTable:
      class DummySchema:
        fields = []

      numBytes = 5
      schema = DummySchema()

    with mock.patch('time.sleep'), \
        mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
                          'Get') as mock_get_table, \
        mock.patch.object(BigQueryWrapper,
                          'wait_for_bq_job'), \
        mock.patch.object(BigQueryWrapper,
                          'perform_extract_job'), \
        mock.patch.object(FileSystems,
                          'match'), \
        mock.patch.object(FileSystems,
                          'delete'), \
        beam.Pipeline() as p:
      call_counter = 0

      def store_callback(unused_request):
        nonlocal call_counter
        if call_counter < len(responses):
          exception = responses[call_counter]
          call_counter += 1
          raise exception
        else:
          call_counter += 1
          return DummyTable()

      mock_get_table.side_effect = store_callback
      _ = p | beam.io.ReadFromBigQuery(
          table="project.dataset.table", gcs_location="gs://some_bucket")

    # ReadFromBigQuery export mode calls get_table() twice: once to get
    # metadata (numBytes), and once to retrieve the table's schema.
    # Any additional calls are retries.
    self.assertEqual(expected_retries, mock_get_table.call_count - 2)

  @parameterized.expand([
      # first attempt returns a Http 429 with transient reason and retries,
      # second attempt returns a Http 403 with non-transient reason and fails
      param(
          responses=[
              HttpForbiddenError(
                  response={'status': 429},
                  content={
                      "error": {
                          "errors": [{
                              "message": "transient",
                              "reason": "rateLimitExceeded"
                          }]
                      }
                  },
                  url="") if HttpForbiddenError else None,
              HttpForbiddenError(
                  response={'status': 403},
                  content={
                      "error": {
                          "errors": [{
                              "message": "transient", "reason": "accessDenied"
                          }]
                      }
                  },
                  url="") if HttpForbiddenError else None
          ],
          expected_retries=1),
      # first attempt returns a transient 403 error and retries,
      # second attempt returns a 403 error with bad contents and fails
      param(
          responses=[
              HttpForbiddenError(
                  response={'status': 403},
                  content={
                      "error": {
                          "errors": [{
                              "message": "transient",
                              "reason": "rateLimitExceeded"
                          }]
                      }
                  },
                  url="") if HttpForbiddenError else None,
              HttpError(
                  response={'status': 403}, content="bad contents", url="")
              if HttpError else None
          ],
          expected_retries=1),
      # first attempt returns a transient 403 error and retries,
      # second attempt returns a 429 error and retries,
      # third attempt returns a 403 with non-transient reason and fails
      param(
          responses=[
              exceptions.Forbidden(
                  "some error",
                  errors=({
                      "message": "transient", "reason": "rateLimitExceeded"
                  }, )) if exceptions else None,
              exceptions.ResourceExhausted("some transient error")
              if exceptions else None,
              exceptions.Forbidden(
                  "some error",
                  errors=({
                      "message": "transient", "reason": "accessDenied"
                  }, )) if exceptions else None,
          ],
          expected_retries=2),
  ])
  def test_get_table_non_transient_exception(
      self, responses, expected_retries):
    class DummyTable:
      class DummySchema:
        fields = []

      numBytes = 5
      schema = DummySchema()

    with mock.patch('time.sleep'), \
        mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
                          'Get') as mock_get_table, \
        mock.patch.object(BigQueryWrapper,
                          'wait_for_bq_job'), \
        mock.patch.object(BigQueryWrapper,
                          'perform_extract_job'), \
        mock.patch.object(FileSystems,
                          'match'), \
        mock.patch.object(FileSystems,
                          'delete'), \
        self.assertRaises(Exception), \
        beam.Pipeline() as p:
      call_counter = 0

      def store_callback(unused_request):
        nonlocal call_counter
        if call_counter < len(responses):
          exception = responses[call_counter]
          call_counter += 1
          raise exception
        else:
          call_counter += 1
          return DummyTable()

      mock_get_table.side_effect = store_callback
      _ = p | beam.io.ReadFromBigQuery(
          table="project.dataset.table", gcs_location="gs://some_bucket")

    # ReadFromBigQuery export mode calls get_table() twice: once to get
    # metadata (numBytes), and once to retrieve the table's schema. However,
    # the second call is never reached because this test always fails before
    # it does so. After the first call, any additional calls are retries.
    self.assertEqual(expected_retries, mock_get_table.call_count - 1)

  @parameterized.expand([
      param(
          exception_type=exceptions.BadRequest if exceptions else None,
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -747,7 +747,7 @@ def _insert_all_rows(

  @retry.with_exponential_backoff(
      num_retries=MAX_RETRIES,
      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
      retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
  def get_table(self, project_id, dataset_id, table_id):
    """Lookup a table's metadata object.

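For context on how the decorator above behaves, here is a minimal sketch of the retry-with-filter pattern. This is a simplified illustration, not Beam's actual implementation (which lives in apache_beam/utils/retry.py and also handles logging and clock injection):

import functools
import random
import time


def with_exponential_backoff(num_retries, retry_filter):
  """Simplified sketch of a retry decorator with a pluggable filter."""
  def decorator(fun):
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
      delay_secs = 1.0
      for attempt in range(num_retries + 1):
        try:
          return fun(*args, **kwargs)
        except Exception as exn:  # pylint: disable=broad-except
          # Give up if the filter deems the error non-transient or the
          # retry budget is exhausted; otherwise back off and try again.
          if attempt == num_retries or not retry_filter(exn):
            raise
          # Jittered exponential backoff between attempts.
          time.sleep(delay_secs * (0.5 + random.random()))
          delay_secs *= 2

    return wrapper

  return decorator

With this shape, swapping the filter changes only which exceptions are retried; the backoff schedule for get_table is unchanged.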
33 changes: 28 additions & 5 deletions sdks/python/apache_beam/utils/retry.py
@@ -28,6 +28,7 @@
# pytype: skip-file

import functools
import json
import logging
import random
import sys
@@ -57,6 +58,7 @@
# pylint: enable=wrong-import-order, wrong-import-position

_LOGGER = logging.getLogger(__name__)
_RETRYABLE_REASONS = ["rateLimitExceeded", "internalError", "backendError"]


class PermanentException(Exception):
@@ -166,17 +168,38 @@ def retry_on_server_errors_and_timeout_filter(exception):


def retry_on_server_errors_timeout_or_quota_issues_filter(exception):
  """Retry on server, timeout and 403 errors.
  """Retry on server, timeout, 429, and some 403 errors.

  403 errors can be accessDenied, billingNotEnabled, and also quotaExceeded,
  rateLimitExceeded."""
  403 errors from BigQuery include both non-transient (accessDenied,
  billingNotEnabled) and transient errors (rateLimitExceeded).
  Only retry transient errors."""
  if HttpError is not None and isinstance(exception, HttpError):
    if exception.status_code == 403:
    if exception.status_code == 429:
      return True
    if exception.status_code == 403:
      try:
        # attempt to extract the reason and check if it's retryable
        content = exception.content
        if not isinstance(content, dict):
          content = json.loads(exception.content)
        return content["error"]["errors"][0]["reason"] in _RETRYABLE_REASONS
Contributor:

This is deeply nested information. Is the response guaranteed (documented) to have this format? In any case, would you mind adding a comment with an example of the error JSON being loaded, or is there an example error message you could share?

Contributor Author:

I'm basing this off of the structure of the exception returned from get_table():

bq = bigquery_tools.BigQueryWrapper()
try:
  bq.get_table("google.com:clouddfe", "nonexistent", "nonexistent")
except Exception as e:
  print(type(e))          # >>> <class 'apitools.base.py.exceptions.HttpNotFoundError'>
  print(e.status_code)    # >>> 404
  print(type(e.content))  # >>> <class 'bytes'>
  content = json.loads(e.content)

  print(content.keys())                           # >>> dict_keys(['error'])
  print(content['error'].keys())                  # >>> dict_keys(['code', 'message', 'errors', 'status', 'details'])
  print(content['error']['errors'][0].keys())     # >>> dict_keys(['message', 'domain', 'reason', 'debugInfo'])
  print(content['error']['errors'][0]['reason'])  # >>> notFound

Contributor:

Thanks for the detail. Anyway, this is wrapped in a fail-safe try clause, so LGTM.

      except (KeyError, IndexError, TypeError) as e:
        _LOGGER.warning(
            "Could not determine if HttpError is transient. "
            "Will not retry: %s",
            e)
      return False
  if GoogleAPICallError is not None and isinstance(exception,
                                                   GoogleAPICallError):
    if exception.code == 403:
    if exception.code == 429:
      return True
    if exception.code == 403:
      if not hasattr(exception, "errors") or len(exception.errors) == 0:
        # default to not retrying
        return False

      reason = exception.errors[0]["reason"]
      return reason in _RETRYABLE_REASONS
  if S3ClientError is not None and isinstance(exception, S3ClientError):
    if exception.code == 403:
      return True
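As a quick illustration of the new filter's behavior, here is a hypothetical snippet mirroring the payload shapes used in the unit tests above (it assumes apitools is installed):

from apache_beam.utils import retry
from apitools.base.py.exceptions import HttpForbiddenError

# A 403 whose first error reason is transient (rateLimitExceeded).
transient = HttpForbiddenError(
    response={'status': 403},
    content={'error': {'errors': [{'reason': 'rateLimitExceeded'}]}},
    url='')
# A 403 whose first error reason is non-transient (accessDenied).
permanent = HttpForbiddenError(
    response={'status': 403},
    content={'error': {'errors': [{'reason': 'accessDenied'}]}},
    url='')

retry.retry_on_server_errors_timeout_or_quota_issues_filter(transient)
# >>> True (rateLimitExceeded is in _RETRYABLE_REASONS)
retry.retry_on_server_errors_timeout_or_quota_issues_filter(permanent)
# >>> False (accessDenied is not retryable)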