
BigQuery FILE_LOADS failed with 400 error in streaming mode in Python #20824

Closed
damccorm opened this issue Jun 4, 2022 · 12 comments · Fixed by #23710
Comments

@damccorm (Contributor) commented Jun 4, 2022

We are using FILE_LOADS to write to BigQuery in streaming mode using Python. After running for about 1 hour, the Beam job throws a RuntimeError wrapping apitools.base.py.exceptions.HttpBadRequestError with the message "Load configuration must specify at least one source URI".


Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 520, in process
    job_reference = self.bq_wrapper.perform_load_job(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 825, in perform_load_job
    return self._insert_load_job(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 260, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 438, in _insert_load_job
    return self._start_job(request).jobReference
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 449, in _start_job
    response = self.client.jobs.Insert(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 345, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 09 Mar 2021 09:31:01 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '318', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Load configuration must specify at least one source URI",
    "errors": [
      {
        "message": "Load configuration must specify at least one source URI",
        "domain": "global",
        "reason": "invalid"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}

 

Perhaps this can be fixed by validating that the input value files (= element[1], see https://github.com/apache/beam/blob/v2.28.0/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L469) is not empty.
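
For illustration, a minimal sketch of such a guard in TriggerLoadJobs.process, assuming the element is a (destination, files) tuple as in the linked code; the early return and log message are hypothetical, not the shipped fix:

    import logging

    # Sketch only: a hypothetical guard in TriggerLoadJobs.process.
    def process(self, element, load_job_name_prefix, *schema_side_inputs):
      destination = element[0]
      files = element[1]
      if not files:
        # No files were produced for this destination; skip instead of
        # submitting a load job with an empty sourceUris list, which
        # BigQuery rejects with a 400 error.
        logging.warning('Skipping load job for %s: no files to load.', destination)
        return
      # ... existing logic: build the load job request and call
      # self.bq_wrapper.perform_load_job(...) ...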

Imported from Jira BEAM-11939. Original Jira may contain additional context.
Reported by: yshimizu.

@tvalentyn (Contributor)

Is this still an issue?
cc: @pabloem

@rizenfrmtheashes

Is this issue being actioned? We encounter it whenever we move a pipeline into draining. It prevents us from using the draining status as a mechanism for checking whether a pipeline has fully cleared its backlog, and forces us to estimate when a pipeline is done before canceling it directly. This is not ideal. I can provide more info if requested!

@tvalentyn (Contributor)

@rizenfrmtheashes could you provide a code sample we could use as a repro?

@tvalentyn (Contributor)

cc: @johnjcasey @BjornPrime

@tvalentyn tvalentyn assigned johnjcasey and unassigned pabloem Oct 10, 2022
@rizenfrmtheashes commented Oct 17, 2022

Sure. We ended up dumping a large amount of data with a specified schema through a Reshuffle and then into a BigQuery file loads write with dynamic table destinations:

        bq_file_loads_output = (
            input_data
            | "Fusion Break Pre BQ" >> beam.transforms.util.Reshuffle()
            | "Write All Rows BigQuery"
            >> WriteToBigQuery(
                table=lambda row: row["table"],  # we include the table name in the row for easy dynamic table destinations
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                schema={"fields": static_schema_here},
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                triggering_frequency=120,  # low in testing, closer to 600 in prod
            )
        )

We used a similar input to the one described in this bug report doc here. (The bug in that doc was reported in #23104 and mostly fixed in #23012.)

When we set this job to draining after writing tens of thousands of rows, this is the stack trace we get:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 598, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1004, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1571, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "/usr/local/lib/python3.7/site-packages/steps/bigquery_file_loads_patch_40.py", line 724, in process
    load_job_project_id=self.load_job_project_id,
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1019, in perform_load_job
    job_labels=job_labels)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 538, in _insert_load_job
    return self._start_job(request, stream=source_stream).jobReference
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 557, in _start_job
    response = self.client.jobs.Insert(request, upload=upload)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 345, in Insert
    upload=upload, upload_config=upload_config)
  File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/REDACTED_PROJECT_NAME/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Mon, 17 Oct 2022 16:02:07 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '318', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 400,
    "message": "Load configuration must specify at least one source URI",
    "errors": [
      {
        "message": "Load configuration must specify at least one source URI",
        "domain": "global",
        "reason": "invalid"
      }
    ],
    "status": "INVALID_ARGUMENT"
  }
}

As a note, bigquery_file_loads_patch_40.py is just a copy/pasted version of the source bigquery_file_loads.py file from the io/gcp section of the SDK, which we use to backport fixes from newer versions of Beam (like #23012). We did dependency checking to make sure the backported fixes were okay.

(We have also redacted org names in the stack traces.)

We were using Beam 2.40 and the Dataflow Runner v2 when this happened.

@tvalentyn (Contributor)

This is very helpful, thank you so much, @rizenfrmtheashes .

As a next step, we should identify whether the error is caused by the drain logic or by a gap in the BQ I/O implementation (incorrect usage of the BQ APIs during the draining phase). I suspect it's the latter. I will try to find an owner to look at this more closely.

@rizenfrmtheashes

If you want the pip-tools style dependency/requirements file we used to build the container that runs this Dataflow job, I can provide that too. We used pip-tools to find the minimum versions that can safely run with Beam 2.40, and perhaps a version of a base GCP Python package is causing this issue.

@ahmedabu98 (Contributor)

There was a similar issue a few months ago where a pipeline in draining ran into similar errors. This connector used to throw an early error when a source URI (i.e. a file to load to BQ) was not provided. That issue was mitigated in https://github.com/apache/beam/pull/17566/files, where the error was replaced with a warning.

In contrast, the error in this issue looks like it's from BigQuery...

It looks like this is running into a similar problem: there are no files to load because the pipeline is in the draining phase, but load job requests are still being sent.

@ahmedabu98 (Contributor)

We could perform a simple check that files is not empty before performing the load job here.

@ahmedabu98 (Contributor) commented Oct 18, 2022

Could reproduce this with the following pipeline:

    import apache_beam as beam
    from apache_beam.io.gcp import bigquery_file_loads as bqfl

    destination = "<myproject>:<dataset>.<table>"
    empty_files = []
    load_job_prefix = "test_prefix"

    with beam.Pipeline() as p:
      (p
       | beam.Create([(destination, empty_files)])
       | beam.ParDo(bqfl.TriggerLoadJobs(), load_job_prefix))
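
With real project and dataset values filled in, this submits a load job with an empty sourceUris list, which BigQuery rejects with the same 400 "Load configuration must specify at least one source URI" error shown above.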

@ahmedabu98 (Contributor) commented Oct 18, 2022

I think it would be safe to log a warning and ignore the bundle in TriggerLoadJobs if no files are there.

Update: we can catch it earlier, in PartitionFiles.
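
For illustration, a minimal sketch of what that earlier check could look like in PartitionFiles.process, assuming it receives (destination, files) pairs like TriggerLoadJobs does; the guard and log message are hypothetical, not necessarily the shipped fix:

    import logging

    # Sketch only: a hypothetical guard that drops empty inputs before
    # partitioning.
    def process(self, element):
      destination = element[0]
      files = element[1]
      if not files:
        # Nothing to partition for this destination (can happen while
        # draining); emit nothing, so no empty load job gets triggered.
        logging.warning('No files to partition for %s; skipping.', destination)
        return
      # ... existing logic: group the files into partitions by size and
      # file count, then emit each partition downstream ...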

@ahmedabu98 (Contributor)

Although I'm having trouble identifying what exactly causes us to end up with empty files during draining.
