Skip to content

Commit

Permalink
Create store directory paths in CSM constructor for disk space monitor (
Browse files Browse the repository at this point in the history
apache#1697) (apache#43)

* Create store directory paths in CSM constructor to be able to monitor the disk usage of the store directories

* Fix stylecheck issues

* Refactor - init all store paths together and do not mutate the storeDirPaths. Added test

* Remove ununsed method

* Remove ununsed method

* Stylecheck, Remove ununsed import

Co-authored-by: Shekhar Sharma <[email protected]>
  • Loading branch information
shekhars-li and shekhars-li authored Apr 5, 2024
1 parent daae0e4 commit 01534ff
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,19 @@ public ContainerStorageManager(
this.context = new ContextImpl(jobContext, containerContext, Optional.empty(), Optional.empty(), Optional.empty(),
externalContextOptional);

this.storeDirectoryPaths = new HashSet<>();

// Setting the init thread pool size equal to the number of taskInstances
this.parallelInitThreadPoolSize = containerModel.getTasks().size();

// Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage
// of the store directories. The stores itself does not need to be created but the store directory paths need to be
// set to be able to monitor them, once they're created and in use.
this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);

this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
storeDirectoryPaths, containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

this.daVinciStores = ContainerStorageManagerUtil.createDaVinciStores(
Expand Down Expand Up @@ -259,8 +263,7 @@ public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedExcep
this.sideInputsManager = new SideInputsManager(
sideInputSystemStreams, systemFactories,
changelogSystemStreams, activeTaskChangelogSystemStreams,
storageEngineFactories, storeDirectoryPaths,
containerModel, jobContext, containerContext,
storageEngineFactories, containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
Expand Down Expand Up @@ -372,8 +375,7 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
.filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
this.taskStores = ContainerStorageManagerUtil.createTaskStores(
storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
this.containerModel, this.jobContext, this.containerContext,
this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
this.serdes, this.samzaContainerMetrics, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
Map<String, Serde<Object>> serdes,
SamzaContainerMetrics samzaContainerMetrics,
Expand All @@ -82,7 +81,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
Config config) {
Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
StorageConfig storageConfig = new StorageConfig(config);

// iterate over each task and each storeName
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
Expand All @@ -93,16 +91,8 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
}

for (String storeName : storesToCreate) {
List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
taskModel.getTaskMode());
storeDirectoryPaths.add(storeDirectory.toPath());
File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);

// if taskInstanceMetrics are specified use those for store metrics,
// otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
Expand Down Expand Up @@ -170,7 +160,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createInMemoryStores(
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Expand All @@ -182,8 +171,7 @@ public static Map<TaskName, Map<String, StorageEngine>> createInMemoryStores(
Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
return ContainerStorageManagerUtil.createTaskStores(
inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
}
Expand Down Expand Up @@ -448,4 +436,42 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}

public static Set<Path> getStoreDirPaths(Config config, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Map<String, SystemStream> activeTaskChangelogSystemStreams, Set<String> sideInputStoreNames,
ContainerModel containerModel, StorageManagerUtil storageManagerUtil, File loggedStoreBaseDirectory,
File nonLoggedStoreBaseDirectory) {
Set<Path> storeDirectoryPaths = new HashSet<>();
StorageConfig storageConfig = new StorageConfig(config);
Set<String> storeNames = new HashSet<>();
// Add all side input and regular stores
storeNames.addAll(storageConfig.getStoreNames());
// Add all in-memory store names
storeNames.addAll(getInMemoryStoreNames(storageEngineFactories, config));

for (String storeName : storeNames) {
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
File storeDirPath =
getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, sideInputStoreNames, task.getKey(),
task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirPath.toPath());
}
}
return storeDirectoryPaths;
}
public static File getStoreDirPath(String storeName, Config config, Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<String> sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil,
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) {
StorageConfig storageConfig = new StorageConfig(config);
List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
taskModel.getTaskMode());
return storeDirectory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import scala.collection.JavaConversions;

import java.io.File;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -119,7 +118,6 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
Map<String, SystemStream> changelogSystemStreams,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Expand Down Expand Up @@ -147,8 +145,7 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1081,6 +1082,39 @@ public void testRestoreRecoversFromDeletedException() throws Exception {
inOrder.verify(blobStoreManager).close(); // close called on blobStoreManager passed to taskRestoreManager
}

@Test
public void testStoreDirectoriesInitialized() {
String sideInputStore = "sideInputStore";
String inMemoryStore = "inMemoryStore";
String regularStore = "regularStore";
Map<String, String> storeFactories = new HashMap<>();
storeFactories.put(String.format("stores.%s.side.inputs.processor.factory", sideInputStore), "sideinputfactory");
storeFactories.put(String.format("stores.%s.factory", regularStore), "regularstorefactory");
storeFactories.put(String.format("stores.%s.factory", inMemoryStore),
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
Map<String, String> configMap = new HashMap<>(storeFactories);
Config config = new MapConfig(configMap);
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
storageEngineFactories.put(sideInputStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
storageEngineFactories.put(inMemoryStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
storageEngineFactories.put(regularStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));

Map<String, SystemStream> activeTaskChangelogSystemStreams = new HashMap<>();
activeTaskChangelogSystemStreams.put(regularStore, new SystemStream("kafka", "changelog"));
Set<String> sideInputStoreNames = new HashSet<>();
sideInputStoreNames.add(sideInputStore);
ContainerModel containerModel = mock(ContainerModel.class);
when(containerModel.getTasks())
.thenReturn(ImmutableMap.of(new TaskName("task"),
new TaskModel(new TaskName("task"), Collections.emptySet(), new Partition(1))));

Set<Path> storeDirPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, new StorageManagerUtil(),
new File("/tmp"), new File("/tmp2"));

assertEquals(3, storeDirPaths.size());
}

@Test
public void getActiveTaskChangelogSystemStreams() {
Map<String, SystemStream> storeToChangelogSystemStreams =
Expand Down

0 comments on commit 01534ff

Please sign in to comment.