Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JdbcIO] Adding disableAutoCommit flag #32988

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)).
* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)).
* Added a flag to support conditionally disabling auto-commit in JdbcIO ReadFn ([#31111](https://github.com/apache/beam/issues/31111)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ public static <T> Read<T> read() {
return new AutoValue_JdbcIO_Read.Builder<T>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}

Expand All @@ -341,6 +342,7 @@ public static ReadRows readRows() {
return new AutoValue_JdbcIO_ReadRows.Builder()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setStatementPreparator(ignored -> {})
.build();
}
Expand All @@ -356,6 +358,7 @@ public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
.setFetchSize(DEFAULT_FETCH_SIZE)
.setOutputParallelization(true)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.build();
}

Expand All @@ -372,6 +375,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionColumnType(partitioningColumnType)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -389,6 +393,7 @@ public static <T, PartitionColumnT> ReadWithPartitions<T, PartitionColumnT> read
.setPartitionsHelper(partitionsHelper)
.setNumPartitions(DEFAULT_NUM_PARTITIONS)
.setFetchSize(DEFAULT_FETCH_SIZE)
.setDisableAutoCommit(DEFAULT_DISABLE_AUTO_COMMIT)
.setUseBeamSchema(false)
.build();
}
Expand All @@ -400,6 +405,7 @@ public static <T> ReadWithPartitions<T, Long> readWithPartitions() {
private static final long DEFAULT_BATCH_SIZE = 1000L;
private static final long DEFAULT_MAX_BATCH_BUFFERING_DURATION = 200L;
private static final int DEFAULT_FETCH_SIZE = 50_000;
private static final boolean DEFAULT_DISABLE_AUTO_COMMIT = true;
// Default values used from fluent backoff.
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(1);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);
Expand Down Expand Up @@ -733,6 +739,9 @@ public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -748,6 +757,8 @@ abstract Builder setDataSourceProviderFn(

abstract Builder setOutputParallelization(boolean outputParallelization);

abstract Builder setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadRows build();
}

Expand Down Expand Up @@ -799,6 +810,15 @@ public ReadRows withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/**
 * Sets whether auto-commit is disabled while reading. When this option is not supplied it
 * defaults to {@code true}. The correct value is database-specific: Informix requires
 * {@code false}, while Postgres requires {@code true} (its driver needs auto-commit off to
 * stream results with a cursor).
 */
public ReadRows withDisableAutoCommit(boolean disableAutoCommit) {
  ReadRows updated = toBuilder().setDisableAutoCommit(disableAutoCommit).build();
  return updated;
}

