Commit

Add Default Configuration for RocksDB MaxManifestFileSize (#1706)
* add default configuration for rocksdb max manifest file size

* update doc, clarify the configuration

* create constant for default store name

* address minor comments

* table format

* one-line
yehaolan authored Aug 26, 2024
1 parent 2a94cad commit a52ef9d
Showing 6 changed files with 44 additions and 10 deletions.
@@ -322,7 +322,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
|stores.**_store-name_**.<br>rocksdb.keep.log.file.num|2|The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.|
|stores.**_store-name_**.<br>rocksdb.metrics.list|(none)|A list of [RocksDB properties](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409) to expose as metrics (gauges).|
|stores.**_store-name_**.<br>rocksdb.delete.obsolete.files.period.micros|21600000000|This property specifies the period in microseconds to delete obsolete files regardless of files removed during compaction. Allowed range is up to 9223372036854775807.|
- |stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|18446744073709551615|This property specifies the maximum size of the MANIFEST data file, after which it is rotated. Default value is also the maximum, making it practically unlimited: only one manifest file is used.|
+ |stores-default.<br>rocksdb.max.manifest.file.size.bytes|1073741824| This property specifies the default maximum size (in bytes) of the MANIFEST data file for **ANY** stores, after which it is rotated. The default value is 1GB. The value for a specific store can be configured by `stores.store-name.rocksdb.max.manifest.file.size`.|
+ |stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|stores-default.<br>rocksdb.max.manifest.file.size.bytes| This property specifies the maximum size (in bytes) of the MANIFEST data file for a specific store, after which it is rotated. The default value is defined by `stores-default.rocksdb.max.manifest.file.size.bytes`.|
|stores.**_store-name_**.<br>side.inputs|(none)|Samza applications with stores that are populated by a secondary data source such as HDFS, but are otherwise read-only, can leverage side inputs. Stores configured with side inputs use the source streams to bootstrap data in the absence of a local copy, thereby avoiding an additional copy of the data in the changelog. It is also recommended to enable the host affinity feature when turning on side inputs to prevent bootstrapping of the data during container restarts. The value is a comma-separated list of streams.<br> Each stream is of the format `system-name.stream-name`. Additionally, applications should add the side inputs to job inputs (`task.inputs`) and configure a side input processor (`stores.store-name.side.inputs.processor.factory`).|
|stores.**_store-name_**.<br>side.inputs.processor.factory|(none)|The value is a fully-qualified name of a Java class that implements <a href="../api/javadocs/org/apache/samza/storage/SideInputProcessorFactory.html">SideInputProcessorFactory</a>. It is a required configuration for stores with side inputs (`stores.store-name.side.inputs`).|
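
The lookup order described by the two `rocksdb.max.manifest.file.size` rows above can be sketched as follows. This is a minimal illustration over a plain `java.util.Map`, not Samza's actual `StorageConfig` code; the class name `ManifestSizeResolution`, the `resolve` method, and the store names `orders` and `users` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the documented lookup order; not Samza's API.
final class ManifestSizeResolution {
  static final String JOB_WIDE_KEY = "stores-default.rocksdb.max.manifest.file.size.bytes";
  static final long BUILT_IN_DEFAULT_BYTES = 1024 * 1024 * 1024L; // 1GB

  /** Per-store value wins, then the job-wide default, then the built-in 1GB. */
  static long resolve(Map<String, String> config, String storeName) {
    String perStoreKey = "stores." + storeName + ".rocksdb.max.manifest.file.size";
    String value = config.get(perStoreKey);
    if (value == null) {
      value = config.get(JOB_WIDE_KEY);
    }
    return value == null ? BUILT_IN_DEFAULT_BYTES : Long.parseLong(value);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(JOB_WIDE_KEY, "268435456"); // 256MB for every store
    config.put("stores.orders.rocksdb.max.manifest.file.size", "67108864"); // 64MB for "orders" only

    System.out.println(resolve(config, "orders"));          // 67108864
    System.out.println(resolve(config, "users"));           // 268435456
    System.out.println(resolve(new HashMap<>(), "users"));  // 1073741824
  }
}
```

With both keys set, only the `orders` store sees the 64MB limit; every other store falls back to the job-wide value, and an empty configuration yields 1GB.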

@@ -79,7 +79,9 @@ public class StorageConfig extends MapConfig {
public static final String STORE_RESTORE_FACTORIES = STORE_PREFIX + "%s." + RESTORE_FACTORIES_SUFFIX;
public static final String JOB_RESTORE_FACTORIES = STORE_PREFIX + RESTORE_FACTORIES_SUFFIX;
public static final List<String> DEFAULT_RESTORE_FACTORIES = ImmutableList.of(KAFKA_STATE_BACKEND_FACTORY);
+ public static final long DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1024L;

+ static final String DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE = "stores-default.rocksdb.max.manifest.file.size.bytes";
static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);
@@ -263,6 +265,15 @@ public long getChangelogMinCompactionLagMs(String storeName) {
return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
}

+ /**
+ * Helper method to get the default RocksDB max manifest file size in bytes for ANY stores, which is default to 1GB.
+ * The default value for ANY stores can be configured by "stores-default.rocksdb.max.manifest.file.size.bytes",
+ * and the value for a specific store can be configured by "stores.store-name.rocksdb.max.manifest.file.size".
+ */
+ public long getDefaultMaxManifestFileSizeBytes() {
+ return getLong(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE, DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES);
+ }
+
/**
* Helper method to check if there is any stores configured w/ a changelog
*/
@@ -590,4 +590,15 @@ public void testGetBackupManagers() {
assertEquals(Collections.emptyList(),
new StorageConfig(new MapConfig(configMap)).getStoresWithBackupFactory(storeBackupFactory2));
}
+
+ @Test
+ public void testGetMaxManifestFileSize() {
+ // empty config, return default size, which is 1GB
+ assertEquals(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE_IN_BYTES,
+ new StorageConfig(new MapConfig()).getDefaultMaxManifestFileSizeBytes());
+
+ StorageConfig storageConfig = new StorageConfig(
+ new MapConfig(ImmutableMap.of(String.format(DEFAULT_ROCKSDB_MAX_MANIFEST_FILE_SIZE), "1024")));
+ assertEquals(1024, storageConfig.getDefaultMaxManifestFileSizeBytes());
+ }
}
@@ -61,7 +61,12 @@ public RocksDbKeyValueReader(String storeName, String dbPath, Config config) {
valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName).orElse(null), serializerConfig);

// get db options
- Options options = RocksDbOptionsHelper.options(config, 1, new File(dbPath), StorageEngineFactory.StoreMode.ReadWrite);
+ Options options = RocksDbOptionsHelper.options(config,
+ 1,
+ storageConfig.getDefaultMaxManifestFileSizeBytes(),
+ new File(dbPath),
+ StorageEngineFactory.StoreMode.ReadWrite
+ );

// open the db
RocksDB.loadLibrary();
@@ -86,8 +86,8 @@ public class RocksDbOptionsHelper {
*/
private static final int DEFAULT_ROCKSDB_MAX_BACKGROUND_JOBS = 4;

-
- public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) {
+ public static Options options(Config storeConfig, int numTasksForContainer, long defaultMaxManifestFileSize,
+ File storeDir, StorageEngineFactory.StoreMode storeMode) {
Options options = new Options();

if (storeConfig.getBoolean(ROCKSDB_WAL_ENABLED, false)) {
@@ -143,10 +143,8 @@ public static Options options(Config storeConfig, int numTasksForContainer, File
options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
- // The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed.
- if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
- options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
- }
+ // The default for rocksdb is 1GB (1024*1024*1024 bytes)
+ options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE, defaultMaxManifestFileSize));
// use prepareForBulk load only when i. the store is being requested in BulkLoad mode
// and ii. the storeDirectory does not exist (fresh restore), because bulk load does not work seamlessly with
// existing stores : https://github.com/facebook/rocksdb/issues/2734
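
The comment on the old code path in this hunk says the previous default, 18446744073709551615, is larger than Java's `Long.MAX_VALUE`, which is why the option used to be set only when a value was configured. The sketch below illustrates that limitation with standard `java.lang.Long` methods only; the class name `UnsignedDefaultDemo` and the helper `fitsInSignedLong` are hypothetical, and nothing here calls RocksDB or Samza.

```java
// Illustration only: why 2^64 - 1 cannot be passed around as a signed Java long.
final class UnsignedDefaultDemo {
  static final String OLD_DEFAULT = "18446744073709551615"; // 2^64 - 1, per the old comment
  static final String NEW_DEFAULT = "1073741824";           // 1GB

  /** Returns true if the decimal string parses as a signed 64-bit long. */
  static boolean fitsInSignedLong(String decimal) {
    try {
      Long.parseLong(decimal);
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(fitsInSignedLong(OLD_DEFAULT)); // false
    System.out.println(fitsInSignedLong(NEW_DEFAULT)); // true

    // The same 64 bits read as an unsigned value come back as -1 when treated as signed.
    long bits = Long.parseUnsignedLong(OLD_DEFAULT);
    System.out.println(bits);                        // -1
    System.out.println(Long.toUnsignedString(bits)); // 18446744073709551615
  }
}
```

A 1GB default fits comfortably in a signed `long`, so the new code can always pass a value through `getLong` with a fallback instead of guarding the call with `containsKey`.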
@@ -42,14 +42,22 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
registry: MetricsRegistry,
jobContext: JobContext,
containerContext: ContainerContext, storeMode: StoreMode): KeyValueStore[Array[Byte], Array[Byte]] = {
+ val storageConfig = new StorageConfig(jobContext.getConfig)
val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true)
- val isLoggedStore = new StorageConfig(jobContext.getConfig).getChangelogStream(storeName).isPresent
+ val isLoggedStore = storageConfig.getChangelogStream(storeName).isPresent
val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size()
+ val defaultMaxManifestFileSize = storageConfig.getDefaultMaxManifestFileSizeBytes
rocksDbMetrics.newGauge("rocksdb.block-cache-size",
() => RocksDbOptionsHelper.getBlockCacheSize(storageConfigSubset, numTasksForContainer))

- val rocksDbOptions = RocksDbOptionsHelper.options(storageConfigSubset, numTasksForContainer, storeDir, storeMode)
+ val rocksDbOptions = RocksDbOptionsHelper.options(
+ storageConfigSubset,
+ numTasksForContainer,
+ defaultMaxManifestFileSize,
+ storeDir,
+ storeMode
+ )
val rocksDbWriteOptions = new WriteOptions()

if (!storageConfigSubset.getBoolean(RocksDbOptionsHelper.ROCKSDB_WAL_ENABLED, false)) {
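
One plausible reason the job-wide default is read from the full job config and passed to `RocksDbOptionsHelper.options` as an argument is that the helper only receives the per-store subset. Assuming `subset("stores." + storeName + ".", true)` keeps only keys under that prefix and strips it, the `stores-default.*` key never reaches the subset. The sketch below imitates that behavior with a plain `Map`; `SubsetVisibilityDemo` and its `subset` method are hypothetical and are not Samza's `Config` implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical imitation of a prefix-stripping config subset; not Samza's Config class.
final class SubsetVisibilityDemo {
  /** Keeps entries whose key starts with prefix and removes the prefix from the key. */
  static Map<String, String> subset(Map<String, String> config, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> jobConfig = new HashMap<>();
    jobConfig.put("stores-default.rocksdb.max.manifest.file.size.bytes", "268435456");
    jobConfig.put("stores.orders.rocksdb.keep.log.file.num", "2");

    Map<String, String> storeSubset = subset(jobConfig, "stores.orders.");
    System.out.println(storeSubset.containsKey("rocksdb.keep.log.file.num"));      // true
    System.out.println(storeSubset.containsKey("rocksdb.max.manifest.file.size")); // false
    System.out.println(storeSubset.size());                                        // 1
  }
}
```

Under that assumption, a lookup inside the subset can only ever find the per-store key, so the fallback value has to come from the caller.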