Skip to content

Commit

Permalink
Adding support for high priority queries to xlang transforms writing … (
Browse files Browse the repository at this point in the history
apache#30869)

* Adding support for high priority queries to xlang transforms writing to spanner

* Comments
  • Loading branch information
pabloem authored Apr 11, 2024
1 parent b018c25 commit c1761ab
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public static class Configuration extends CrossLanguageConfiguration {
private @Nullable Duration commitDeadline;
private @Nullable Duration maxCumulativeBackoff;
private @Nullable String failureMode;
private Boolean highPriority = false;

public void setTable(String table) {
this.table = table;
Expand Down Expand Up @@ -327,6 +328,10 @@ public void setMaxCumulativeBackoff(@Nullable Long maxCumulativeBackoff) {
public void setFailureMode(@Nullable String failureMode) {
this.failureMode = failureMode;
}

public void setHighPriority(Boolean highPriority) {
this.highPriority = highPriority;
}
}

@Override
Expand All @@ -341,6 +346,9 @@ public PTransform<PCollection<Row>, PDone> buildExternal(
.withDatabaseId(configuration.databaseId)
.withInstanceId(configuration.instanceId);

if (configuration.highPriority) {
writeTransform = writeTransform.withHighPriority();
}
if (configuration.maxBatchSizeBytes != null) {
writeTransform = writeTransform.withBatchSizeBytes(configuration.maxBatchSizeBytes);
}
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/io/gcp/spanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ class WriteToSpannerSchema(NamedTuple):
commit_deadline: Optional[int]
max_cumulative_backoff: Optional[int]
failure_mode: Optional[str]
high_priority: bool


_CLASS_DOC = \
Expand Down Expand Up @@ -405,6 +406,7 @@ def __init__(
max_cumulative_backoff=None,
failure_mode=None,
expansion_service=None,
high_priority=False,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
Expand All @@ -426,6 +428,7 @@ def __init__(
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
failure_mode=_get_enum_name(failure_mode),
high_priority=high_priority,
),
),
expansion_service=expansion_service or default_io_expansion_service(),
Expand Down Expand Up @@ -459,6 +462,7 @@ def __init__(
max_cumulative_backoff=None,
expansion_service=None,
failure_mode=None,
high_priority=False,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
Expand All @@ -480,6 +484,7 @@ def __init__(
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
failure_mode=_get_enum_name(failure_mode),
high_priority=high_priority,
),
),
expansion_service=expansion_service or default_io_expansion_service(),
Expand Down Expand Up @@ -513,6 +518,7 @@ def __init__(
max_cumulative_backoff=None,
expansion_service=None,
failure_mode=None,
high_priority=False,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
Expand All @@ -534,6 +540,7 @@ def __init__(
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
failure_mode=_get_enum_name(failure_mode),
high_priority=high_priority,
),
),
expansion_service=expansion_service or default_io_expansion_service(),
Expand Down Expand Up @@ -567,6 +574,7 @@ def __init__(
max_cumulative_backoff=None,
failure_mode=None,
expansion_service=None,
high_priority=False,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
Expand All @@ -588,6 +596,7 @@ def __init__(
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
failure_mode=_get_enum_name(failure_mode),
high_priority=high_priority,
),
),
expansion_service=expansion_service or default_io_expansion_service(),
Expand Down Expand Up @@ -621,6 +630,7 @@ def __init__(
max_cumulative_backoff=None,
failure_mode=None,
expansion_service=None,
high_priority=False,
):
max_cumulative_backoff = int(
max_cumulative_backoff) if max_cumulative_backoff else None
Expand All @@ -642,6 +652,7 @@ def __init__(
commit_deadline=commit_deadline,
max_cumulative_backoff=max_cumulative_backoff,
failure_mode=_get_enum_name(failure_mode),
high_priority=high_priority,
),
),
expansion_service=expansion_service or default_io_expansion_service(),
Expand Down

0 comments on commit c1761ab

Please sign in to comment.