SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. #1662

Merged · 8 commits · Jun 17, 2023
@@ -267,6 +267,7 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/
|systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB.|
|systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE (unlimited)|max size of the uncompressed blob in bytes.<br>If default value then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB per block X 50,000 blocks).|
|systems.**_system-name_**.azureblob.maxMessagesPerBlob|Long.MAX_VALUE (unlimited)|max number of messages per blob.|
|systems.**_system-name_**.azureblob.initBufferSize.bytes|32|The amount of memory pre-allocated for `org.apache.samza.system.azureblob.avro.AzureBlobOutputStream` buffers.<br>Values are clamped to the range between 32 (default) and `maxFlushThresholdSize`.<br>Increase this value for fast-filling buffers where buffer resize operations hurt performance. Large values can lead to inefficient memory allocation with the G1 garbage collector: if the size is >= half of a region size, `G1HeapRegionSize`, consider switching to the ParallelGC collector.<br>See the configuration sketch below this table.|
|systems.**_system-name_**.azureblob.threadPoolCount|2|number of threads for the asynchronous uploading of blocks.|
|systems.**_system-name_**.azureblob.blockingQueueSize|Thread Pool Count * 2|size of the queue holding blocks that are ready to be uploaded by the asynchronous threads.<br>If all threads are busy uploading, blocks are queued; if the queue is also full, the main thread starts uploading, which blocks processing of incoming messages.|
|systems.**_system-name_**.azureblob.flushTimeoutMs|180000 (3 mins)|timeout to finish uploading all blocks before committing a blob.|
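For reference, a minimal sketch of how the new property might be wired into a job configuration. The system name `azure-blob` and the sizes are illustrative assumptions, not values from this PR; the keys come from the table above.

```java
import java.util.HashMap;
import java.util.Map;

public class AzureBlobBufferConfigExample {
  public static void main(String[] args) {
    // Hypothetical system name; substitute your own system-name.
    String systemName = "azure-blob";
    Map<String, String> props = new HashMap<>();
    // Pre-allocate 64 KB per AzureBlobOutputStream buffer instead of the 32-byte default.
    props.put("systems." + systemName + ".azureblob.initBufferSize.bytes", "65536");
    // The effective value is capped by maxFlushThresholdSize (10 MB by default).
    props.put("systems." + systemName + ".azureblob.maxFlushThresholdSize", "10485760");
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```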
@@ -80,6 +80,12 @@ public class AzureBlobConfig extends MapConfig {
public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;

// initialization size of in-memory OutputStream
// This value is clamped to the range between SYSTEM_INIT_BUFFER_SIZE_DEFAULT and getMaxFlushThresholdSize(), inclusive.
public static final String SYSTEM_INIT_BUFFER_SIZE = SYSTEM_AZUREBLOB_PREFIX + "initBufferSize.bytes";
// Reuse the default initial size of the parameterless java.io.ByteArrayOutputStream() constructor.
public static final int SYSTEM_INIT_BUFFER_SIZE_DEFAULT = 32;

// maximum size of uncompressed blob in bytes
public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxBlobSize";
private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // unlimited
@@ -170,6 +176,12 @@ public int getMaxFlushThresholdSize(String systemName) {
return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
}

// Clamp the configured value to at least SYSTEM_INIT_BUFFER_SIZE_DEFAULT and at most getMaxFlushThresholdSize().
public int getInitBufferSizeBytes(String systemName) {
int init = getInt(String.format(SYSTEM_INIT_BUFFER_SIZE, systemName), SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
return Math.min(Math.max(init, SYSTEM_INIT_BUFFER_SIZE_DEFAULT), getMaxFlushThresholdSize(systemName));
}

public int getAzureBlobThreadPoolCount(String systemName) {
return getInt(String.format(SYSTEM_THREAD_POOL_COUNT, systemName), SYSTEM_THREAD_POOL_COUNT_DEFAULT);
}
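To make the clamping in `getInitBufferSizeBytes` concrete, here is a small standalone sketch that mirrors the expression above. It does not call the real class; the constants and sample values are illustrative.

```java
public class InitBufferSizeClampExample {
  private static final int INIT_BUFFER_SIZE_DEFAULT = 32;        // mirrors SYSTEM_INIT_BUFFER_SIZE_DEFAULT
  private static final int MAX_FLUSH_THRESHOLD_SIZE = 10485760;  // mirrors the 10 MB default threshold

  // Same expression as the getter: clamp to [default, maxFlushThresholdSize].
  static int clamp(int configured) {
    return Math.min(Math.max(configured, INIT_BUFFER_SIZE_DEFAULT), MAX_FLUSH_THRESHOLD_SIZE);
  }

  public static void main(String[] args) {
    System.out.println(clamp(8));          // 32       -> values below the default fall back to 32
    System.out.println(clamp(65536));      // 65536    -> in-range values are used as-is
    System.out.println(clamp(100000000));  // 10485760 -> values above maxFlushThresholdSize are capped
  }
}
```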
@@ -23,6 +23,7 @@
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -108,19 +109,32 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
private final String blobURLPrefix;
private final long maxBlobSize;
private final long maxRecordsPerBlob;
private final int initBufferSize;
private final boolean useRandomStringInBlobName;
private final Object currentDataFileWriterLock = new Object();
private volatile long recordsInCurrentBlob = 0;
private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
private Config blobMetadataGeneratorConfig;
private String streamName;

@Deprecated
public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob) {

this(containerAsyncClient, blobURLPrefix, blobThreadPool, metrics, blobMetadataGeneratorFactory,
blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs, compression,
useRandomStringInBlobName, maxBlobSize, maxRecordsPerBlob, AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
}

public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob, int initBufferSize) {

this.blobThreadPool = blobThreadPool;
this.metrics = metrics;
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
@@ -134,6 +148,7 @@ public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

/**
@@ -232,7 +247,9 @@ public void close() {
DataFileWriter<IndexedRecord> dataFileWriter,
AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName,
int initBufferSize) {

if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
this.currentBlobWriterComponents = null;
} else {
@@ -253,6 +270,7 @@ public void close() {
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

@VisibleForTesting
@@ -339,7 +357,7 @@ private void startNextBlob(Optional<IndexedRecord> optionalIndexedRecord) throws
try {
azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression, initBufferSize);
} catch (Exception e) {
throw new SamzaException("Unable to create AzureBlobOutputStream", e);
}
@@ -35,13 +35,14 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
/**
* {@inheritDoc}
*/
@Override
public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException {
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException {
return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob, initBufferSize);
}
}
@@ -51,7 +51,8 @@

/**
* This class extends {@link java.io.OutputStream} and uses {@link java.io.ByteArrayOutputStream}
* for caching the write calls till upload is not called.
* for caching the write calls until upload is called. The initialization size of the
* underlying {@link java.io.ByteArrayOutputStream} can be set by the caller or from config.
*
* It asynchronously uploads the blocks and waits on them to finish at close.
* The blob is persisted at close.
@@ -105,13 +106,13 @@ public class AzureBlobOutputStream extends OutputStream {
* @param flushTimeoutMs timeout for uploading a block
* @param maxBlockFlushThresholdSize max block size
* @param compression type of compression to be used before uploading a block
* @param initBufferSize initial size of {@link ByteArrayOutputStream}
*/
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression, int initBufferSize) {
this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
flushTimeoutMs, maxBlockFlushThresholdSize,
new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
flushTimeoutMs, maxBlockFlushThresholdSize, new ByteArrayOutputStream(initBufferSize), compression);
}

/**
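The constructor change above is the heart of the PR: the in-memory buffer now starts at `initBufferSize` (32 bytes by default) instead of being pre-allocated at `maxBlockFlushThresholdSize` (10 MB by default) and grows on demand. As a rough illustration of the trade-off, `java.io.ByteArrayOutputStream` typically at least doubles its capacity when it overflows, so the sketch below counts the copy-and-grow steps needed to reach the flush threshold from a given starting size. This is an illustrative approximation, not code from this PR.

```java
import java.io.ByteArrayOutputStream;

public class BufferGrowthExample {
  // Count doubling steps from initialSize up to at least targetSize,
  // approximating ByteArrayOutputStream's grow-by-doubling behaviour.
  static int doublingsToReach(int initialSize, int targetSize) {
    int capacity = initialSize;
    int steps = 0;
    while (capacity < targetSize) {
      capacity <<= 1;
      steps++;
    }
    return steps;
  }

  public static void main(String[] args) {
    int flushThreshold = 10485760; // 10 MB default maxFlushThresholdSize
    System.out.println(doublingsToReach(32, flushThreshold));      // ~19 resizes from the 32-byte default
    System.out.println(doublingsToReach(1 << 20, flushThreshold)); // ~4 resizes from a 1 MB initBufferSize
    // Pre-allocating the full threshold (the old behaviour) avoids resizes but costs 10 MB per writer:
    ByteArrayOutputStream preallocated = new ByteArrayOutputStream(flushThreshold);
    System.out.println(preallocated.size()); // 0 bytes written, yet the backing array is already 10 MB
  }
}
```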
@@ -499,7 +499,8 @@ AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMet
CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
config.getSuffixRandomStringToBlobName(systemName),
config.getMaxBlobSize(systemName),
config.getMaxMessagesPerBlob(systemName));
config.getMaxMessagesPerBlob(systemName),
config.getInitBufferSizeBytes(systemName));
} catch (Exception e) {
throw new RuntimeException("Failed to create a writer for the producer.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ public interface AzureBlobWriterFactory {
* @param streamName name of the stream that this AzureBlobWriter is associated with
* @param maxBlockFlushThresholdSize threshold at which to upload
* @param flushTimeoutMs timeout after which the flush is abandoned
* @param initBufferSize initial size of in-memory buffer(s)
* @return AzureBlobWriter instance
* @throws IOException if writer creation fails
*/
AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException;
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException;
}
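Since `getWriterInstance` gains a trailing parameter, this is a source-incompatible change for any custom `AzureBlobWriterFactory` implementation outside this repository. A minimal sketch of an updated custom implementation follows; the class name is hypothetical, the body is a placeholder, and the flagged package names in the imports are assumptions based on this diff rather than verified paths.

```java
import java.io.IOException;
import java.util.concurrent.Executor;

import com.azure.storage.blob.BlobContainerAsyncClient;
import org.apache.samza.config.Config;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
// The packages of the next two types are assumptions; adjust them to match the actual source tree.
import org.apache.samza.system.azureblob.producer.AzureBlobWriterFactory;
import org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory;

/** Hypothetical custom factory updated for the new interface: note the trailing initBufferSize parameter. */
public class MyCustomAzureBlobWriterFactory implements AzureBlobWriterFactory {
  @Override
  public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
      Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
      BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
      int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
      long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException {
    // Construct your writer here and pass initBufferSize through to whatever buffer it allocates.
    throw new UnsupportedOperationException("sketch only: replace with a real writer");
  }
}
```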