Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. #1662

Merged
merged 8 commits into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public class AzureBlobConfig extends MapConfig {
public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;

// initialization size of in-memory OutputStream
// This value is clamped to the inclusive range [SYSTEM_INIT_BUFFER_SIZE_DEFAULT, getMaxFlushThresholdSize()].
public static final String SYSTEM_INIT_BUFFER_SIZE = SYSTEM_AZUREBLOB_PREFIX + "initBufferSize.bytes";
ehoner marked this conversation as resolved.
Show resolved Hide resolved
// matches the default initial capacity used by the parameterless java.io.ByteArrayOutputStream() constructor
public static final int SYSTEM_INIT_BUFFER_SIZE_DEFAULT = 32;
mynameborat marked this conversation as resolved.
Show resolved Hide resolved

// maximum size of uncompressed blob in bytes
public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxBlobSize";
private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // unlimited
Expand Down Expand Up @@ -170,6 +176,12 @@ public int getMaxFlushThresholdSize(String systemName) {
return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
}

// return larger of config value or DEFAULT and smaller of MaxFlushThresholdSize
public int getInitBufferSizeBytes(String systemName) {
int init = getInt(String.format(SYSTEM_INIT_BUFFER_SIZE, systemName), SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
return Math.min(Math.max(init, SYSTEM_INIT_BUFFER_SIZE_DEFAULT), getMaxFlushThresholdSize(systemName));
}

public int getAzureBlobThreadPoolCount(String systemName) {
return getInt(String.format(SYSTEM_THREAD_POOL_COUNT, systemName), SYSTEM_THREAD_POOL_COUNT_DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
Expand Down Expand Up @@ -108,19 +109,32 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
private final String blobURLPrefix;
private final long maxBlobSize;
private final long maxRecordsPerBlob;
private final int initBufferSize;
private final boolean useRandomStringInBlobName;
private final Object currentDataFileWriterLock = new Object();
private volatile long recordsInCurrentBlob = 0;
private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
private Config blobMetadataGeneratorConfig;
private String streamName;

@Deprecated
ehoner marked this conversation as resolved.
Show resolved Hide resolved
public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob) {

this(containerAsyncClient, blobURLPrefix, blobThreadPool, metrics, blobMetadataGeneratorFactory,
blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs, compression,
useRandomStringInBlobName, maxBlobSize, maxRecordsPerBlob, AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
}

public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob, int initBufferSize) {

this.blobThreadPool = blobThreadPool;
this.metrics = metrics;
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
Expand All @@ -134,6 +148,7 @@ public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

/**
Expand Down Expand Up @@ -232,7 +247,9 @@ public void close() {
DataFileWriter<IndexedRecord> dataFileWriter,
AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName,
int initBufferSize) {

if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
this.currentBlobWriterComponents = null;
} else {
Expand All @@ -253,6 +270,7 @@ public void close() {
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

@VisibleForTesting
Expand Down Expand Up @@ -339,7 +357,7 @@ private void startNextBlob(Optional<IndexedRecord> optionalIndexedRecord) throws
try {
azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression, initBufferSize);
} catch (Exception e) {
throw new SamzaException("Unable to create AzureBlobOutputStream", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
/**
* {@inheritDoc}
*/
@Override
public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException {
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException {
return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob, initBufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@

/**
* This class extends {@link java.io.OutputStream} and uses {@link java.io.ByteArrayOutputStream}
* for caching the write calls till upload is not called.
* for caching the write calls till upload is not called. The initialization size of the
* underlying {@link java.io.ByteArrayOutputStream} can be set by the caller or from config.
*
* It asynchronously uploads the blocks and waits on them to finish at close.
* The blob is persisted at close.
Expand Down Expand Up @@ -105,13 +106,13 @@ public class AzureBlobOutputStream extends OutputStream {
* @param flushTimeoutMs timeout for uploading a block
* @param maxBlockFlushThresholdSize max block size
* @param compression type of compression to be used before uploading a block
* @param initBufferSize initial size of {@link ByteArrayOutputStream}
*/
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression, int initBufferSize) {
this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
flushTimeoutMs, maxBlockFlushThresholdSize,
new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
flushTimeoutMs, maxBlockFlushThresholdSize, new ByteArrayOutputStream(initBufferSize), compression);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMet
CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
config.getSuffixRandomStringToBlobName(systemName),
config.getMaxBlobSize(systemName),
config.getMaxMessagesPerBlob(systemName));
config.getMaxMessagesPerBlob(systemName),
config.getInitBufferSizeBytes(systemName));
} catch (Exception e) {
throw new RuntimeException("Failed to create a writer for the producer.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ public interface AzureBlobWriterFactory {
* @param streamName name of the stream that this AzureBlobWriter is associated with
* @param maxBlockFlushThresholdSize threshold at which to upload
* @param flushTimeoutMs timeout after which the flush is abandoned
* @param initBufferSize initial size of in-memory buffer(s)
* @return AzureBlobWriter instance
* @throws IOException if writer creation fails
*/
AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException;
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.compression.CompressionFactory;
import org.apache.samza.system.azureblob.compression.CompressionType;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class TestAzureBlobAvroWriter {
private static final String VALUE = "FAKE_VALUE";
private static final String SYSTEM_NAME = "FAKE_SYSTEM";
private static final int THRESHOLD = 100;
private static final int INIT_SIZE = AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;

private class SpecificRecordEvent extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
Expand Down Expand Up @@ -154,7 +156,7 @@ public void setup() throws Exception {
spy(new AzureBlobAvroWriter(mockContainerAsyncClient, mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE)); // keeping blob size and number of records unlimited
doReturn(encodedRecord).when(azureBlobAvroWriter).encodeRecord((IndexedRecord) ome.getMessage());
}
@Test
Expand Down Expand Up @@ -198,7 +200,7 @@ public void testWriteByteArrayWithoutSchema() throws Exception {
spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class),
threadPool, THRESHOLD, 60000, "test",
null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
1000, 100, mockCompression, false));
1000, 100, mockCompression, false, INIT_SIZE));
OutgoingMessageEnvelope omeEncoded = new OutgoingMessageEnvelope(new SystemStream(SYSTEM_NAME, "Topic1"), new byte[100]);
azureBlobAvroWriter.write(omeEncoded);
}
Expand Down Expand Up @@ -251,7 +253,7 @@ public void testNPEinFlush() throws Exception {
spy(new AzureBlobAvroWriter(PowerMockito.mock(BlobContainerAsyncClient.class), mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
60000, "test", null, null, null,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false)); // keeping blob size and number of records unlimited
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE)); // keeping blob size and number of records unlimited
when(azureBlobAvroWriter.encodeRecord((IndexedRecord) ome.getMessage())).thenThrow(IllegalStateException.class);
azureBlobAvroWriter.flush(); // No NPE because has null check for currentBlobWriterComponents
}
Expand All @@ -266,7 +268,7 @@ public void testMaxBlobSizeExceeded() throws Exception {
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
maxBlobSize, 10, mockCompression, true));
maxBlobSize, 10, mockCompression, true, INIT_SIZE));

DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
Expand All @@ -279,7 +281,7 @@ public void testMaxBlobSizeExceeded() throws Exception {
AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
(long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream1);
when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) maxBlobSize - 1);

// first OME creates the first blob
Expand All @@ -297,7 +299,7 @@ public void testMaxBlobSizeExceeded() throws Exception {
AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
(long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream2);
when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) maxBlobSize - 1);

// Second OME creates the second blob because maxBlobSize is 1000 and mockAzureBlobOutputStream.getSize is 999.
Expand Down Expand Up @@ -329,7 +331,7 @@ public void testRecordLimitExceeded() throws Exception {
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
mockMetrics, threadPool, THRESHOLD, 60000, blobUrlPrefix,
null, null, null, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
maxBlobSize, maxRecordsPerBlob, mockCompression, true));
maxBlobSize, maxRecordsPerBlob, mockCompression, true, INIT_SIZE));

DataFileWriter mockDataFileWriter1 = mock(DataFileWriter.class);
PowerMockito.whenNew(DataFileWriter.class).withAnyArguments().thenReturn(mockDataFileWriter1);
Expand All @@ -342,7 +344,7 @@ public void testRecordLimitExceeded() throws Exception {
AzureBlobOutputStream mockAzureBlobOutputStream1 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient1, threadPool,
mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream1);
(long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream1);
when(mockAzureBlobOutputStream1.getSize()).thenReturn((long) 1);

