diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java index 38cd97da860a..809d7a275512 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java @@ -288,6 +288,7 @@ public static class Configuration extends CrossLanguageConfiguration { private @Nullable Integer groupingFactor; private @Nullable Duration commitDeadline; private @Nullable Duration maxCumulativeBackoff; + private @Nullable String failureMode; public void setTable(String table) { this.table = table; @@ -322,6 +323,10 @@ public void setMaxCumulativeBackoff(@Nullable Long maxCumulativeBackoff) { this.maxCumulativeBackoff = Duration.standardSeconds(maxCumulativeBackoff); } } + + public void setFailureMode(@Nullable String failureMode) { + this.failureMode = failureMode; + } } @Override @@ -361,6 +366,11 @@ public PTransform, PDone> buildExternal( writeTransform = writeTransform.withMaxCumulativeBackoff(configuration.maxCumulativeBackoff); } + if (configuration.failureMode != null) { + writeTransform = + writeTransform.withFailureMode( + SpannerIO.FailureMode.valueOf(configuration.failureMode)); + } return SpannerIO.WriteRows.of(writeTransform, operation, configuration.table); } } diff --git a/sdks/python/apache_beam/io/gcp/spanner.py b/sdks/python/apache_beam/io/gcp/spanner.py index c16daa4448b1..51c7fc65c171 100644 --- a/sdks/python/apache_beam/io/gcp/spanner.py +++ b/sdks/python/apache_beam/io/gcp/spanner.py @@ -124,6 +124,11 @@ class TimestampBoundMode(Enum): STRONG = auto() +class FailureMode(Enum): + FAIL_FAST = auto() + REPORT_FAILURES = auto() + + class ReadFromSpannerSchema(NamedTuple): instance_id: str database_id: str @@ -282,6 +287,7 @@ class WriteToSpannerSchema(NamedTuple): emulator_host: Optional[str] commit_deadline: Optional[int] max_cumulative_backoff: Optional[int] + failure_mode: Optional[str] _CLASS_DOC = \ @@ -346,6 +352,11 @@ class {row_type}(NamedTuple): (15min). If the mutations still have not been written after this time, they are treated as a failure, and handled according to the setting of failure_mode. Pass seconds as value. + :param failure_mode: Specifies the behavior for mutations that fail to be + written to Spanner. Default is FAIL_FAST. When FAIL_FAST is set, + an exception will be thrown for any failed mutation. When REPORT_FAILURES + is set, processing will continue instead of throwing an exception. Note + that REPORT_FAILURES can cause data loss if used incorrectly. :param expansion_service: The address (host:port) of the ExpansionService. """ @@ -392,6 +403,7 @@ def __init__( emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, + failure_mode=None, expansion_service=None, ): max_cumulative_backoff = int( @@ -413,6 +425,7 @@ def __init__( emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, + failure_mode=_get_enum_name(failure_mode), ), ), expansion_service=expansion_service or default_io_expansion_service(), @@ -445,6 +458,7 @@ def __init__( commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, + failure_mode=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None @@ -465,6 +479,7 @@ def __init__( emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, + failure_mode=_get_enum_name(failure_mode), ), ), expansion_service=expansion_service or default_io_expansion_service(), @@ -497,6 +512,7 @@ def __init__( commit_deadline=None, max_cumulative_backoff=None, expansion_service=None, + failure_mode=None, ): max_cumulative_backoff = int( max_cumulative_backoff) if max_cumulative_backoff else None @@ -517,6 +533,7 @@ def __init__( emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, + failure_mode=_get_enum_name(failure_mode), ), ), expansion_service=expansion_service or default_io_expansion_service(), @@ -548,6 +565,7 @@ def __init__( emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, + failure_mode=None, expansion_service=None, ): max_cumulative_backoff = int( @@ -569,6 +587,7 @@ def __init__( emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, + failure_mode=_get_enum_name(failure_mode), ), ), expansion_service=expansion_service or default_io_expansion_service(), @@ -600,6 +619,7 @@ def __init__( emulator_host=None, commit_deadline=None, max_cumulative_backoff=None, + failure_mode=None, expansion_service=None, ): max_cumulative_backoff = int( @@ -621,6 +641,7 @@ def __init__( emulator_host=emulator_host, commit_deadline=commit_deadline, max_cumulative_backoff=max_cumulative_backoff, + failure_mode=_get_enum_name(failure_mode), ), ), expansion_service=expansion_service or default_io_expansion_service(), diff --git a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py index 5d701052965b..43a74f170531 100644 --- a/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py +++ b/sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py @@ -234,6 +234,7 @@ def run_write_pipeline( database_id=self.database_id, project_id=self.project_id, table=self.table, + failure_mode=beam.io.gcp.spanner.FailureMode.REPORT_FAILURES, emulator_host=self.spanner_helper.get_emulator_host(), ))