Skip to content

Commit

Permalink
Allow avro block size to be configurable in AvroIO.write (apache#24454)
Browse files Browse the repository at this point in the history
* add avro block size config param

* rename api to syncInterval

Co-authored-by: Kanishk Karanawat <[email protected]>
  • Loading branch information
2 people authored and lostluck committed Dec 22, 2022
1 parent 374394e commit e6d131d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
28 changes: 26 additions & 2 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -571,7 +572,8 @@ private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> default
.setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
.setMetadata(ImmutableMap.of())
.setWindowedWrites(false)
.setNoSpilling(false);
.setNoSpilling(false)
.setSyncInterval(DataFileConstants.DEFAULT_SYNC_INTERVAL);
}

@Experimental(Kind.SCHEMAS)
Expand Down Expand Up @@ -1318,6 +1320,8 @@ public abstract static class TypedWrite<UserT, DestinationT, OutputT>

abstract boolean getGenericRecords();

abstract int getSyncInterval();

abstract @Nullable Schema getSchema();

abstract boolean getWindowedWrites();
Expand Down Expand Up @@ -1362,6 +1366,8 @@ abstract Builder<UserT, DestinationT, OutputT> setShardTemplate(

abstract Builder<UserT, DestinationT, OutputT> setGenericRecords(boolean genericRecords);

abstract Builder<UserT, DestinationT, OutputT> setSyncInterval(int syncInterval);

abstract Builder<UserT, DestinationT, OutputT> setSchema(Schema schema);

abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites);
Expand Down Expand Up @@ -1473,6 +1479,14 @@ public <NewDestinationT> TypedWrite<UserT, NewDestinationT, OutputT> to(
.build();
}

/**
* Sets the approximate number of uncompressed bytes to write in each block for the AVRO
* container format.
*/
public TypedWrite<UserT, DestinationT, OutputT> withSyncInterval(int syncInterval) {
return toBuilder().setSyncInterval(syncInterval).build();
}

/**
* Sets the output schema. Can only be used when the output type is {@link GenericRecord} and
* when not using {@link #to(DynamicAvroDestinations)}.
Expand Down Expand Up @@ -1659,7 +1673,11 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
}
WriteFiles<UserT, DestinationT, OutputT> write =
WriteFiles.to(
new AvroSink<>(tempDirectory, resolveDynamicDestinations(), getGenericRecords()));
new AvroSink<>(
tempDirectory,
resolveDynamicDestinations(),
getGenericRecords(),
getSyncInterval()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
Expand Down Expand Up @@ -1742,10 +1760,16 @@ public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
}

/** See {@link TypedWrite#withSyncInterval}. */
public Write<T> withSyncInterval(int syncInterval) {
return new Write<>(inner.withSyncInterval(syncInterval));
}

/** See {@link TypedWrite#withSchema}. */
public Write<T> withSchema(Schema schema) {
return new Write<>(inner.withSchema(schema));
}

/** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
Expand Down
20 changes: 15 additions & 5 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class AvroSink<UserT, DestinationT, OutputT>
extends FileBasedSink<UserT, DestinationT, OutputT> {
private final boolean genericRecords;
private final int syncInterval;

@FunctionalInterface
public interface DatumWriterFactory<T> extends Serializable {
Expand All @@ -48,10 +49,12 @@ public interface DatumWriterFactory<T> extends Serializable {
AvroSink(
ValueProvider<ResourceId> outputPrefix,
DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
boolean genericRecords) {
boolean genericRecords,
int syncInterval) {
// Avro handles compression internally using the codec.
super(outputPrefix, dynamicDestinations, Compression.UNCOMPRESSED);
this.genericRecords = genericRecords;
this.syncInterval = syncInterval;
}

@Override
Expand All @@ -61,24 +64,27 @@ public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinati

@Override
public WriteOperation<DestinationT, OutputT> createWriteOperation() {
return new AvroWriteOperation<>(this, genericRecords);
return new AvroWriteOperation<>(this, genericRecords, syncInterval);
}

/** A {@link WriteOperation WriteOperation} for Avro files. */
private static class AvroWriteOperation<DestinationT, OutputT>
extends WriteOperation<DestinationT, OutputT> {
private final DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations;
private final boolean genericRecords;
private final int syncInterval;

private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) {
private AvroWriteOperation(
AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords, int syncInterval) {
super(sink);
this.dynamicDestinations = sink.getDynamicDestinations();
this.genericRecords = genericRecords;
this.syncInterval = syncInterval;
}

@Override
public Writer<DestinationT, OutputT> createWriter() throws Exception {
return new AvroWriter<>(this, dynamicDestinations, genericRecords);
return new AvroWriter<>(this, dynamicDestinations, genericRecords, syncInterval);
}
}

Expand All @@ -90,14 +96,17 @@ private static class AvroWriter<DestinationT, OutputT> extends Writer<Destinatio

private final DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations;
private final boolean genericRecords;
private final int syncInterval;

public AvroWriter(
WriteOperation<DestinationT, OutputT> writeOperation,
DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations,
boolean genericRecords) {
boolean genericRecords,
int syncInterval) {
super(writeOperation, MimeTypes.BINARY);
this.dynamicDestinations = dynamicDestinations;
this.genericRecords = genericRecords;
this.syncInterval = syncInterval;
}

@SuppressWarnings("deprecation") // uses internal test functionality.
Expand Down Expand Up @@ -133,6 +142,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
+ v.getClass().getSimpleName());
}
}
dataFileWriter.setSyncInterval(syncInterval);
dataFileWriter.create(schema, Channels.newOutputStream(channel));
}

Expand Down

0 comments on commit e6d131d

Please sign in to comment.