@Override
public PCollection<Row> expand(PBegin input) {
ValueProvider<String> query = checkStateNotNull(getQuery(), "withQuery() is required");
Expand All @@ -816,6 +836,7 @@ public PCollection<Row> expand(PBegin input) {
.withCoder(RowCoder.of(schema))
.withRowMapper(SchemaUtil.BeamRowMapper.of(schema))
.withFetchSize(getFetchSize())
.withDisableAutoCommit(getDisableAutoCommit())
.withOutputParallelization(getOutputParallelization())
.withStatementPreparator(checkStateNotNull(getStatementPreparator())));
rows.setRowSchema(schema);
Expand Down Expand Up @@ -872,6 +893,9 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
@Pure
abstract boolean getOutputParallelization();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T> toBuilder();

Expand All @@ -892,6 +916,8 @@ abstract Builder<T> setDataSourceProviderFn(

abstract Builder<T> setOutputParallelization(boolean outputParallelization);

abstract Builder<T> setDisableAutoCommit(boolean disableAutoCommit);

abstract Read<T> build();
}

Expand Down Expand Up @@ -958,6 +984,15 @@ public Read<T> withOutputParallelization(boolean outputParallelization) {
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/**
 * Sets whether auto-commit is disabled while reading. When this option is not supplied it
 * defaults to {@code true}. The correct value is database-specific: Informix requires
 * {@code false}, while Postgres requires {@code true} (its driver needs auto-commit off to
 * stream results with a cursor).
 */
public Read<T> withDisableAutoCommit(boolean disableAutoCommit) {
  Read<T> updated = toBuilder().setDisableAutoCommit(disableAutoCommit).build();
  return updated;
}

@Override
public PCollection<T> expand(PBegin input) {
ValueProvider<String> query = checkArgumentNotNull(getQuery(), "withQuery() is required");
Expand All @@ -974,6 +1009,7 @@ public PCollection<T> expand(PBegin input) {
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withOutputParallelization(getOutputParallelization())
.withDisableAutoCommit(getDisableAutoCommit())
.withParameterSetter(
(element, preparedStatement) -> {
if (getStatementPreparator() != null) {
Expand Down Expand Up @@ -1029,6 +1065,8 @@ public abstract static class ReadAll<ParameterT, OutputT>

abstract boolean getOutputParallelization();

abstract boolean getDisableAutoCommit();

abstract Builder<ParameterT, OutputT> toBuilder();

@AutoValue.Builder
Expand All @@ -1049,6 +1087,8 @@ abstract Builder<ParameterT, OutputT> setParameterSetter(

abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean outputParallelization);

abstract Builder<ParameterT, OutputT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadAll<ParameterT, OutputT> build();
}

Expand Down Expand Up @@ -1127,6 +1167,15 @@ public ReadAll<ParameterT, OutputT> withOutputParallelization(boolean outputPara
return toBuilder().setOutputParallelization(outputParallelization).build();
}

/**
 * Sets whether auto-commit is disabled while reading. When this option is not supplied it
 * defaults to {@code true}. The correct value is database-specific: Informix requires
 * {@code false}, while Postgres requires {@code true} (its driver needs auto-commit off to
 * stream results with a cursor).
 */
public ReadAll<ParameterT, OutputT> withDisableAutoCommit(boolean disableAutoCommit) {
  ReadAll<ParameterT, OutputT> updated =
      toBuilder().setDisableAutoCommit(disableAutoCommit).build();
  return updated;
}

private @Nullable Coder<OutputT> inferCoder(
CoderRegistry registry, SchemaRegistry schemaRegistry) {
if (getCoder() != null) {
Expand Down Expand Up @@ -1173,7 +1222,8 @@ public PCollection<OutputT> expand(PCollection<ParameterT> input) {
checkStateNotNull(getQuery()),
checkStateNotNull(getParameterSetter()),
checkStateNotNull(getRowMapper()),
getFetchSize())))
getFetchSize(),
getDisableAutoCommit())))
.setCoder(coder);

if (getOutputParallelization()) {
Expand Down Expand Up @@ -1254,6 +1304,9 @@ public abstract static class ReadWithPartitions<T, PartitionColumnT>
@Pure
abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT> getPartitionsHelper();

@Pure
abstract boolean getDisableAutoCommit();

@Pure
abstract Builder<T, PartitionColumnT> toBuilder();

Expand Down Expand Up @@ -1287,6 +1340,8 @@ abstract Builder<T, PartitionColumnT> setPartitionColumnType(
abstract Builder<T, PartitionColumnT> setPartitionsHelper(
JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);

abstract Builder<T, PartitionColumnT> setDisableAutoCommit(boolean disableAutoCommit);

abstract ReadWithPartitions<T, PartitionColumnT> build();
}

Expand Down Expand Up @@ -1337,6 +1392,16 @@ public ReadWithPartitions<T, PartitionColumnT> withFetchSize(int fetchSize) {
return toBuilder().setFetchSize(fetchSize).build();
}

/**
 * Sets whether auto-commit is disabled while reading. When this option is not supplied it
 * defaults to {@code true}. The correct value is database-specific: Informix requires
 * {@code false}, while Postgres requires {@code true} (its driver needs auto-commit off to
 * stream results with a cursor).
 */
public ReadWithPartitions<T, PartitionColumnT> withDisableAutoCommit(
    boolean disableAutoCommit) {
  ReadWithPartitions<T, PartitionColumnT> updated =
      toBuilder().setDisableAutoCommit(disableAutoCommit).build();
  return updated;
}

/** Data output type is {@link Row}, and schema is auto-inferred from the database. */
public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
return toBuilder().setUseBeamSchema(true).build();
Expand Down Expand Up @@ -1419,7 +1484,8 @@ && getLowerBound() instanceof Comparable<?>) {
.withQuery(query)
.withDataSourceProviderFn(dataSourceProviderFn)
.withRowMapper(checkStateNotNull(partitionsHelper))
.withFetchSize(getFetchSize()))
.withFetchSize(getFetchSize())
.withDisableAutoCommit(getDisableAutoCommit()))
.apply(
MapElements.via(
new SimpleFunction<
Expand Down Expand Up @@ -1487,7 +1553,8 @@ public KV<Long, KV<PartitionColumnT, PartitionColumnT>> apply(
.withRowMapper(rowMapper)
.withFetchSize(getFetchSize())
.withParameterSetter(checkStateNotNull(partitionsHelper))
.withOutputParallelization(false);
.withOutputParallelization(false)
.withDisableAutoCommit(getDisableAutoCommit());

if (getUseBeamSchema()) {
checkStateNotNull(schema);
Expand Down Expand Up @@ -1537,6 +1604,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
private final PreparedStatementSetter<ParameterT> parameterSetter;
private final RowMapper<OutputT> rowMapper;
private final int fetchSize;
private final boolean disableAutoCommit;

private @Nullable DataSource dataSource;
private @Nullable Connection connection;
Expand All @@ -1546,12 +1614,14 @@ private ReadFn(
ValueProvider<String> query,
PreparedStatementSetter<ParameterT> parameterSetter,
RowMapper<OutputT> rowMapper,
int fetchSize) {
int fetchSize,
boolean disableAutoCommit) {
this.dataSourceProviderFn = dataSourceProviderFn;
this.query = query;
this.parameterSetter = parameterSetter;
this.rowMapper = rowMapper;
this.fetchSize = fetchSize;
this.disableAutoCommit = disableAutoCommit;
}

@Setup
Expand All @@ -1577,8 +1647,12 @@ public void processElement(ProcessContext context) throws Exception {
Connection connection = getConnection();
// PostgreSQL requires autocommit to be disabled to enable cursor streaming
// see https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
// This option is configurable as Informix will error
// if calling setAutoCommit on a non-logged database
if (disableAutoCommit) {
LOG.info("Autocommit has been disabled");
connection.setAutoCommit(false);
cwashcraft marked this conversation as resolved.
Show resolved Hide resolved
}
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (outputParallelization != null) {
readRows = readRows.withOutputParallelization(outputParallelization);
}
Boolean disableAutoCommit = config.getDisableAutoCommit();
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}
return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows));
}
}
Expand Down Expand Up @@ -174,6 +178,9 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
@Nullable
public abstract Boolean getOutputParallelization();

@Nullable
public abstract Boolean getDisableAutoCommit();

@Nullable
public abstract String getDriverJars();

Expand Down Expand Up @@ -238,6 +245,8 @@ public abstract static class Builder {

public abstract Builder setOutputParallelization(Boolean value);

public abstract Builder setDisableAutoCommit(Boolean value);

public abstract Builder setDriverJars(String value);

public abstract JdbcReadSchemaTransformConfiguration build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Schema configurationSchema() {
.addNullableField("readQuery", FieldType.STRING)
.addNullableField("writeStatement", FieldType.STRING)
.addNullableField("fetchSize", FieldType.INT16)
.addNullableField("disableAutoCommit", FieldType.BOOLEAN)
cwashcraft marked this conversation as resolved.
Show resolved Hide resolved
.addNullableField("outputParallelization", FieldType.BOOLEAN)
.addNullableField("autosharding", FieldType.BOOLEAN)
// Partitioning support. If you specify a partition column we will use that instead of
Expand Down Expand Up @@ -140,6 +141,11 @@ public PCollection<Row> expand(PBegin input) {
readRows = readRows.withFetchSize(fetchSize);
}

@Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit");
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}

return input.apply(readRows);
} else {

Expand All @@ -163,6 +169,11 @@ public PCollection<Row> expand(PBegin input) {
readRows = readRows.withOutputParallelization(outputParallelization);
}

@Nullable Boolean disableAutoCommit = config.getBoolean("disableAutoCommit");
if (disableAutoCommit != null) {
readRows = readRows.withDisableAutoCommit(disableAutoCommit);
}

return input.apply(readRows);
}
}
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/io/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ def default_io_expansion_service(classpath=None):
('read_query', typing.Optional[str]),
('write_statement', typing.Optional[str]),
('fetch_size', typing.Optional[np.int16]),
('disable_autocommit', typing.Optional[bool]),
('output_parallelization', typing.Optional[bool]),
('autosharding', typing.Optional[bool]),
('partition_column', typing.Optional[str]),
Expand Down Expand Up @@ -236,6 +237,7 @@ def __init__(
write_statement=statement,
read_query=None,
fetch_size=None,
disable_autocommit=None,
output_parallelization=None,
autosharding=autosharding,
max_connections=max_connections,
Expand Down Expand Up @@ -286,6 +288,7 @@ def __init__(
username,
password,
query=None,
disable_autocommit=None,
output_parallelization=None,
fetch_size=None,
partition_column=None,
Expand All @@ -305,6 +308,7 @@ def __init__(
:param username: database username
:param password: database password
:param query: sql query to be executed
:param disable_autocommit: disable autocommit on read
:param output_parallelization: is output parallelization on
:param fetch_size: how many rows to fetch
:param partition_column: enable partitioned reads by splitting on this
Expand Down Expand Up @@ -350,6 +354,7 @@ def __init__(
write_statement=None,
read_query=query,
fetch_size=fetch_size,
disable_autocommit=disable_autocommit,
output_parallelization=output_parallelization,
autosharding=None,
max_connections=max_connections,
Expand Down
Loading