
SAMZA-2778: Make AzureBlobOutputStream buffer initialization size configurable. (#1662)

Symptom:
JVM crashes due to OOM exceptions. When the system holds a large number of AzureBlobWriter objects, the heap becomes heavily fragmented and the JVM is susceptible to crashing when a new AzureBlobOutputStream is created.

Cause:
The crashes are caused by inefficient memory management under the G1GC (the default garbage collector in Java 11+). The code paths that trigger the issue and why it is a problem for the G1GC are examined separately below.

What causes this issue?
The underlying issue is caused by the AzureBlobOutputStream. When a new instance is created, it creates a new ByteArrayOutputStream initialized to maxFlushThresholdSize. The ByteArrayOutputStream is the buffer used by the parent to accumulate messages between flush intervals. It requires 10MB (the current default) of memory to initialize. This allows the buffer to accumulate messages without resize operations; however, it does not prevent resizing. The maxFlushThresholdSize is enforced in the parent during write() - see AzureBlobAvroWriter.
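
The trade-off above can be sketched with plain `java.io.ByteArrayOutputStream`: a pre-sized buffer makes one large up-front allocation, while a small initial capacity grows on demand by copying into larger arrays. This is a minimal illustration, not Samza code; the 10MB threshold below is only the default mentioned above.

```java
import java.io.ByteArrayOutputStream;

public class BufferInitDemo {
    // Fill a buffer with `chunks` writes of `chunkSize` bytes each. A small
    // initial capacity forces repeated grow-and-copy operations internally,
    // while a pre-sized buffer absorbs the same writes with one allocation.
    static int fill(int initialCapacity, int chunkSize, int chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(initialCapacity);
        byte[] chunk = new byte[chunkSize];
        for (int i = 0; i < chunks; i++) {
            out.write(chunk, 0, chunk.length);
        }
        return out.size();
    }

    public static void main(String[] args) {
        int maxFlushThresholdSize = 10 * 1024 * 1024; // current default: 10 MB
        // Pre-sized: one 10 MB allocation up front, no resizes below the threshold.
        System.out.println(fill(maxFlushThresholdSize, 1024, 100)); // 102400
        // Default-sized (32 bytes): same final content, but grown by copying.
        System.out.println(fill(32, 1024, 100));                    // 102400
    }
}
```

Either way the stream accepts the same writes; the difference is purely in allocation behavior, which is what matters to the garbage collector.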

Why is this a problem for the G1GC?
The focus here is on the G1GC and humongous objects (a G1-specific concept). The G1GC introduced a memory management strategy that divides the heap into regions, -XX:G1HeapRegionSize=n. The GC can operate on regions concurrently and copies live objects between regions during full GC to reclaim/empty regions. The default behavior creates ~2048 regions, sized to a power of 2 between 1MB and 32MB. Any object larger than half of a region size is considered a humongous object.
Humongous objects are allocated an entire region (or consecutive regions if larger than a single region) and are not copied between regions during full GC. Any space not used by the object within a region is non-addressable for the life of the object. A JVM heap size of 31GB, -Xmx31G, defaults to 16MB regions. Since the current default buffer size is 10MB, each buffer requires an entire region and prevents the use of 6MB, regardless of how much data is in the buffer. For a heap smaller than 16GB, each buffer would require multiple regions.
The 10MB buffer size can exhaust the regions and cause OOMs, or create fragmentation that causes OOMs. A fragmentation-caused OOM occurs in the following sequence. On new, the JVM attempts to create the object in Eden. If there is insufficient space in Eden, a minor GC is performed. If there is still insufficient space after the minor GC, the object is promoted directly to the Old Generation. If there is insufficient space in the Old Generation, a full GC is performed. If a full GC cannot allocate memory or region(s) for a humongous object, the JVM exits with an OOM.
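
The region arithmetic above can be checked with a small sketch of G1's default sizing heuristic (a simplification, not JVM source): pick a power-of-two region size between 1MB and 32MB so the heap divides into roughly 2048 regions, then treat any allocation of at least half a region as humongous.

```java
public class G1RegionMath {
    // Approximate G1's default region sizing: smallest power of two between
    // 1 MB and 32 MB that yields about 2048 regions for the given heap.
    static long defaultRegionSize(long heapBytes) {
        long target = heapBytes / 2048;
        long region = 1L << 20; // 1 MB floor
        while (region < target && region < (32L << 20)) {
            region <<= 1; // powers of two, capped at 32 MB
        }
        return region;
    }

    // G1 treats objects of half a region or more as humongous.
    static boolean isHumongous(long objectBytes, long regionBytes) {
        return objectBytes >= regionBytes / 2;
    }

    public static void main(String[] args) {
        long heap = 31L << 30;                 // -Xmx31G
        long region = defaultRegionSize(heap);
        System.out.println(region >> 20);                   // 16 (MB regions)
        System.out.println(isHumongous(10L << 20, region)); // true: 10 MB buffer
    }
}
```

For the 31GB heap described above, this yields 16MB regions, so the default 10MB buffer clears the 8MB humongous threshold and occupies a full region by itself.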

Changes:
The javadocs, where appropriate, have been updated to reflect changes or describe new behaviors. No public APIs were removed; existing ones were marked @Deprecated and migrated to the new default initialization value. All of the changes are itemized below.

AzureBlobConfig
Two new public fields and one public method are added. The new configuration is accessible in the same manner as existing configs (see #configs SEP-26) and is consistent with the coding-guide. There is a new public config key, SYSTEM_INIT_BUFFER_SIZE, named initBufferSize.bytes. The default value is the public field SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The user-provided configuration value is accessible with the new public method getInitBufferSizeBytes(..). The method returns the configuration value clamped between 32 and getMaxFlushThresholdSize(..), inclusive. 32 is the default initialization size of a ByteArrayOutputStream created with the parameterless constructor.
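
The clamping behavior can be sketched in isolation. This is a simplified standalone version: the real method reads the requested value from Samza config by system name, while this sketch takes the requested and threshold values directly as parameters.

```java
public class InitBufferClamp {
    // Mirrors AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT, the
    // parameterless ByteArrayOutputStream capacity.
    static final int INIT_BUFFER_SIZE_DEFAULT = 32;

    // Clamp a user-supplied size into [32, maxFlushThresholdSize], inclusive.
    static int clampInitBufferSize(int requested, int maxFlushThresholdSize) {
        return Math.min(Math.max(requested, INIT_BUFFER_SIZE_DEFAULT), maxFlushThresholdSize);
    }

    public static void main(String[] args) {
        int threshold = 10485760; // default maxFlushThresholdSize, 10 MB
        System.out.println(clampInitBufferSize(1, threshold));        // 32: raised to floor
        System.out.println(clampInitBufferSize(1048576, threshold));  // 1048576: in range
        System.out.println(clampInitBufferSize(99999999, threshold)); // 10485760: capped
    }
}
```

Clamping rather than rejecting out-of-range values keeps misconfigured jobs running with a safe buffer size instead of failing at startup.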

AzureBlobWriterFactory
There are two changes to this interface, both to the method getWriterInstance(..). The existing method is marked @Deprecated and a new overload with an additional parameter is added. The new parameter is an int that is expected to be the initBufferSize.

AzureBlobAvroWriterFactory
The modifications here are consistent with the changes to interface AzureBlobWriterFactory. However, the deprecated implementation uses the new field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT when calling the new public API. This will migrate users to the new initialization behavior.

AzureBlobAvroWriter
There are two public, one package-private, and two private changes. Both public changes are to constructors. The existing public constructor is marked deprecated and invokes the new public constructor with the new field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The new constructor sets the new private field initBufferSize. The package-private constructor is modified with an additional int parameter and the tests were changed accordingly. The remaining private change modifies creation of new AzureBlobOutputStream instances to include the additional private field initBufferSize.

AzureBlobOutputStream
The existing public constructor is marked @Deprecated and initializes the ByteArrayOutputStream with the new field AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT. The new public constructor includes the new parameter initBufferSize and initializes the ByteArrayOutputStream to that size.
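
The deprecate-and-delegate pattern used across these classes can be illustrated with a toy class (names and signatures simplified; this is not the actual Samza code):

```java
import java.io.ByteArrayOutputStream;

public class StreamLike {
    private final ByteArrayOutputStream buffer;

    /** Old entry point: kept for binary/source compatibility, routes to the new one. */
    @Deprecated
    public StreamLike(int maxFlushThresholdSize) {
        this(maxFlushThresholdSize, 32); // 32 mirrors SYSTEM_INIT_BUFFER_SIZE_DEFAULT
    }

    /** New entry point: the caller controls the initial buffer size. */
    public StreamLike(int maxFlushThresholdSize, int initBufferSize) {
        this.buffer = new ByteArrayOutputStream(initBufferSize);
    }

    public void write(byte[] data) {
        buffer.write(data, 0, data.length);
    }

    public int bufferedBytes() {
        return buffer.size();
    }
}
```

Callers of the old constructor silently migrate to the small default initialization size, while new callers can tune it; no existing call site breaks.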
ehoner authored Jun 17, 2023
1 parent cc21b6e commit aa5db44
Showing 9 changed files with 61 additions and 22 deletions.
@@ -267,6 +267,7 @@ Configs for producing to [Azure Blob Storage](https://azure.microsoft.com/en-us/
|systems.**_system-name_**.azureblob.maxFlushThresholdSize|10485760 (10 MB)|max size of the uncompressed block to be uploaded in bytes. Maximum size allowed by Azure is 100MB.|
|systems.**_system-name_**.azureblob.maxBlobSize|Long.MAX_VALUE (unlimited)|max size of the uncompressed blob in bytes.<br>If default value then size is unlimited capped only by Azure BlockBlob size of 4.75 TB (100 MB per block X 50,000 blocks).|
|systems.**_system-name_**.azureblob.maxMessagesPerBlob|Long.MAX_VALUE (unlimited)|max number of messages per blob.|
|systems.**_system-name_**.azureblob.initBufferSize.bytes|32|The amount of memory pre-allocated for `org.apache.samza.system.azureblob.avro.AzureBlobOutputStream` buffers.<br>Values must be between 32 (default) and `maxFlushThresholdSize`.<br>This should be increased for fast-filling buffers when buffer resize operations affect performance. Large values can lead to inefficient memory allocation with the G1 garbage collector. If the size is >= half of a region size, `G1HeapRegionSize`, consider switching to the ParallelGC.|
|systems.**_system-name_**.azureblob.threadPoolCount|2|number of threads for the asynchronous uploading of blocks.|
|systems.**_system-name_**.azureblob.blockingQueueSize|Thread Pool Count * 2|size of the queue to hold blocks ready to be uploaded by asynchronous threads.<br>If all threads are busy uploading then blocks are queued and if queue is full then main thread will start uploading which will block processing of incoming messages.|
|systems.**_system-name_**.azureblob.flushTimeoutMs|180000 (3 mins)|timeout to finish uploading all blocks before committing a blob.|
@@ -80,6 +80,12 @@ public class AzureBlobConfig extends MapConfig {
public static final String SYSTEM_MAX_FLUSH_THRESHOLD_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxFlushThresholdSize";
private static final int SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT = 10485760;

// initialization size of in-memory OutputStream
// This value should be between SYSTEM_INIT_BUFFER_SIZE_DEFAULT and getMaxFlushThresholdSize() inclusive.
public static final String SYSTEM_INIT_BUFFER_SIZE = SYSTEM_AZUREBLOB_PREFIX + "initBufferSize.bytes";
// re-use size for parameterless constructor java.io.ByteArrayOutputStream()
public static final int SYSTEM_INIT_BUFFER_SIZE_DEFAULT = 32;

// maximum size of uncompressed blob in bytes
public static final String SYSTEM_MAX_BLOB_SIZE = SYSTEM_AZUREBLOB_PREFIX + "maxBlobSize";
private static final long SYSTEM_MAX_BLOB_SIZE_DEFAULT = Long.MAX_VALUE; // unlimited
@@ -170,6 +176,12 @@ public int getMaxFlushThresholdSize(String systemName) {
return getInt(String.format(SYSTEM_MAX_FLUSH_THRESHOLD_SIZE, systemName), SYSTEM_MAX_FLUSH_THRESHOLD_SIZE_DEFAULT);
}

// return the configured value clamped to [SYSTEM_INIT_BUFFER_SIZE_DEFAULT, getMaxFlushThresholdSize()]
public int getInitBufferSizeBytes(String systemName) {
int init = getInt(String.format(SYSTEM_INIT_BUFFER_SIZE, systemName), SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
return Math.min(Math.max(init, SYSTEM_INIT_BUFFER_SIZE_DEFAULT), getMaxFlushThresholdSize(systemName));
}

public int getAzureBlobThreadPoolCount(String systemName) {
return getInt(String.format(SYSTEM_THREAD_POOL_COUNT, systemName), SYSTEM_THREAD_POOL_COUNT_DEFAULT);
}
@@ -23,6 +23,7 @@
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.system.azureblob.AzureBlobConfig;
import org.apache.samza.system.azureblob.compression.Compression;
import org.apache.samza.system.azureblob.producer.AzureBlobWriter;
import org.apache.samza.system.azureblob.producer.AzureBlobWriterMetrics;
@@ -114,19 +115,32 @@ public class AzureBlobAvroWriter implements AzureBlobWriter {
private final String blobURLPrefix;
private final long maxBlobSize;
private final long maxRecordsPerBlob;
private final int initBufferSize;
private final boolean useRandomStringInBlobName;
private final Object currentDataFileWriterLock = new Object();
private volatile long recordsInCurrentBlob = 0;
private BlobMetadataGeneratorFactory blobMetadataGeneratorFactory;
private Config blobMetadataGeneratorConfig;
private String streamName;

@Deprecated
public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob) {

this(containerAsyncClient, blobURLPrefix, blobThreadPool, metrics, blobMetadataGeneratorFactory,
blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs, compression,
useRandomStringInBlobName, maxBlobSize, maxRecordsPerBlob, AzureBlobConfig.SYSTEM_INIT_BUFFER_SIZE_DEFAULT);
}

public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String blobURLPrefix,
Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxRecordsPerBlob, int initBufferSize) {

this.blobThreadPool = blobThreadPool;
this.metrics = metrics;
this.maxBlockFlushThresholdSize = maxBlockFlushThresholdSize;
@@ -140,6 +154,7 @@ public AzureBlobAvroWriter(BlobContainerAsyncClient containerAsyncClient, String
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

/**
@@ -244,7 +259,9 @@ public void close() {
DataFileWriter<Object> dataFileWriter,
AzureBlobOutputStream azureBlobOutputStream, BlockBlobAsyncClient blockBlobAsyncClient,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName) {
long maxBlobSize, long maxRecordsPerBlob, Compression compression, boolean useRandomStringInBlobName,
int initBufferSize) {

if (dataFileWriter == null || azureBlobOutputStream == null || blockBlobAsyncClient == null) {
this.currentBlobWriterComponents = null;
} else {
@@ -265,6 +282,7 @@ public void close() {
this.blobMetadataGeneratorFactory = blobMetadataGeneratorFactory;
this.blobMetadataGeneratorConfig = blobMetadataGeneratorConfig;
this.streamName = streamName;
this.initBufferSize = initBufferSize;
}

@VisibleForTesting
@@ -351,7 +369,7 @@ private void startNextBlob(Optional<GenericContainer> optionalGenericContainer)
try {
azureBlobOutputStream = new AzureBlobOutputStream(blockBlobAsyncClient, blobThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig,
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression);
streamName, flushTimeoutMs, maxBlockFlushThresholdSize, compression, initBufferSize);
} catch (Exception e) {
throw new SamzaException("Unable to create AzureBlobOutputStream", e);
}
@@ -35,13 +35,14 @@ public class AzureBlobAvroWriterFactory implements AzureBlobWriterFactory {
/**
* {@inheritDoc}
*/
@Override
public AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException {
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException {
return new AzureBlobAvroWriter(containerAsyncClient, blobURL, blobUploadThreadPool, metrics,
blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName, maxBlockFlushThresholdSize, flushTimeoutMs,
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob);
compression, useRandomStringInBlobName, maxBlobSize, maxMessagesPerBlob, initBufferSize);
}
}
@@ -51,7 +51,8 @@

/**
* This class extends {@link java.io.OutputStream} and uses {@link java.io.ByteArrayOutputStream}
* for caching the write calls till upload is not called.
* for caching the write calls till upload is not called. The initialization size of the
* underlying {@link java.io.ByteArrayOutputStream} can be set by the caller or from config.
*
* It asynchronously uploads the blocks and waits on them to finish at close.
* The blob is persisted at close.
@@ -105,13 +106,13 @@ public class AzureBlobOutputStream extends OutputStream {
* @param flushTimeoutMs timeout for uploading a block
* @param maxBlockFlushThresholdSize max block size
* @param compression type of compression to be used before uploading a block
* @param initBufferSize initial size of {@link ByteArrayOutputStream}
*/
public AzureBlobOutputStream(BlockBlobAsyncClient blobAsyncClient, Executor blobThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression) {
long flushTimeoutMs, int maxBlockFlushThresholdSize, Compression compression, int initBufferSize) {
this(blobAsyncClient, blobThreadPool, metrics, blobMetadataGeneratorFactory, blobMetadataGeneratorConfig, streamName,
flushTimeoutMs, maxBlockFlushThresholdSize,
new ByteArrayOutputStream(maxBlockFlushThresholdSize), compression);
flushTimeoutMs, maxBlockFlushThresholdSize, new ByteArrayOutputStream(initBufferSize), compression);
}

/**
@@ -499,7 +499,8 @@ AzureBlobWriter createNewWriter(String blobURL, AzureBlobWriterMetrics writerMet
CompressionFactory.getInstance().getCompression(config.getCompressionType(systemName)),
config.getSuffixRandomStringToBlobName(systemName),
config.getMaxBlobSize(systemName),
config.getMaxMessagesPerBlob(systemName));
config.getMaxMessagesPerBlob(systemName),
config.getInitBufferSizeBytes(systemName));
} catch (Exception e) {
throw new RuntimeException("Failed to create a writer for the producer.", e);
}
@@ -38,12 +38,13 @@ public interface AzureBlobWriterFactory {
* @param streamName name of the stream that this AzureBlobWriter is associated with
* @param maxBlockFlushThresholdSize threshold at which to upload
* @param flushTimeoutMs timeout after which the flush is abandoned
* @param initBufferSize initial size of in-memory buffer(s)
* @return AzureBlobWriter instance
* @throws IOException if writer creation fails
*/
AzureBlobWriter getWriterInstance(BlobContainerAsyncClient containerAsyncClient, String blobURL,
Executor blobUploadThreadPool, AzureBlobWriterMetrics metrics,
BlobMetadataGeneratorFactory blobMetadataGeneratorFactory, Config blobMetadataGeneratorConfig, String streamName,
int maxBlockFlushThresholdSize, long flushTimeoutMs, Compression compression, boolean useRandomStringInBlobName,
long maxBlobSize, long maxMessagesPerBlob) throws IOException;
long maxBlobSize, long maxMessagesPerBlob, int initBufferSize) throws IOException;
}