Return errors for insert_rows_json exceptions (#21080) #28091
Conversation
Force-pushed 941cdb3 to 834d9a9
R: @liferoad - would you be able to provide some feedback or suggest a reviewer?

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@liferoad could you check or suggest a reviewer?

R: @Abacn Sorry, this was lost in my notifications.

@liferoad @Abacn - Just looking for early feedback on the approach. Provided some background in this comment.

I see. Thanks. R: @ahmedabu98
```python
        raise
      except HttpError as e:
        # Package exception up with required fields
        # Set reason to 'invalid' to consider these exceptions as
        # RetryStrategy._NON_TRANSIENT_ERRORS
```
If these errors are retry-able, the new changes will not be able to insert these rows.
Thanks @liferoad, I'll take a closer look at that. Let me know if I'm off on any of this, but my thinking had been: as the code currently stands, the `ClientError`, `GoogleAPICallError`, and `HttpError` exceptions never get a chance to be retried anyway. So this doesn't take away a chance to be retried; it just makes sure the rows can be captured in failed_rows and provides a way to disposition the message and ack a Pub/Sub message.
The reason I had thought that the ClientError, GoogleAPICallError, and HttpError exceptions never get a chance to be retried is:
- the exceptions get re-raised in `_insert_all_rows` here
- the exceptions are not caught by `insert_rows` here
- and the `_flush_batch` method here isn't catching them either
The end result of all that for `ClientError`, `GoogleAPICallError`, and `HttpError` exceptions is a pipeline error rather than a usable `errors` list, and RetryStrategy never gets a chance to be evaluated.

My thought was that it would be better to at least route those rows to the failed rows tag, where users can choose what to do with them, and avoid issues I've seen with Pub/Sub messages that are never acked.
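To make that concrete, here is a minimal, hypothetical sketch (not the actual patch; the helper name and the error packaging are assumptions) of catching those exceptions around the insert call and turning them into the error-dict shape that can be routed to failed_rows:

```python
# Hypothetical sketch only -- illustrates the approach, not the real Beam patch.
# Instead of letting the client exception escape _insert_all_rows, turn it into
# per-row error dicts so the batch can be emitted on the failed_rows tag.
from google.api_core import exceptions as gapi_exceptions


def insert_rows_with_error_capture(gcp_bq_client, table_ref, rows):
  try:
    errors = gcp_bq_client.insert_rows_json(table_ref, json_rows=rows)
  except gapi_exceptions.GoogleAPICallError as e:
    # One error entry per row; 'reason' is what RetryStrategy later inspects
    # to decide between retrying and routing to failed_rows.
    errors = [{
        'index': i,
        'errors': [{'reason': 'invalid', 'message': str(e)}],
    } for i in range(len(rows))]
  return not errors, errors
```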
Shall we fix the code path to retry these errors (at least the ones that are retryable) instead of returning them as the error PColl?
@liferoad - I'm giving that some thought, will report back
Thanks. In general, for any batch job, Dataflow will retry the failed work items 4 times; for streaming jobs, it will retry forever. So if one error is retry-able, we could just raise the exception and let Dataflow handle it.
> So if one error is retry-able, we could just raise the exception and let Dataflow handle it

IIUC this is the current behavior already. When this exception is raised, Dataflow sees it as a failed work item and retries it (batch 4 times, streaming infinitely).
Thank you for taking this on @ajdub508! I've seen users running into this problem frequently. Dataflow streaming retries failed work items infinitely so in the worst of cases they end up with a stuck pipeline.
I like this approach of throwing a batch in the DLQ if the insert call fails; it's better than hopelessly retrying, and at least users can decide what to do with the failed rows in a downstream stage. However, I'd like to do it so that we still retry when it's appropriate to.
```python
        # Set reason to 'invalid' to consider these exceptions as
        # RetryStrategy._NON_TRANSIENT_ERRORS
        error = {
            'message': e.message,
            'reason': 'invalid'
```
I'd prefer a more granular approach than setting all these exceptions to `invalid`. Are we sure these errors will always be non-transient? Perhaps we can have an if-tree that sets an appropriate reason based on the error code. Or maybe better to pass in the actual error reason and update `_NON_TRANSIENT_ERRORS` to include more reasons.

For example, the code `400` indicates an invalid error, so we can say that's non-transient. But something like a `500` or `503` indicates a temporary error, and BQ suggests retrying. More info in this error messages documentation.
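One possible shape for that if-tree (a purely illustrative helper; the reason strings chosen here are assumptions, not the connector's API):

```python
# Illustrative helper: map an HTTP status code to a reason string that
# RetryStrategy can classify, instead of hard-coding 'invalid' everywhere.
_RETRYABLE_REASONS = {
    429: 'rateLimitExceeded',  # BQ docs suggest retrying with backoff
    500: 'internalError',      # transient per the BQ error table
    503: 'backendError',       # transient per the BQ error table
}


def reason_for_status(status_code):
  # Codes in the map get a reason that is NOT in _NON_TRANSIENT_ERRORS, so
  # RETRY_ON_TRANSIENT_ERROR will retry them; everything else (e.g. 400, 404)
  # is reported as 'invalid' and goes to failed_rows.
  return _RETRYABLE_REASONS.get(status_code, 'invalid')
```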
I'll look at adding to `_NON_TRANSIENT_ERRORS`, that's a good idea.
```python
  def test_big_query_write_insert_not_found_errors(self):
    """
    Test that NotFound errors returned by beam.io.WriteToBigQuery
    contain both the failed rows amd the reason for it failing.
```
nit:
```suggestion
    contain both the failed rows and the reason for it failing.
```
Thanks for the feedback, @ahmedabu98. I think we could raise 429, 500, and 503 errors to let those pipeline errors be retried indefinitely and route all the others to the failed rows tag. This would be consistent with the errors defined as transient in API Core Retry and AIP-194.

Those retryable errors would all be caught in the first except clause. The second clause, which catches the HttpError type, doesn't cover anything that is typically considered transient, so those will go straight to failed rows.
Thinking about this some more along with the suggestion to add to `_NON_TRANSIENT_ERRORS`, I should be able to use the response reasons to let RetryStrategy do its thing and may not need to raise and cause a pipeline error.
Force-pushed d1a7db7 to b5b863d
Restructured tests to cover the 2 types of issues that can arise from insert_rows_json calls: exceptions raised by the call itself, and errors returned in the call's response.

Separate tests added for each retry strategy for both of those types of issues. Will clean up any linter or test issues, incorporate any additional feedback, and get ready to move out of draft status next.
Codecov Report

```
@@            Coverage Diff             @@
##           master   #28091      +/-   ##
==========================================
+ Coverage   72.24%   72.26%   +0.01%
==========================================
  Files         684      684
  Lines      100952   100953       +1
==========================================
+ Hits        72929    72949      +20
+ Misses      26447    26428      -19
  Partials     1576     1576
```
... and 11 files with indirect coverage changes
Force-pushed 10aaa70 to dbb6573
@liferoad @ahmedabu98 Moving this out of draft status, ready for review. I did not change handling of HttpErrors because they are different from GoogleAPICallError exceptions. Exceptions from …
Run Python 3.8 PostCommit
@ahmedabu98 The test failed, but it looks like the failures were unrelated. I still need to add to CHANGES.md. Should I add under Bugfixes since the #21080 issue is tagged as a bug, or is there a chance this should be considered a breaking change for anyone depending on error handling logic?
Thanks for the comments documenting the test cases! It was really helpful when following along. I have a few clarifying questions there and a couple requests:
- Can we keep the types of exceptions we currently have? We wouldn't want to lose all that coverage.
- I see that the reasons added to `_NON_TRANSIENT_ERRORS` may be geared towards GRPC errors? Can we add a few reasons that BQ would return to us? For example `notFound` (this particular one is mentioned in WriteToBigQuery ignores insert_retry_strategy on HttpErrors #21080) or `accessDenied`.
```python
  # Values below may be found in reasons provided either in an
  # error returned by a client method or by an http response as
  # defined in google.api_core.exceptions
  _NON_TRANSIENT_ERRORS = {
      'invalid',
      'invalidQuery',
      'notImplemented',
      'Moved Permanently',
      'Not Modified',
      'Temporary Redirect',
      'Resume Incomplete',
      'Bad Request',
      'Unauthorized',
      'Forbidden',
      'Not Found',
      'Method Not Allowed',
      'Conflict',
      'Length Required',
      'Precondition Failed',
      'Not Implemented',
      'Bad Gateway',
      'Gateway Timeout',
  }
```
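For context, this is roughly how a reason string interacts with that set via the retry strategy; a simplified sketch based on this thread, not a verbatim copy of `bigquery_tools.RetryStrategy`:

```python
# Simplified sketch of the retry decision discussed in this thread.
class RetryStrategySketch(object):
  RETRY_ALWAYS = 'RETRY_ALWAYS'
  RETRY_NEVER = 'RETRY_NEVER'
  RETRY_ON_TRANSIENT_ERROR = 'RETRY_ON_TRANSIENT_ERROR'

  _NON_TRANSIENT_ERRORS = {'invalid', 'Forbidden', 'Not Found'}  # excerpt

  @classmethod
  def should_retry(cls, strategy, error_reason):
    if strategy == cls.RETRY_ALWAYS:
      return True
    if strategy == cls.RETRY_NEVER:
      return False
    # RETRY_ON_TRANSIENT_ERROR: retry only reasons not listed above.
    return error_reason not in cls._NON_TRANSIENT_ERRORS


print(RetryStrategySketch.should_retry(
    RetryStrategySketch.RETRY_ON_TRANSIENT_ERROR, 'Not Found'))     # False
print(RetryStrategySketch.should_retry(
    RetryStrategySketch.RETRY_ON_TRANSIENT_ERROR, 'backendError'))  # True
```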
Thanks for adding this comprehensive list! I'm a little unfamiliar with a few of these so will need another pair of eyes to check.
From a cursory search though, it appears "bad gateway" may be a transient error? Same with "gateway timeout". I would defer to https://cloud.google.com/bigquery/docs/error-messages for a good list to take from.
Also, error reasons from BQ seem to appear in camel case format (e.g. `notFound` instead of `Not Found`). Is it different for GRPC errors?
> Thanks for adding this comprehensive list! I'm a little unfamiliar with a few of these so will need another pair of eyes to check.
>
> From a cursory search though, it appears "bad gateway" may be a transient error? Same with "gateway timeout". I would defer to https://cloud.google.com/bigquery/docs/error-messages for a good list to take from.
I see what you mean about some of the ones I added possibly being transient, we certainly could take some of the 5XX errors out of that list to broaden it a bit.
I went narrow, initially, based on the narrow set of api core exceptions considered transient in api core retry here, where it's only 429 Too Many Requests, 500 Internal Server Error, and 503 Service Unavailable that are considered transient. Other than those 3, all remaining 4XX, 5XX, and 3XX errors are not considered transient.
Let me know if you think we should remove some 5XX exceptions from the list or if we should stick with just retrying 429, 500, and 503.
> Also error reasons from BQ seem to appear in camel case format (e.g. `notFound` instead of `Not Found`). Is it different for GRPC errors?
Posted this in a different comment but will include here to make it easy to follow in this thread -
There are 2 types of error reasons in that list:
- Reasons returned in the error list response of the `self.gcp_bq_client.insert_rows_json` call. Those are in camel case and are the ones found in this doc: https://cloud.google.com/bigquery/docs/error-messages.
- Reasons returned by the `GoogleAPICallError` exceptions. Those follow the HTTP standard reason format. A 404, for example, will return a reason of `Not Found`, which is verified with this integration test.
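A small illustration of the two shapes described above (values are made up):

```python
# 1) Reasons in the errors list returned by insert_rows_json -- camelCase,
#    matching https://cloud.google.com/bigquery/docs/error-messages:
errors_from_insert_rows_json = [
    {'index': 0, 'errors': [{'reason': 'notFound',
                             'message': 'Table not found'}]},
]

# 2) Reasons derived from GoogleAPICallError exceptions -- standard HTTP
#    reason phrases. A NotFound (404), for example, maps to 'Not Found'.
from google.api_core import exceptions

err = exceptions.NotFound('table does not exist')
print(err.code)  # 404 -> reason phrase 'Not Found'
```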
Yeah let's be cautious which errors we label as "non-transient", because that could break many current pipelines that rely on Beam to successfully retry these errors. Previously these GRPC errors would be raised and the runner can retry the whole bundle.
I would lean towards only listing the errors we are sure to be non-transient.
Gotcha, I'll review that and trim that list down a bit, seems like retrying more of the 5XX errors would make sense.
Added this commit which removed a few exceptions based on our conversation and a review of the bq error table:
- Errors that don't seem to be relevant:
  - 3XX errors: Moved Permanently, Not Modified, Temporary Redirect, Resume Incomplete
  - Some 4XX errors: Method Not Allowed, Length Required, Precondition Failed
- Errors that should be retried:
  - 409 Conflict
  - 502 Bad Gateway
  - 504 Gateway Timeout
```diff
-          exception_type=requests.exceptions.ConnectionError,
-          error_message='some connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests',  # not in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_message='some timeout error'),
+          exception_type=exceptions.Forbidden if exceptions else None,
+          error_reason='Forbidden',  # in _NON_TRANSIENT_ERRORS
+          streaming=False),
       param(
-          exception_type=ConnectionError,
-          error_message='some py connection error'),
+          exception_type=exceptions.TooManyRequests if exceptions else None,
+          error_reason='Too Many Requests',  # not in _NON_TRANSIENT_ERRORS
+          streaming=True),
       param(
-          exception_type=exceptions.BadGateway if exceptions else None,
-          error_message='some badgateway error'),
```
Likewise, these exceptions are important to keep. There was a large effort last year to add wide exception testing coverage to make sure these exceptions are handled properly.
I'd say the same with the rest of the current exception tests that we have. You can maybe try integrating those exceptions with your tests
Thanks, will add back.
Looking closer at this - I can add these back, but I still think they might be redundant. I think the tests in this change cover the possible code paths. This is my line of thinking, let me know where I'm off track:
- All of the existing exception tests are mocking `google.cloud.bigquery.Client.insert_rows_json`.
- There are 2 scenarios we need to handle when calling that method with regard to errors/exceptions: the call raises an exception, or the call returns a response containing errors.

In each of those scenarios, a different path can be taken based on whether the reason is in `_NON_TRANSIENT_ERRORS`, so the tests in this change exercise both transient/non-transient to cover both paths.

The tests are re-organized in this change to try to make it clear which path is being tested, with `test_insert_rows_json_exception_*` tests for the first scenario and `test_insert_rows_json_errors_*` tests for the 2nd scenario. Retry strategies and transient/non-transient errors are also exercised for each scenario to cover those paths.
I don't mean to be a pain on this and certainly may be missing something, so I will defer to project committers on this, just wanted to see if I can understand what I'm missing.
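For reference, a hedged sketch of the two scenarios above using unittest.mock against `google.cloud.bigquery.Client.insert_rows_json` (rows and reasons are illustrative, not the actual test code):

```python
from unittest import mock

from google.api_core import exceptions


def mock_scenario_1_call_raises(client):
  # Scenario 1: insert_rows_json raises an exception (covered by the
  # test_insert_rows_json_exception_* tests). A transient 503 like this
  # should be retried under RETRY_ON_TRANSIENT_ERROR.
  client.insert_rows_json = mock.Mock(
      side_effect=exceptions.ServiceUnavailable('backend unavailable'))


def mock_scenario_2_call_returns_errors(client):
  # Scenario 2: insert_rows_json returns an errors list (covered by the
  # test_insert_rows_json_errors_* tests). 'invalid' is non-transient, so
  # the row lands in failed_rows.
  client.insert_rows_json = mock.Mock(
      return_value=[{'index': 0, 'errors': [{'reason': 'invalid'}]}])
```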
You're certainly right that fundamentally we should be focusing on those two code paths. I agree with not bringing those tests back, but maybe the exceptions themselves should be spread out across your tests. For example, we can replace one of the many `exceptions.TooManyRequests` in your test cases with `exceptions.Timeout`, and in another test case replace it with `exceptions.BadGateway`, etc. So the tests you wrote can stay where they are, but the parameters can change to include a wider variety of exceptions.

Although these exceptions ultimately take the same code path in the end, the variety of exceptions tested can guard against future unwanted changes to `_NON_TRANSIENT_ERRORS` (e.g. someone making `Timeout` a non-transient error).
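Something like the following parameter spread could capture that suggestion (the exception choices and reason strings here are illustrative assumptions):

```python
from google.api_core import exceptions
from parameterized import param

# Varying the exception type across otherwise-identical cases guards against
# accidental future changes to _NON_TRANSIENT_ERRORS.
_EXCEPTION_CASES = [
    # transient: reasons not listed in _NON_TRANSIENT_ERRORS, so retried
    param(exception_type=exceptions.TooManyRequests,
          error_reason='Too Many Requests'),
    param(exception_type=exceptions.DeadlineExceeded,
          error_reason='Deadline Exceeded'),
    param(exception_type=exceptions.ServiceUnavailable,
          error_reason='Service Unavailable'),
    # non-transient: reasons listed in _NON_TRANSIENT_ERRORS, routed to
    # failed_rows
    param(exception_type=exceptions.Forbidden, error_reason='Forbidden'),
    param(exception_type=exceptions.NotFound, error_reason='Not Found'),
]
```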
Thanks, that's a great point and good idea, I'll switch up some of the exceptions in those tests.
Added this commit which adds a variety of exceptions and stops setting the streaming option in the tests. I added test cases for the `test_insert_rows_json_exception_retry_on_transient_error` test to cover all expected retriable exceptions and all listed non-transient exceptions.
```diff
       param(
-          exception_type=requests.exceptions.Timeout,
-          error_args=('some timeout error', )),
+          insert_response=[
+              [{
+                  'index': 0, 'errors': [{
+                      'reason': 'invalid'
+                  }]
+              }],
+          ],
+          failed_rows=['value1'],
+          streaming=True),
```
I see this is the same test case as the first one in this sequence. What is the significance of `streaming=True`?
I noticed that there were some separate tests, some with streaming true/false since the behavior with pipeline errors is different. I added them to demonstrate/check behavior for streaming true/false. Let me know if you think that's unnecessary, I can remove one or the other.
Do you remember which tests?

I think it should be safe to remove them, the same code path is taken either way. I also don't see the connector checking `streaming` besides for validation reasons.
The old test_insert_all_unretriable_errors_streaming test used to set opt.streaming = True.
I will take those out, agree that they aren't needed.
Fixed with this commit

I kept the original 3. There are 2 types of error reasons in that list (the insert_rows_json error-list reasons and the GoogleAPICallError reasons described above).

I can definitely add some errors from the bq docs to the list, wouldn't hurt anything, let me know if you think we should go that route. I think a lot of them probably weren't included originally because they'll actually surface a …
Ahh I see, that clarifies it for me. Thank you!
Force-pushed 7c11461 to 65d206d
Some exceptions are still missing (e.g. `ConnectionError`, `Timeout`, `DeadlineExceeded`), but I won't block this PR on that.

LGTM, thank you @ajdub508!
Thanks for the approval, @ahmedabu98, should I add a CHANGES.md entry and squash commits now?
I think this could be added as a bug fix on …
```python
      'Bad Request',
      'Unauthorized',
      'Forbidden',
      'Not Found',
```
Not found could be transient, in that it could show up later. I don't think we should treat it as transient though
Right, I think we catch that earlier when validating the create disposition. In Java SDK we also tell the user to make sure the table is created before processing the writes. And if the table gets dropped halfway thru I think it makes sense to treat it as non-transient
Force-pushed 65d206d to 2589f22
@ahmedabu98 Squashed commits, added …
Run Python 3.9 PostCommit
Tests are passing, will merge now. Thanks for sticking this one out @ajdub508!
Would like to get feedback on this approach. Marking the PR as draft because more tests would need to be added and I haven't run existing tests yet, either.

The purpose of this change is to get the exceptions caught and re-raised here to output rows to failed_rows rather than just causing a pipeline error. Outputting to the failed_rows tag is necessary to avoid streaming pipelines that infinitely retry messages that result in these pipeline errors.
These exceptions cause infinite retries because the `self.bigquery_wrapper.insert_rows` call ends up raising an exception rather than returning errors: the exception raised by `self.gcp_bq_client.insert_rows_json` prevents the errors list from ever being instantiated. The exception then bubbles up (raised here) to the `_flush_batch` method, where it is unhandled and produces a pipeline error.
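A self-contained toy that mirrors that propagation path (class and function names are stand-ins, not the real connector code):

```python
# Toy illustration of the bubbling: the client call raises, the wrapper never
# builds an errors list, and the unhandled exception surfaces at the flush
# step as a pipeline error.
class FakeBQClient(object):
  def insert_rows_json(self, rows):
    raise RuntimeError('insert failed')  # stand-in for GoogleAPICallError


class FakeWrapper(object):
  def __init__(self, client):
    self.gcp_bq_client = client

  def insert_rows(self, rows):
    # The exception escapes before `errors` is ever assigned.
    errors = self.gcp_bq_client.insert_rows_json(rows)
    return not errors, errors


def flush_batch(wrapper, rows):
  # No try/except here, mirroring the unhandled path in _flush_batch.
  return wrapper.insert_rows(rows)


try:
  flush_batch(FakeWrapper(FakeBQClient()), [{'id': 1}])
except RuntimeError as e:
  print('exception bubbled up to the flush step:', e)
```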
Getting those rows to the failed_rows tag may require doing some things that may or may not be acceptable: