From 2062129b74404f346aada7a44687615d5af438b1 Mon Sep 17 00:00:00 2001 From: Ray Matharu Date: Wed, 4 Dec 2019 20:15:37 -0800 Subject: [PATCH 1/2] Adding internal config to retain non-logged stores on container start --- .../apache/samza/config/StorageConfig.java | 9 +++++- ...nTransactionalStateTaskRestoreManager.java | 2 +- .../TransactionalStateTaskRestoreManager.java | 32 +++++++------------ 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index b5687c87dd..bd7930a58c 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -74,9 +74,12 @@ public class StorageConfig extends MapConfig { static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY = "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"; - // Internal config to clean storeDirs of a store on container start. This is used to benchmark bootstrap performance. + // Internal config to clean storeDirs of a logged store on container start. This is used to benchmark bootstrap performance. static final String CLEAN_LOGGED_STOREDIRS_ON_START = STORE_PREFIX + "%s.clean.on.container.start"; + // Internal config to clean storeDirs of a logged store on container start. This is used to benchmark bootstrap performance. + static final String RETAIN_NONLOGGED_STOREDIRS_ON_START = STORE_PREFIX + "%s.retain.on.container.start"; + public StorageConfig(Config config) { super(config); } @@ -266,4 +269,8 @@ public int getNumPersistentStores() { public boolean getCleanLoggedStoreDirsOnStart(String storeName) { return getBoolean(String.format(CLEAN_LOGGED_STOREDIRS_ON_START, storeName), false); } + + public boolean getRetainNonloggedStoreDirsOnStart(String storeName) { + return getBoolean(String.format(RETAIN_NONLOGGED_STOREDIRS_ON_START, storeName), false); + } } diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java index 70c61746e8..c276e7fed7 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java @@ -132,7 +132,7 @@ private void cleanBaseDirsAndReadOffsetFiles() { storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode()); LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString()); - if (nonLoggedStorePartitionDir.exists()) { + if (nonLoggedStorePartitionDir.exists() || !storageConfig.getRetainNonloggedStoreDirsOnStart(storeName)) { LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString()); fileUtil.rm(nonLoggedStorePartitionDir); } diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 689d431cbd..f4f26838a8 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -214,15 +214,18 @@ static StoreActions getStoreActions( return; } - // persistent but non-logged stores are always deleted - if (storageEngine.getStoreProperties().isPersistedToDisk() && - !storageEngine.getStoreProperties().isLoggedStore()) { - File currentDir = storageManagerUtil.getTaskStoreDir( - nonLoggedStoreBaseDirectory, storeName, taskName, taskMode); - LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.", - currentDir, storeName, taskName); - storeDirsToDelete.put(storeName, currentDir); - // persistent but non-logged stores should not have checkpoint dirs + // persistent but non-logged stores are always deleted unless retain.on.container.start config is set + if (storageEngine.getStoreProperties().isPersistedToDisk() && !storageEngine.getStoreProperties().isLoggedStore()) { + File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode); + + if (!new StorageConfig(config).getRetainNonloggedStoreDirsOnStart(storeName)) { + LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.", + currentDir, storeName, taskName); + storeDirsToDelete.put(storeName, currentDir); + // persistent but non-logged stores should not have checkpoint dirs + } else { + LOG.info("Retaining current directory: {} for store: {} in task: {}", currentDir, storeName, taskName); + } return; } @@ -243,17 +246,6 @@ static StoreActions getStoreActions( timeSinceLastCheckpointInMs = System.currentTimeMillis() - checkpointedChangelogOffset.getCheckpointId().getMillis(); } - - // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed - if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig( - config).getCleanLoggedStoreDirsOnStart(storeName)) { - File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode); - LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName); - storeDirsToDelete.put(storeName, currentDir); - LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset); - storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset)); - return; - } // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig( From 06af207c1543725d064da19d32293cbcd16d91d6 Mon Sep 17 00:00:00 2001 From: Ray Matharu Date: Wed, 4 Dec 2019 20:17:06 -0800 Subject: [PATCH 2/2] Adding javadoc --- .../src/main/java/org/apache/samza/config/StorageConfig.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index bd7930a58c..2c019fa5b8 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -270,6 +270,11 @@ public boolean getCleanLoggedStoreDirsOnStart(String storeName) { return getBoolean(String.format(CLEAN_LOGGED_STOREDIRS_ON_START, storeName), false); } + /** + * Helper method to get if nonlogged store dirs should not be deleted on container start. + * @param storeName + * @return + */ public boolean getRetainNonloggedStoreDirsOnStart(String storeName) { return getBoolean(String.format(RETAIN_NONLOGGED_STOREDIRS_ON_START, storeName), false); }