// first OME creates the first blob and 11th OME (ome2) creates the second blob.
Expand All @@ -363,7 +365,7 @@ public void testRecordLimitExceeded() throws Exception {
AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
PowerMockito.whenNew(AzureBlobOutputStream.class).withArguments(mockBlockBlobAsyncClient2, threadPool,
mockMetrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
(long) 60000, THRESHOLD, mockCompression).thenReturn(mockAzureBlobOutputStream2);
(long) 60000, THRESHOLD, mockCompression, INIT_SIZE).thenReturn(mockAzureBlobOutputStream2);
when(mockAzureBlobOutputStream2.getSize()).thenReturn((long) 1);

azureBlobAvroWriter.write(ome2);
Expand Down Expand Up @@ -392,7 +394,7 @@ public void testMultipleBlobClose() throws Exception {
azureBlobAvroWriter = spy(new AzureBlobAvroWriter(mockContainerClient,
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD, 60000, blobUrlPrefix,
mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
maxBlobSize, maxRecordsPerBlob, mockCompression, false));
maxBlobSize, maxRecordsPerBlob, mockCompression, false, INIT_SIZE));

DataFileWriter<IndexedRecord> mockDataFileWriter2 = mock(DataFileWriter.class);
AzureBlobOutputStream mockAzureBlobOutputStream2 = mock(AzureBlobOutputStream.class);
Expand All @@ -419,7 +421,7 @@ public void testEncodeRecord() throws Exception {
mock(AzureBlobWriterMetrics.class), threadPool, THRESHOLD,
60000, "test", mockDataFileWriter, mockAzureBlobOutputStream, mockBlockBlobAsyncClient,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, STREAM_NAME,
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false));
Long.MAX_VALUE, Long.MAX_VALUE, mockCompression, false, INIT_SIZE));
IndexedRecord record = new GenericRecordEvent();
Assert.assertTrue(Arrays.equals(encodeRecord(record), azureBlobAvroWriter.encodeRecord(record)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.samza.AzureException;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class TestAzureBlobOutputStream {
private ThreadPoolExecutor threadPool;
private ByteArrayOutputStream mockByteArrayOutputStream;
private static final int THRESHOLD = 100;
private static final int INIT_SIZE = AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT;
private BlockBlobAsyncClient mockBlobAsyncClient;
private AzureBlobOutputStream azureBlobOutputStream;
private static final String RANDOM_STRING = "roZzozzLiR7GCEjcB0UsRUNgBAip8cSLGXQSo3RQvbIDoxOaaRs4hrec2s5rMPWgTPRY4UnE959worEtyhRjwUFnRnVuNFZ554yuPQCbI69qFkQX7MmrB4blmpSnFeGjWKjFjIRLFNVSsQBYMkr5jT4T83uVtuGumsjACVrpcilihdd194H8Y71rQcrXZoTQtw5OvmPicbwptawpHoRNzHihyaDVYgAs0dQbvVEu1gitKpamzYdMLFtc5h8PFZSVEB";
Expand All @@ -98,7 +100,7 @@ public void setup() throws Exception {
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());


mockByteArrayOutputStream = spy(new ByteArrayOutputStream(THRESHOLD));
mockByteArrayOutputStream = spy(new ByteArrayOutputStream(INIT_SIZE));

mockBlobAsyncClient = PowerMockito.mock(BlockBlobAsyncClient.class);

Expand Down