[Feature Request]: Enable withFormatRecordOnFailureFunction() equivalent for BigQuery STORAGE_WRITE_API #31354

Closed
1 of 16 tasks
sarinasij opened this issue May 21, 2024 · 3 comments · Fixed by #31659

Comments

@sarinasij

What would you like to happen?

We have a Dataflow pipeline that reads data from Pub/Sub and writes to BigQuery.

Current status:
BigQuery write method: STREAMING_INSERTS
Function used to get BigQuery dead letters: getFailedInsertsWithErr()
Dead-letter format function: withFormatRecordOnFailureFunction()

Our pipeline dynamically writes multiple event types to different BigQuery destination tables. We currently use withFormatRecordOnFailureFunction() to transform failed inserts into the format our dead-letter processing expects: rather than returning the original TableRow, we supply a custom function that encodes the returned TableRow with an added eventType field, so we can tell which table the row was meant for.
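The STREAMING_INSERTS behavior described above can be sketched in plain Java to show why the hook matters. This is a minimal sketch, not Beam code: a `Map<String, Object>` stands in for `TableRow` (which is itself map-like), and the `Event` class, its fields, and the `eventType` key are hypothetical. The point is that the failure-format function receives the original element, so it can copy fields (such as `eventType`) that the regular format function drops.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical element type standing in for the pipeline's event class.
final class Event {
    final String eventType;           // not a BigQuery column
    final Map<String, Object> fields; // the columns written to BigQuery

    Event(String eventType, Map<String, Object> fields) {
        this.eventType = eventType;
        this.fields = fields;
    }
}

final class FailureFormatSketch {
    // Plays the role of withFormatFunction(): only the table columns are written.
    static Map<String, Object> formatForTable(Event e) {
        return new LinkedHashMap<>(e.fields);
    }

    // Plays the role of withFormatRecordOnFailureFunction(): it sees the original
    // element, so it can add eventType for downstream dead-letter processing.
    static Map<String, Object> formatOnFailure(Event e) {
        Map<String, Object> row = new LinkedHashMap<>(e.fields);
        row.put("eventType", e.eventType);
        return row;
    }
}
```

With this split, the row sent to BigQuery never carries `eventType`, while the failure row always does.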

As we migrate the pipeline to STORAGE_WRITE_API, we are facing the following issue.

BigQuery write method: STORAGE_WRITE_API
Function used to get BigQuery dead letters: getFailedStorageApiInserts() (getFailedInsertsWithErr() cannot be used with STORAGE_WRITE_API)
Dead-letter format function: N/A

Without a withFormatRecordOnFailureFunction() equivalent, we cannot format the TableRows of failed inserts, which blocks our upgrade.

How this might work:
Add a withFormatRecordOnFailureFunction() equivalent for the BigQuery STORAGE_WRITE_API method.

Because we write to BigQuery tables dynamically, the PCollection may contain multiple event types. If no failure-formatting function can be applied to the failed inserts, we lose the eventType information.
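To illustrate why losing eventType matters, here is a hedged sketch of a dead-letter step that counts failures per event type. It assumes each failure row carries an `eventType` key, as in the STREAMING_INSERTS setup; rows without that key (as when only the table columns are available) can only be counted as unknown. The key name and the `"unknown"` bucket are illustrative assumptions, and a `Map<String, Object>` again stands in for `TableRow`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DeadLetterMetricsSketch {
    // Counts failed rows per eventType; rows that lack the key are bucketed as "unknown".
    static Map<String, Integer> countByEventType(List<Map<String, Object>> failedRows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, Object> row : failedRows) {
            Object type = row.get("eventType");
            String key = (type == null) ? "unknown" : type.toString();
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }
}
```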

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Amar3tto
Contributor

.take-issue

@Amar3tto
Contributor

There are some difficulties in making a withFormatRecordOnFailureFunction() equivalent for BigQuery STORAGE_WRITE_API:

  • StorageApi has different implementations:
    • BeamRow implementation that transforms original ElementT to Row
    • Proto implementation that transforms original ElementT to byte[]
    • Avro implementation that transforms original ElementT to GenericRecord
    • TableRow implementation that transforms original ElementT to TableRow
  • After the element-to-payload conversion, various BigQueryStorageApiInsertErrors may occur at the write-records step (e.g., the payload is too large).
  • At that point (for every implementation), the payload is converted to a TableRow so it can be output to failedRowsTag.
  • So a withFormatRecordOnFailureFunction() from ElementT to TableRow (as implemented for the STREAMING_INSERTS method) does not make sense here, because at this step only the payload is available, not the original element.
  • It would be possible to add a withFormatRecordOnFailureFunction() from TableRow to TableRow, if that is useful.
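The data flow in the list above can be modeled in plain Java to show where a failure hook could attach. This is a simplified sketch, not Beam's implementation: `toPayload` and `payloadToRow` stand in for the per-format conversions (Row, proto bytes, GenericRecord, TableRow), the `maxPayloadBytes` check stands in for a write-time error such as an oversized payload, and `onFailure` is the hypothetical TableRow-to-TableRow hook suggested in the last bullet. A `Map<String, Object>` stands in for `TableRow`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;

final class StorageWriteFlowSketch {
    // Simulates a write: element -> payload; if the write "fails", the payload
    // (not the element) is converted back to a row for the failure output.
    static <T> List<Map<String, Object>> write(
            List<T> elements,
            Function<T, byte[]> toPayload,
            Function<byte[], Map<String, Object>> payloadToRow,
            int maxPayloadBytes,
            UnaryOperator<Map<String, Object>> onFailure) {
        List<Map<String, Object>> failed = new ArrayList<>();
        for (T element : elements) {
            byte[] payload = toPayload.apply(element);
            if (payload.length > maxPayloadBytes) {
                // The hook receives only the row rebuilt from the payload,
                // never the original element.
                Map<String, Object> row = payloadToRow.apply(payload);
                failed.add(onFailure.apply(row)); // TableRow -> TableRow hook
            }
        }
        return failed;
    }
}
```

Any field that `toPayload` does not encode (such as an eventType that is not a table column) is already unreachable by the time `onFailure` runs.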

@sarinasij Could you please share an example of how you use withFormatRecordOnFailureFunction() with the STREAMING_INSERTS method? I think it may help clarify the next steps.

@sarinasij
Author

@Amar3tto

Below is a description of our current usage:

To process failed records, we need the original AvroGenericRecordMessage for further dead-letter handling. Note that the AvroGenericRecordMessage contains more information than the BigQuery TableRow: specifically, it includes the eventType, which is not a table column but is crucial for dead-letter metrics.

To address this, we utilize the withFormatRecordOnFailureFunction() to construct a dummy TableRow that can be decoded back into the original AvroGenericRecordMessage.

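```java
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.STORAGE_WRITE_API;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

import com.google.api.services.bigquery.model.TableRow;
import java.util.Base64;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.joda.time.Duration;

// Write configuration (excerpt from the enclosing method).
return BigQueryIO.<AvroGenericRecordMessage>write()
    .to(input -> getTableDestination(input, outputTableProject, outputTableDataset, outputTableMap))
    .withMethod(STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(bqWindowInSec))
    .withNumStorageWriteApiStreams(numStreams)
    // Normal path: only the table columns are written.
    .withFormatFunction(AvroGenericRecordToBigQuery::formatAvroToTableRow)
    // Failure path: encode the whole original message for dead-lettering.
    .withFormatRecordOnFailureFunction(AvroGenericRecordToBigQuery::formatAvroToFailedTableRow)
    .withCreateDisposition(CREATE_NEVER)
    .withWriteDisposition(WRITE_APPEND)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

// In AvroGenericRecordToBigQuery:
private static TableRow formatAvroToFailedTableRow(final AvroGenericRecordMessage message) {
    // Build a dummy table row containing the original Avro payload so it can later be used to
    // dead-letter the event. Unfortunately, we need to encode this into a TableRow since the BigQueryIO
    // module doesn't provide any other way to get back to the original AvroGenericRecordMessage object
    // that failed insertion.
    return FormatBigQueryDeadLetters.encodeAvroRecordToTableRow(message);
}

// In FormatBigQueryDeadLetters:
public static TableRow encodeAvroRecordToTableRow(AvroGenericRecordMessage record) {
    var row = new TableRow();
    // eventType is not a table column, so it only survives through this failure row.
    row.set(BQ_TABLEROW_COLUMN_EVENT_NAME, record.getEventType());
    // Base64-encode the binary Avro encoding so it fits in a string field.
    row.set(BQ_TABLEROW_COLUMN_PAYLOAD, Base64.getEncoder().encodeToString(record.getBinaryEncoding()));
    return row;
}
```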
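The dead-letter consumer then needs the inverse of `encodeAvroRecordToTableRow`. The following is a minimal, stdlib-only sketch of that round trip under stated assumptions: the column names `event_name` and `payload` are hypothetical stand-ins for `BQ_TABLEROW_COLUMN_EVENT_NAME` and `BQ_TABLEROW_COLUMN_PAYLOAD` (whose values are not shown), a `Map<String, Object>` stands in for `TableRow`, and raw bytes stand in for `record.getBinaryEncoding()`. Rebuilding the actual `AvroGenericRecordMessage` from those bytes depends on that class, which the issue does not show.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

final class DeadLetterCodecSketch {
    // Hypothetical column names; the real constants' values are not shown in the issue.
    static final String EVENT_NAME_COLUMN = "event_name";
    static final String PAYLOAD_COLUMN = "payload";

    // Mirrors encodeAvroRecordToTableRow: event type plus Base64-encoded payload bytes.
    static Map<String, Object> encode(String eventType, byte[] binaryEncoding) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put(EVENT_NAME_COLUMN, eventType);
        row.put(PAYLOAD_COLUMN, Base64.getEncoder().encodeToString(binaryEncoding));
        return row;
    }

    // Inverse step: recover the event type for metrics or routing.
    static String decodeEventType(Map<String, Object> row) {
        return (String) row.get(EVENT_NAME_COLUMN);
    }

    // Inverse step: recover the original binary encoding from the Base64 column.
    static byte[] decodePayload(Map<String, Object> row) {
        return Base64.getDecoder().decode((String) row.get(PAYLOAD_COLUMN));
    }
}
```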

In our case, I think adding a withFormatRecordOnFailureFunction() from TableRow to TableRow might not work, since we need additional information for the failed rows that comes from the original AvroGenericRecordMessage.
Please advise how we can implement the same with the STORAGE_WRITE_API method.
