Adding internal config to retain non-logged stores on container start #1229

Open
wants to merge 2 commits into
base: master
@@ -74,9 +74,12 @@ public class StorageConfig extends MapConfig {
static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";

- // Internal config to clean storeDirs of a store on container start. This is used to benchmark bootstrap performance.
+ // Internal config to clean storeDirs of a logged store on container start. This is used to benchmark bootstrap performance.
static final String CLEAN_LOGGED_STOREDIRS_ON_START = STORE_PREFIX + "%s.clean.on.container.start";

+ // Internal config to clean storeDirs of a logged store on container start. This is used to benchmark bootstrap performance.
Review comment (Contributor):

s/logged/non-logged/

+ static final String RETAIN_NONLOGGED_STOREDIRS_ON_START = STORE_PREFIX + "%s.retain.on.container.start";

Review comment (Contributor):

Should we rename this config to reflect the non-logged-only case? Suggested name: %s.retain.nonlogged.on.container.start


public StorageConfig(Config config) {
super(config);
}
@@ -266,4 +269,13 @@ public int getNumPersistentStores() {
public boolean getCleanLoggedStoreDirsOnStart(String storeName) {
return getBoolean(String.format(CLEAN_LOGGED_STOREDIRS_ON_START, storeName), false);
}

+ /**
+ * Helper method to get if nonlogged store dirs should not be deleted on container start.
+ * @param storeName
+ * @return
+ */
+ public boolean getRetainNonloggedStoreDirsOnStart(String storeName) {
+ return getBoolean(String.format(RETAIN_NONLOGGED_STOREDIRS_ON_START, storeName), false);
+ }
}
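
For context, a minimal sketch of how the new per-store key would resolve and default. It assumes STORE_PREFIX is "stores." (the diff shows only the identifier), and it uses a hypothetical Map-based getBoolean helper in place of the real MapConfig; the class name and the store name my-store are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class RetainConfigSketch {
  // Assumption: the diff shows only the STORE_PREFIX identifier, not its value.
  static final String STORE_PREFIX = "stores.";
  static final String RETAIN_NONLOGGED_STOREDIRS_ON_START =
      STORE_PREFIX + "%s.retain.on.container.start";

  // Hypothetical stand-in for a getBoolean(key, default) lookup on the config.
  static boolean getBoolean(Map<String, String> config, String key, boolean defaultValue) {
    String value = config.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  // Mirrors the getter added in the diff: absent key means "do not retain".
  static boolean getRetainNonloggedStoreDirsOnStart(Map<String, String> config, String storeName) {
    return getBoolean(config, String.format(RETAIN_NONLOGGED_STOREDIRS_ON_START, storeName), false);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("stores.my-store.retain.on.container.start", "true");

    System.out.println(getRetainNonloggedStoreDirsOnStart(config, "my-store"));    // true
    System.out.println(getRetainNonloggedStoreDirsOnStart(config, "other-store")); // false (default)
  }
}
```

Because the default is false, stores that do not set the key keep the existing behavior of being deleted on container start.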
@@ -132,7 +132,7 @@ private void cleanBaseDirsAndReadOffsetFiles() {
storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString());

- if (nonLoggedStorePartitionDir.exists()) {
+ if (nonLoggedStorePartitionDir.exists() || !storageConfig.getRetainNonloggedStoreDirsOnStart(storeName)) {
Review comment (Contributor):

Update the doc string to indicate that deletion is conditional.

Review comment (Contributor):

Should this be && instead of ||?

LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString());
fileUtil.rm(nonLoggedStorePartitionDir);
}
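
To make the || versus && question concrete, here is a small sketch that evaluates both forms of the condition for every combination of inputs. The class and method names are hypothetical; only the two boolean expressions are taken from the diff and the review comment.

```java
public class RetainConditionSketch {
  // Condition as written in the diff: exists() || !retain.
  static boolean deleteWithOr(boolean dirExists, boolean retain) {
    return dirExists || !retain;
  }

  // Condition the review comment asks about: exists() && !retain.
  static boolean deleteWithAnd(boolean dirExists, boolean retain) {
    return dirExists && !retain;
  }

  public static void main(String[] args) {
    boolean[] values = {false, true};
    for (boolean dirExists : values) {
      for (boolean retain : values) {
        System.out.printf("exists=%b retain=%b -> ||: %b, &&: %b%n",
            dirExists, retain,
            deleteWithOr(dirExists, retain),
            deleteWithAnd(dirExists, retain));
      }
    }
  }
}
```

With ||, an existing directory is deleted even when retain is true, and the delete branch is also entered when the directory does not exist and retain is false. With &&, deletion happens only when the directory exists and retain is not set, which appears to match the stated intent of the config.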
@@ -214,15 +214,18 @@ static StoreActions getStoreActions(
return;
}

- // persistent but non-logged stores are always deleted
- if (storageEngine.getStoreProperties().isPersistedToDisk() &&
- !storageEngine.getStoreProperties().isLoggedStore()) {
- File currentDir = storageManagerUtil.getTaskStoreDir(
- nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
- LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.",
- currentDir, storeName, taskName);
- storeDirsToDelete.put(storeName, currentDir);
- // persistent but non-logged stores should not have checkpoint dirs
+ // persistent but non-logged stores are always deleted unless retain.on.container.start config is set
+ if (storageEngine.getStoreProperties().isPersistedToDisk() && !storageEngine.getStoreProperties().isLoggedStore()) {
+ File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
+
+ if (!new StorageConfig(config).getRetainNonloggedStoreDirsOnStart(storeName)) {
+ LOG.info("Marking current directory: {} for store: {} in task: {} for deletion since it is not a logged store.",
+ currentDir, storeName, taskName);
+ storeDirsToDelete.put(storeName, currentDir);
+ // persistent but non-logged stores should not have checkpoint dirs
+ } else {
+ LOG.info("Retaining current directory: {} for store: {} in task: {}", currentDir, storeName, taskName);
+ }
return;
}

@@ -243,17 +246,6 @@ static StoreActions getStoreActions(
timeSinceLastCheckpointInMs = System.currentTimeMillis() -
checkpointedChangelogOffset.getCheckpointId().getMillis();
}

- // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed
- if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(
- config).getCleanLoggedStoreDirsOnStart(storeName)) {
- File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
- LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName);
- storeDirsToDelete.put(storeName, currentDir);
- LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
- storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
- return;
- }
-
// if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed
if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(