
Support writing to BQ via Storage Write API with SchemaTransforms #23988

Merged: 16 commits from bq_write_schematransforms into apache:master on Dec 20, 2022

Conversation

@ahmedabu98 (Contributor)

Adds SchemaTransform capability to the BigQueryIO Storage Write API.
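
For context, a SchemaTransform exposes an IO behind a schema'd configuration object so the transform can be constructed generically (for example, from another SDK via an expansion service). A minimal usage sketch follows; the provider, configuration, and tag names below are illustrative assumptions, not necessarily the ones this PR introduces:

// Hypothetical names; see the PR diff for the actual classes and tags.
BigQueryStorageWriteApiSchemaTransformConfiguration config =
    BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
        .setTable("my-project:my_dataset.my_table")
        .build();

SchemaTransform transform =
    new BigQueryStorageWriteApiSchemaTransformProvider().from(config);

// SchemaTransforms consume and produce tagged bundles of Row PCollections.
PCollectionRowTuple output =
    PCollectionRowTuple.of("input", beamRows).apply(transform.buildTransform());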

@ahmedabu98 force-pushed the bq_write_schematransforms branch from acde5b7 to 7f41639 on November 8, 2022 15:36
@ahmedabu98 force-pushed the bq_write_schematransforms branch from 6e509be to 8177bca on December 5, 2022 17:48
@ahmedabu98 marked this pull request as ready for review on December 6, 2022 00:30
@ahmedabu98 (Contributor, Author)

R: @johnjcasey
R: @pabloem

github-actions bot commented on Dec 6, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@ahmedabu98 force-pushed the bq_write_schematransforms branch from 613c463 to 633ef7d on December 6, 2022 01:21
@ahmedabu98 force-pushed the bq_write_schematransforms branch from 633ef7d to 8aa51d4 on December 6, 2022 12:30
Comment on lines 226 to 228
public abstract Builder setNumStorageWriteApiStreams(Integer numStorageWriteApiStreams);

public abstract Builder setNumFileShards(Integer numFileShards);
Member

I think we should have default values for these (file shards are not relevant for storage, right?) - and avoid exposing them if possible. In fact, we should use auto sharding if it's supported.
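
For reference, BigQueryIO.Write already supports runner-determined sharding. A minimal sketch of the suggestion, assuming the SchemaTransform builds a BigQueryIO.Write<Row> internally:

// Prefer runner-determined sharding over exposing numStorageWriteApiStreams
// or numFileShards through the SchemaTransform configuration.
write = write.withAutoSharding();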

ahmedabu98 (Contributor, Author)

Will do, that sounds better. FYI, support was added in #16795.

Comment on lines 348 to 352
if (configuration.getTriggeringFrequencySeconds() != null) {
write =
write.withTriggeringFrequency(
Duration.standardSeconds(configuration.getTriggeringFrequencySeconds()));
}
Member

I think we should probably have a default value for this. Something like:

if (input.isBounded() == IsBounded.UNBOUNDED) {
  write = write.withTriggeringFrequency(
      config.getTriggeringFrequency() == null ? DEFAULT_VALUE : config.getTriggeringFrequency());
}

so that this parameter can be optional even for streaming pipelines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, what default do you think is good here? I've often seen 60s.

@ahmedabu98 (Contributor, Author)

retest this please

@ahmedabu98 (Contributor, Author)

Thank you @pabloem, PTAL

@ahmedabu98 (Contributor, Author)

Friendly ping :) needs a final review

@ahmedabu98 (Contributor, Author)

R: @johnjcasey

@ahmedabu98 mentioned this pull request on Dec 19, 2022
@pabloem (Member) left a comment

LGTM thanks!

@ahmedabu98 (Contributor, Author)

Comments addressed, PTAL :)

@pabloem (Member) commented on Dec 20, 2022

lgtm thanks!

@pabloem merged commit 8dab5dc into apache:master on Dec 20, 2022
lostluck pushed a commit to lostluck/beam that referenced this pull request on Dec 22, 2022

Support writing to BQ via Storage Write API with SchemaTransforms (apache#23988)

* storage write api provider class skeleton

* storage write schematransform configuration with validation

* small fixes, more validation checks, and setting up the storage write transform

* PCollectionRowTuple with two outputs: failed rows and (errors + failed rows)

* spotless

* bq services does not belong as a configuration field

* add AT_LEAST_ONCE semantics. also set test bq services

* test config validation and successful writes

* test for failed rows

* test for failed rows

* some fixes

* experiment

* use autoSharding, set default triggering frequency, use Beam schema

* use Long instead of Integer

* address comments: use autosharding, 5s commit interval, change input and output tags
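
Taken together, the last few commits describe the final expansion behavior for unbounded input: auto-sharding plus a 5-second default commit interval. A minimal sketch consistent with those commit messages (constant and accessor names are illustrative assumptions):

// Illustrative only; the semantics follow the commit messages above.
private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(5);

if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
  Long seconds = configuration.getTriggeringFrequencySeconds(); // Long, not Integer
  write =
      write
          .withAutoSharding()
          .withTriggeringFrequency(
              seconds == null ? DEFAULT_TRIGGERING_FREQUENCY : Duration.standardSeconds(seconds));
}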