[Python BQ] Allow setting a fixed number of Storage API streams #28592

Merged
merged 5 commits on Sep 22, 2023
Changes from 3 commits
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Dataflow',
'Run Python_Xlang_Gcp_Dataflow PostCommit', 'Python_Xlang_Gcp_Dataflow (\"Run Python_Xlang_Gcp_Dataflow PostCommit\")', this) {
description('Runs end-to-end cross language GCP IO tests on the Dataflow runner.')
description('Runs end-to-end cross language GCP IO tests on the Dataflow runner. \"Run Python_Xlang_Gcp_Dataflow PostCommit\"')


// Set common parameters.
@@ -28,7 +28,7 @@ import static PythonTestProperties.CROSS_LANGUAGE_VALIDATES_RUNNER_PYTHON_VERSIO
// Collects tests with the @pytest.mark.uses_gcp_java_expansion_service decorator
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Xlang_Gcp_Direct',
'Run Python_Xlang_Gcp_Direct PostCommit', 'Python_Xlang_Gcp_Direct (\"Run Python_Xlang_Gcp_Direct PostCommit\")', this) {
description('Runs end-to-end cross language GCP IO tests on the Direct runner.')
description('Runs end-to-end cross language GCP IO tests on the Direct runner. \"Run Python_Xlang_Gcp_Direct PostCommit\"')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
@@ -176,6 +176,13 @@ public void validate() {
!Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
invalidConfigMessage + "Output must not be empty if error handling specified.");
}

if (this.getAutoSharding() != null && this.getAutoSharding()) {
checkArgument(
this.getNumStreams() == 0,
invalidConfigMessage
+ "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
}
}

/**
@@ -218,11 +225,17 @@ public static Builder builder() {
public abstract Boolean getUseAtLeastOnceSemantics();

@SchemaFieldDescription(
"This option enables using a dynamically determined number of shards to write to "
"This option enables using a dynamically determined number of Storage Write API streams to write to "
+ "BigQuery. Only applicable to unbounded data.")
@Nullable
public abstract Boolean getAutoSharding();

@SchemaFieldDescription(
"If set, the Storage API sink will default to using this number of write streams. " +
"Only applicable to unbounded data.")
@Nullable
public abstract Integer getNumStreams();

@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
@Nullable
public abstract ErrorHandling getErrorHandling();
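
The new numStreams option above and the existing autoSharding option are mutually exclusive, as enforced by the checkArgument added to validate() earlier in this file. A minimal Python-side sketch of the invalid combination (hedged: the table and schema values are placeholders, and the Python parameter names come from the bigquery.py changes later in this PR):

import apache_beam as beam

# Invalid for unbounded writes: a fixed stream count together with
# auto-sharding trips the "pick only one of the two options" check above
# once the cross-language transform is expanded.
write = beam.io.WriteToBigQuery(
    table='<project>:<dataset>.<table>',          # placeholder
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    schema='int:INTEGER,str:STRING',              # placeholder
    triggering_frequency=1,
    num_storage_api_streams=4,
    with_auto_sharding=True)

# Pick one: either set num_storage_api_streams > 0, or rely on
# auto-sharding (still the default for unbounded input).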
@@ -243,6 +256,8 @@ public abstract static class Builder {

public abstract Builder setAutoSharding(Boolean autoSharding);

public abstract Builder setNumStreams(Integer numStreams);

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
@@ -321,13 +336,19 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
Boolean autoSharding = configuration.getAutoSharding();
write =
write.withTriggeringFrequency(
(triggeringFrequency == null || triggeringFrequency <= 0)
? DEFAULT_TRIGGERING_FREQUENCY
: Duration.standardSeconds(triggeringFrequency));
// use default value true for autoSharding if not configured for STORAGE_WRITE_API
if (autoSharding == null || autoSharding) {
Integer numStreams = configuration.getNumStreams();
// Triggering frequency is only applicable for exactly-once
if (!configuration.getUseAtLeastOnceSemantics()) {
write =
write.withTriggeringFrequency(
(triggeringFrequency == null || triggeringFrequency <= 0)
? DEFAULT_TRIGGERING_FREQUENCY
: Duration.standardSeconds(triggeringFrequency));
}
// set num streams if specified, otherwise default to autoSharding
if (numStreams > 0) {
write = write.withNumStorageWriteApiStreams(numStreams);
} else if (autoSharding == null || autoSharding) {
write = write.withAutoSharding();
}
}
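
To summarize the new branching for unbounded input: the triggering frequency is now applied only on the exactly-once path, a positive numStreams selects a fixed number of Storage Write API streams, and otherwise auto-sharding remains the default. Below is a small, purely illustrative Python restatement of that branch (the WriteConfig helper and its field names are hypothetical stand-ins for the configuration object, and the actual DEFAULT_TRIGGERING_FREQUENCY value is not shown in this diff):

from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class WriteConfig:
    """Hypothetical mirror of the schema transform configuration fields above."""
    triggering_frequency_seconds: Optional[int] = None
    use_at_least_once_semantics: bool = False
    auto_sharding: Optional[bool] = None
    num_streams: int = 0

def resolve_unbounded_write(
    config: WriteConfig,
    default_frequency_seconds: int) -> Tuple[Optional[int], int, bool]:
    """Returns (triggering_frequency_secs, fixed_num_streams, use_auto_sharding)."""
    frequency = None
    if not config.use_at_least_once_semantics:
        # Triggering frequency only applies on the exactly-once path.
        freq = config.triggering_frequency_seconds
        frequency = (default_frequency_seconds
                     if freq is None or freq <= 0 else freq)
    if config.num_streams > 0:
        # A fixed stream count takes precedence over auto-sharding.
        return frequency, config.num_streams, False
    # Otherwise auto-sharding stays on unless explicitly disabled.
    return frequency, 0, config.auto_sharding is None or config.auto_sharding

# For example, resolve_unbounded_write(WriteConfig(num_streams=4), 5)
# returns (5, 4, False): default frequency, four fixed streams, no auto-sharding.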
38 changes: 25 additions & 13 deletions sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -30,12 +30,13 @@
from hamcrest.core import assert_that as hamcrest_assert

import apache_beam as beam
from apache_beam.io.external.generate_sequence import GenerateSequence
from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamingMatcher
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

# Protect against environments where bigquery library is not available.
@@ -99,11 +100,13 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
ALL_TYPES_SCHEMA = (
"int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING,"
"bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP")
_RUNNER = ""

def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.args = self.test_pipeline.get_full_options_as_args()
self.project = self.test_pipeline.get_option('project')
_RUNNER = PipelineOptions(self.args).get_all_options()['runner']

self.bigquery_client = BigQueryWrapper()
self.dataset_id = '%s_%s_%s' % (
@@ -244,8 +247,7 @@ def test_write_with_beam_rows(self):
table=table_id, expansion_service=self.expansion_service))
hamcrest_assert(p, bq_matcher)

def run_streaming(
self, table_name, auto_sharding=False, use_at_least_once=False):
def run_streaming(self, table_name, num_streams=0, use_at_least_once=False):
elements = self.ELEMENTS.copy()
schema = self.ALL_TYPES_SCHEMA
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
@@ -260,33 +262,43 @@ def run_streaming(
streaming=True,
allow_unsafe_triggers=True)

auto_sharding = num_streams == 0
Contributor (review comment on the line above): Took me some time to parse it; nit, and it might be common practice, but auto_sharding = (num_streams == 0) looks better.

with beam.Pipeline(argv=args) as p:
_ = (
p
| GenerateSequence(
start=0, stop=4, expansion_service=self.expansion_service)
| beam.Map(lambda x: elements[x])
| PeriodicImpulse(0, 4, 1)
| beam.Map(lambda t: elements[t])
| beam.io.WriteToBigQuery(
table=table_id,
method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
schema=schema,
triggering_frequency=1,
with_auto_sharding=auto_sharding,
num_storage_api_streams=num_streams,
use_at_least_once=use_at_least_once,
expansion_service=self.expansion_service))
hamcrest_assert(p, bq_matcher)

def test_streaming(self):
table = 'streaming'
@unittest.skipUnless(
"dataflowrunner" in _RUNNER.lower(),
"The exactly-once route has the requirement "
"`beam:requirement:pardo:on_window_expiration:v1`, "
"which is currently only supported by the Dataflow runner.")
def test_streaming_with_fixed_num_streams(self):
table = 'streaming_fixed_num_streams'
self.run_streaming(table_name=table, num_streams=4)

@unittest.skip(
"Streaming to the Storage Write API sink with autosharding is broken "
"with Dataflow Runner V2.")
def test_streaming_with_auto_sharding(self):
table = 'streaming_with_auto_sharding'
self.run_streaming(table_name=table)

def test_streaming_with_at_least_once(self):
table = 'streaming'
table = 'streaming_with_at_least_once'
self.run_streaming(table_name=table, use_at_least_once=True)

def test_streaming_with_auto_sharding(self):
table = 'streaming_with_auto_sharding'
self.run_streaming(table_name=table, auto_sharding=True)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1869,6 +1869,7 @@ def __init__(
# TODO(https://github.com/apache/beam/issues/20712): Switch the default
# when the feature is mature.
with_auto_sharding=False,
num_storage_api_streams=0,
ignore_unknown_columns=False,
load_job_project_id=None,
max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
@@ -2018,6 +2019,8 @@ def __init__(
determined number of shards to write to BigQuery. This can be used for
all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
applicable to unbounded input.
num_storage_api_streams: If set, the Storage API sink will default to
using this number of write streams. Only applicable to unbounded data.
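
For reference, a minimal end-to-end sketch of how the new parameter would be used from the Python SDK (hedged: the Pub/Sub topic, table, and schema are placeholders, any unbounded source would do, and the frequency and stream-count values are arbitrary):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Runner/project options omitted; a streaming (unbounded) pipeline is assumed.
options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'Read' >> beam.io.ReadFromPubSub(topic='projects/<project>/topics/<topic>')
        | 'Parse' >> beam.Map(lambda msg: {'int': len(msg), 'str': msg.decode('utf-8')})
        | 'Write' >> beam.io.WriteToBigQuery(
            table='<project>:<dataset>.<table>',
            method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
            schema='int:INTEGER,str:STRING',
            triggering_frequency=5,
            num_storage_api_streams=4))  # fixed number of Storage API write streams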
Review thread on this parameter description:

Collaborator: num_storage_api_streams: specifies the number of write streams that the Storage API sink will use. This parameter is only applicable to unbounded data.

Collaborator: Shall we check that this is always set for unbounded data, since it won't work otherwise?

Contributor Author: Streaming writes with at-least-once still work without setting this parameter.

Contributor Author: Changed the documentation, thanks for the suggestion!

Collaborator: I see. Shall we add something like "This parameter must be set for Storage API writes with the exactly-once method."?

Contributor Author: I hesitate to do this because conventionally we don't do runner-based checks in the SDK.

Contributor Author: I clarified it in the public BigQuery connector doc (https://beam.apache.org/documentation/io/built-in/google-bigquery/).

ignore_unknown_columns: Accept rows that contain values that do not match
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. This option is only valid for
@@ -2060,6 +2063,7 @@ def __init__(
self.use_at_least_once = use_at_least_once
self.expansion_service = expansion_service
self.with_auto_sharding = with_auto_sharding
self._num_storage_api_streams = num_storage_api_streams
self.insert_retry_strategy = insert_retry_strategy
self._validate = validate
self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
@@ -2259,6 +2263,7 @@ def find_in_nested_dict(schema):
triggering_frequency=triggering_frequency,
use_at_least_once=self.use_at_least_once,
with_auto_sharding=self.with_auto_sharding,
num_storage_api_streams=self._num_storage_api_streams,
expansion_service=self.expansion_service))

if is_rows:
@@ -2521,6 +2526,7 @@ def __init__(
triggering_frequency=0,
use_at_least_once=False,
with_auto_sharding=False,
num_storage_api_streams=0,
expansion_service=None):
"""Initialize a StorageWriteToBigQuery transform.

@@ -2558,6 +2564,7 @@ def __init__(
self._triggering_frequency = triggering_frequency
self._use_at_least_once = use_at_least_once
self._with_auto_sharding = with_auto_sharding
self._num_storage_api_streams = num_storage_api_streams
self._expansion_service = (
expansion_service or _default_io_expansion_service())
self.schematransform_config = SchemaAwareExternalTransform.discover_config(
@@ -2569,6 +2576,7 @@ def expand(self, input):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
autoSharding=self._with_auto_sharding,
numStreams=self._num_storage_api_streams,
createDisposition=self._create_disposition,
table=self._table,
triggeringFrequencySeconds=self._triggering_frequency,