Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. #1662

Merged
merged 8 commits into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public class AzureBlobConfig extends MapConfig {
public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;

// initialization size of in-memory OutputStream
// Values below SYSTEM_INIT_BUFFER_SIZE_DEFAULT are raised to it, and values above getMaxFlushThresholdSize() are lowered to it (both bounds inclusive).
public static final String SYSTEM_INIT_BUFFER_SIZE = SYSTEM_AZUREBLOB_PREFIX + "initBufferSize.bytes";
ehoner marked this conversation as resolved.
Show resolved Hide resolved
// same as the initial capacity used by the no-arg java.io.ByteArrayOutputStream() constructor (32 bytes)
public static final int SYSTEM_INIT_BUFFER_SIZE_DEFAULT = 32;
mynameborat marked this conversation as resolved.
Show resolved Hide resolved

// maximum size of uncompressed blob in bytes
public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxBlobSize";
private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // unlimited
Expand Down Expand Up @@ -170,6 +176,12 @@ public int getMaxFlushThresholdSize(String systemName) {
return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
}

// Reads the configured initial buffer size for the given system and clamps it:
// a value below SYSTEM_INIT_BUFFER_SIZE_DEFAULT is raised to that default, and the
// result is then capped at getMaxFlushThresholdSize(systemName).
public int getInitBufferSizeBytes(String systemName) {
  final String configKey = String.format(SYSTEM_INIT_BUFFER_SIZE, systemName);
  final int configured = getInt(configKey, SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
  final int atLeastDefault =
      configured < SYSTEM_INIT_BUFFER_SIZE_DEFAULT ? SYSTEM_INIT_BUFFER_SIZE_DEFAULT : configured;
  final int upperBound = getMaxFlushThresholdSize(systemName);
  return atLeastDefault > upperBound ? upperBound : atLeastDefault;
}

// Looks up the thread pool count for the given system, passing
// SYSTEM_THREAD_POOL_COUNT_DEFAULT to getInt as the fallback value.
public int getAzureBlobThreadPoolCount(String systemName) {
  final String configKey = String.format(SYSTEM_THREAD_POOL_COUNT, systemName);
  return getInt(configKey, SYSTEM_THREAD_POOL_COUNT_DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
Expand Down Expand Up @@ -108,19 +109,32 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
private final String blobURLPrefix;
private final long maxBlobSize;
private final long maxRecordsPerBlob;
private final int initBufferSize;
private final boolean useRandomStringInBlobName;
private final Object currentDataFileWriterLock = new Object();
private volatile long recordsInCurrentBlob = 0;
private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
private Config blobMetadataGeneratorConfig;
private String streamName;

@Deprecated
ehoner marked this conversation as resolved.
Show resolved Hide resolved
// Legacy constructor for callers that do not supply an initial buffer size.
// It forwards every argument unchanged to the constructor that also takes initBufferSize,
// passing AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT for that extra parameter.
public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob) {

// Delegation must be the first statement; the only value added here is the default buffer size.
this(containerAsyncClient, blobURLPrefix, blobThreadPool, metrics, blobMetadataGeneratorFactory,
blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs, compression,
useRandomStringInBlobName, maxBlobSize, maxRecordsPerBlob, AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
}

public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob, int initBufferSize) {

this.blobThreadPool = blobThreadPool;
this.metrics = metrics;
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
Expand All @@ -134,6 +148,7 @@ public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

/**
Expand Down Expand Up @@ -232,7 +247,9 @@ public void close() {
DataFileWriter<IndexedRecord> dataFileWriter,
AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName,
int initBufferSize) {

if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
this.currentBlobWriterComponents = null;
} else {
Expand All @@ -253,6 +270,7 @@ public void close() {
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

@VisibleForTesting
Expand Down Expand Up @@ -339,7 +357,7 @@ private void startNextBlob(Optional<IndexedRecord> optionalIndexedRecord) throws
try {
azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression, initBufferSize);
} catch (Exception e) {
throw new SamzaException("Unable to create AzureBlobOutputStream", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.samza.system.azureblob.avro;

import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterFactory;
Expand All @@ -35,13 +36,29 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
/**
* {@inheritDoc}
*/
@Deprecated
ehoner marked this conversation as resolved.
Show resolved Hide resolved
// Legacy overload without initBufferSize: hands every argument to the full overload and
// supplies AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT as the initial buffer size.
@Override
public AzureBlobWriter getWriterInstance(
    BlobContainerAsyncClient containerAsyncClient,
    String blobURL,
    Executor blobUploadThreadPool,
    AzureBlobWriterMetrics metrics,
    BlobMetadataGeneratorFactory blobMetadataGeneratorFactory,
    Config blobMetadataGeneratorConfig,
    String streamName,
    int maxBlockFlushThresholdSize,
    long flushTimeoutMs,
    Compression compression,
    boolean useRandomStringInBlobName,
    long maxBlobSize,
    long maxMessagesPerBlob) throws IOException {
  final int defaultInitBufferSize = AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;
  return getWriterInstance(
      containerAsyncClient,
      blobURL,
      blobUploadThreadPool,
      metrics,
      blobMetadataGeneratorFactory,
      blobMetadataGeneratorConfig,
      streamName,
      maxBlockFlushThresholdSize,
      flushTimeoutMs,
      compression,
      useRandomStringInBlobName,
      maxBlobSize,
      maxMessagesPerBlob,
      defaultInitBufferSize);
}

/**
* {@inheritDoc}
*/
@Override
public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException {
return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob, initBufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import org.apache.samza.AzureException;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -51,7 +52,8 @@

/**
* This class extends {@link java.io.OutputStream} and uses {@link java.io.ByteArrayOutputStream}
* for caching the write calls until upload is called. The initial size of the
* underlying {@link java.io.ByteArrayOutputStream} can be set by the caller or from config.
*
* It asynchronously uploads the blocks and waits on them to finish at close.
* The blob is persisted at close.
Expand Down Expand Up @@ -106,12 +108,33 @@ public class AzureBlobOutputStream extends OutputStream {
* @param maxBlockFlushThresholdSize max block size
* @param compression type of compression to be used before uploading a block
*/
@Deprecated
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
    BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
    long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
  // Legacy constructor for callers that do not pass an initial buffer size: the in-memory
  // buffer is created with AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT as its initial capacity.
  // Exactly one buffer argument is passed; the leftover line that also constructed a
  // ByteArrayOutputStream(maxBlockFlushThresholdSize) made this call malformed and is removed.
  this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
      flushTimeoutMs, maxBlockFlushThresholdSize,
      new ByteArrayOutputStream(AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT), compression);
}

/**
 * Creates a stream backed by a new {@link ByteArrayOutputStream} whose initial capacity is
 * {@code initBufferSize}, delegating all other setup to the constructor that accepts the buffer.
 *
 * @param blobAsyncClient Client to communicate with Azure Blob Storage.
 * @param blobThreadPool threads to be used for uploading blocks to Azure Blob Storage.
 * @param metrics needed for emitting metrics about bytes written, blocks uploaded, blobs committed.
 * @param blobMetadataGeneratorFactory impl of {@link org.apache.samza.system.azureblob.utils.BlobMetadataGeneratorFactory}
 * to be used for generating metadata properties for a blob
 * @param blobMetadataGeneratorConfig config forwarded together with {@code blobMetadataGeneratorFactory}
 * (presumably consumed when the metadata generator is created; confirm against the delegate constructor)
 * @param streamName name of the stream to which the blob generated corresponds to. Used in metadata properties.
 * @param flushTimeoutMs timeout for uploading a block
 * @param maxBlockFlushThresholdSize max block size
 * @param compression type of compression to be used before uploading a block
 * @param initBufferSize initial size of {@link ByteArrayOutputStream}
 * @throws IllegalArgumentException if {@code initBufferSize} is negative (thrown by the
 * {@link ByteArrayOutputStream} constructor)
 */
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression, int initBufferSize) {
this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
flushTimeoutMs, maxBlockFlushThresholdSize, new ByteArrayOutputStream(initBufferSize), compression);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMet
CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
config.getSuffixRandomStringToBlobName(systemName),
config.getMaxBlobSize(systemName),
config.getMaxMessagesPerBlob(systemName));
config.getMaxMessagesPerBlob(systemName),
config.getInitBufferSizeBytes(systemName));
} catch (Exception e) {
throw new RuntimeException("Failed to create a writer for the producer.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,30 @@ public interface AzureBlobWriterFactory {
* @return AzureBlobWriter instance
* @throws IOException if writer creation fails
*/
@Deprecated
// Superseded by the overload of getWriterInstance that additionally accepts initBufferSize.
AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException;

/**
 * Creates an instance of AzureBlobWriter.
 *
 * @param containerAsyncClient Azure container client
 * @param blobURL Azure blob url
 * @param blobUploadThreadPool thread pool to be used by writer for uploading
 * @param metrics metrics to measure the number of bytes written by writer
 * @param blobMetadataGeneratorFactory factory to get generator for metadata properties for a blob
 * @param blobMetadataGeneratorConfig config supplied alongside {@code blobMetadataGeneratorFactory}
 * (presumably used when creating the metadata generator; confirm with implementations)
 * @param streamName name of the stream that this AzureBlobWriter is associated with
 * @param maxBlockFlushThresholdSize threshold at which to upload
 * @param flushTimeoutMs timeout after which the flush is abandoned
 * @param compression type of compression to be used before uploading a block
 * @param useRandomStringInBlobName whether a random string is added to the blob name
 * (presumably as a suffix, going by the producer's config getter; confirm)
 * @param maxBlobSize maximum size of uncompressed blob in bytes
 * @param maxMessagesPerBlob maximum number of messages per blob
 * (presumably a new blob is started once this is reached; confirm with implementations)
 * @param initBufferSize initial size of in-memory buffer(s)
 * @return AzureBlobWriter instance
 * @throws IOException if writer creation fails
 */
AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException;
}
Loading