SAMZA-2444: JobModel save in CoordinatorStreamStore resulting in a flush for each message #1259

Merged: 7 commits, Feb 5, 2020
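
This PR removes the implicit flush from CoordinatorStreamStore.put() and moves flushing to the call sites, which batch their writes and flush once per logical operation instead of once per message. Below is a minimal sketch of that pattern; SimpleMetadataStore and BatchingFlushExample are hypothetical stand-ins for illustration, not Samza's actual API.

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Samza's MetadataStore, shown only to illustrate
// the "buffer many writes, flush once" contract this PR adopts.
interface SimpleMetadataStore {
  void put(String key, byte[] value); // buffers the write; no implicit flush
  void flush();                       // persists everything buffered so far
}

public class BatchingFlushExample {
  public static void main(String[] args) {
    SimpleMetadataStore store = new SimpleMetadataStore() {
      private final Map<String, byte[]> buffer = new LinkedHashMap<>();
      @Override public void put(String key, byte[] value) { buffer.put(key, value); }
      @Override public void flush() {
        // A real store would persist to its backing stream here.
        System.out.println("flushed " + buffer.size() + " buffered writes");
        buffer.clear();
      }
    };

    // Before this change: put() flushed immediately, i.e. N flushes for N messages.
    // After this change: callers batch their puts and issue a single flush.
    for (int i = 0; i < 3; i++) {
      store.put("task-" + i, ("container-" + i).getBytes());
    }
    store.flush(); // one flush for the whole batch
  }
}

The same pattern shows up in every file below: writers such as TaskAssignmentManager and StartpointManager now call flush() themselves after a batch of put()/delete() calls.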
===== changed file =====
@@ -119,6 +119,7 @@ public UUID writeContainerPlacementRequestMessage(String deploymentId, String pr
try {
containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()),
objectMapper.writeValueAsBytes(message));
+ containerPlacementMessageStore.flush();
} catch (Exception ex) {
throw new SamzaException(
String.format("ContainerPlacementRequestMessage might have been not written to metastore %s", message), ex);
@@ -137,6 +138,7 @@ void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage me
try {
containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()),
objectMapper.writeValueAsBytes(message));
+ containerPlacementMessageStore.flush();
} catch (Exception ex) {
throw new SamzaException(
String.format("ContainerPlacementResponseMessage might have been not written to metastore %s", message), ex);
@@ -199,6 +201,7 @@ public void deleteContainerPlacementRequestMessage(UUID uuid) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(uuid, "uuid cannot be null");
containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementRequestMessage.class));
+ containerPlacementMessageStore.flush();
}

/**
@@ -209,6 +212,7 @@ public void deleteContainerPlacementResponseMessage(UUID uuid) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(uuid, "uuid cannot be null");
containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementResponseMessage.class));
+ containerPlacementMessageStore.flush();
}

/**
@@ -230,6 +234,7 @@ public void deleteAllContainerPlacementMessages() {
for (String key : requestKeys) {
containerPlacementMessageStore.delete(key);
}
+ containerPlacementMessageStore.flush();
}

static String toContainerPlacementMessageKey(UUID uuid, Class<?> messageType) {
===== changed file =====
@@ -94,6 +94,7 @@ public void writeContainerToHostMapping(String containerId, String hostName) {
}

metadataStore.put(containerId, valueSerde.toBytes(hostName));
+ metadataStore.flush();
}

public void close() {
===== changed file =====
@@ -96,29 +96,35 @@ public Map<TaskName, TaskMode> readTaskModes() {
}

/**
- * Method to write task container info to {@link MetadataStore}.
- *
- * @param taskName the task name
- * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
- * @param taskMode the mode of the task
+ * Method to batch write task container info to {@link MetadataStore}.
+ * @param mappings the task and container mappings: (ContainerId, (TaskName, TaskMode))
*/
- public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
-   String existingContainerId = taskNameToContainerId.get(taskName);
-   if (existingContainerId != null && !existingContainerId.equals(containerId)) {
-     LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
-   } else {
-     LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
-   }
+ public void writeTaskContainerMappings(Map<String, Map<String, TaskMode>> mappings) {
+   for (String containerId : mappings.keySet()) {
+     Map<String, TaskMode> tasks = mappings.get(containerId);
+     for (String taskName : tasks.keySet()) {
+       TaskMode taskMode = tasks.get(taskName);
+       LOG.info("Storing task: {} and container ID: {} into metadata store", taskName, containerId);
+       String existingContainerId = taskNameToContainerId.get(taskName);
+       if (existingContainerId != null && !existingContainerId.equals(containerId)) {
+         LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
+       } else {
+         LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
+       }

-   if (containerId == null) {
-     taskContainerMappingMetadataStore.delete(taskName);
-     taskModeMappingMetadataStore.delete(taskName);
-     taskNameToContainerId.remove(taskName);
-   } else {
-     taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
-     taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
-     taskNameToContainerId.put(taskName, containerId);
-   }
+       if (containerId == null) {
+         taskContainerMappingMetadataStore.delete(taskName);
+         taskModeMappingMetadataStore.delete(taskName);
+         taskNameToContainerId.remove(taskName);
+       } else {
+         taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId));
+         taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
+         taskNameToContainerId.put(taskName, containerId);
+       }
+     }
+   }
+   taskContainerMappingMetadataStore.flush();
+   taskModeMappingMetadataStore.flush();
}

/**
@@ -132,6 +138,8 @@ public void deleteTaskContainerMappings(Iterable<String> taskNames) {
taskModeMappingMetadataStore.delete(taskName);
taskNameToContainerId.remove(taskName);
}
+ taskContainerMappingMetadataStore.flush();
+ taskModeMappingMetadataStore.flush();
}

public void close() {
===== changed file =====
@@ -66,28 +66,32 @@ public TaskPartitionAssignmentManager(MetadataStore metadataStore) {
}

/**
- * Stores the task to partition assignments to the metadata store.
- * @param partition the system stream partition.
- * @param taskNames the task names to which the partition is assigned.
+ * Stores the task names to {@link SystemStreamPartition} assignments to the metadata store.
+ * @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task name list is empty,
+ *                             then the entry is deleted from the metadata store.
*/
- public void writeTaskPartitionAssignment(SystemStreamPartition partition, List<String> taskNames) {
-   // For broadcast streams, an input system stream partition will be mapped to more than one task in a
-   // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in the metadata store,
-   // systemStreamPartition to list of taskNames is stored here. This was done due to the 1 MB limit on value size in Kafka.
-   String serializedSSPAsJson = serializeSSPToJson(partition);
-   if (taskNames == null || taskNames.isEmpty()) {
-     LOG.info("Deleting the key: {} from the metadata store.", partition);
-     metadataStore.delete(serializedSSPAsJson);
-   } else {
-     try {
-       String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
-       byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
-       LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
-       metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
-     } catch (Exception e) {
-       throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
+ public void writeTaskPartitionAssignments(Map<SystemStreamPartition, List<String>> sspToTaskNameMapping) {
+   for (SystemStreamPartition partition : sspToTaskNameMapping.keySet()) {
+     List<String> taskNames = sspToTaskNameMapping.get(partition);
+     // For broadcast streams, an input system stream partition will be mapped to more than one task in a
+     // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in the metadata store,
+     // systemStreamPartition to list of taskNames is stored here. This was done due to the 1 MB limit on value size in Kafka.
+     String serializedSSPAsJson = serializeSSPToJson(partition);
+     if (taskNames == null || taskNames.isEmpty()) {
+       LOG.info("Deleting the key: {} from the metadata store.", partition);
+       metadataStore.delete(serializedSSPAsJson);
+     } else {
+       try {
+         String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
+         byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
+         LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
+         metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
+       } catch (Exception e) {
+         throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
+       }
+     }
+   }
+   metadataStore.flush();
}

/**
@@ -120,6 +124,7 @@ public void delete(Iterable<SystemStreamPartition> systemStreamPartitions) {
String serializedSSPAsJson = serializeSSPToJson(systemStreamPartition);
metadataStore.delete(serializedSSPAsJson);
}
+ metadataStore.flush();
}

/**
===== changed file =====
@@ -84,6 +84,7 @@ public Optional<String> getRunId() {
String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
LOG.info("Writing the run id for this run as {}", runId);
metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8"));
+ metadataStore.flush();
} else {
runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY));
LOG.info("Read the run id for this run as {}", runId);
===== changed file =====
@@ -126,19 +126,6 @@ public byte[] get(String namespacedKey) {

@Override
public void put(String namespacedKey, byte[] value) {
-   putWithoutFlush(namespacedKey, value);
-   flush();
- }
-
- @Override
- public void putAll(Map<String, byte[]> entries) {
-   for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
-     putWithoutFlush(entry.getKey(), entry.getValue());
-   }
-   flush();
- }
-
- private void putWithoutFlush(String namespacedKey, byte[] value) {
// 1. Store the namespace and key into correct fields of the CoordinatorStreamKey and convert the key to bytes.
CoordinatorMessageKey coordinatorMessageKey = deserializeCoordinatorMessageKeyFromJson(namespacedKey);
CoordinatorStreamKeySerde keySerde = new CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
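
Note that with the implicit flush() removed from put() (and putAll() removed with it), CoordinatorStreamStore callers now control when buffered writes are persisted; this is what allows the call sites in the surrounding files to batch many put()/delete() calls into a single flush().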
===== changed file =====
@@ -190,6 +190,7 @@ private void checkAndCreateStreams(String lockId, List<StreamSpec> intStreams, S
streamManager.createStreams(intStreams);
String streamCreatedMessage = "Streams created by processor " + processorId;
metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), streamCreatedMessage.getBytes("UTF-8"));
+ metadataStore.flush();
distributedLock.unlock();
break;
} else {
===== changed file =====
@@ -80,6 +80,7 @@ public static void writeJobModel(JobModel jobModel, String jobModelVersion, Meta
byte[] jobModelSerializedAsBytes = jobModelSerializedAsString.getBytes(UTF_8);
String metadataStoreKey = getJobModelKey(jobModelVersion);
metadataStore.put(metadataStoreKey, jobModelSerializedAsBytes);
+ metadataStore.flush();
} catch (Exception e) {
throw new SamzaException(String.format("Exception occurred when storing JobModel: %s with version: %s.", jobModel, jobModelVersion), e);
}
===== changed file =====
@@ -145,6 +145,7 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp

try {
readWriteStore.put(toReadWriteStoreKey(ssp, taskName), objectMapper.writeValueAsBytes(startpoint));
+ readWriteStore.flush();
} catch (Exception ex) {
throw new SamzaException(String.format(
"Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", ssp, taskName), ex);
@@ -208,6 +209,7 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");

readWriteStore.delete(toReadWriteStoreKey(ssp, taskName));
+ readWriteStore.flush();
}

/**
@@ -218,6 +220,7 @@ public void deleteAllStartpoints() {
for (String key : readWriteKeys) {
readWriteStore.delete(key);
}
+ readWriteStore.flush();
}

/**
@@ -273,6 +276,7 @@ public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName
StartpointFanOutPerTask newFanOut = fanOuts.get(taskName);
fanOutStore.put(fanOutKey, objectMapper.writeValueAsBytes(newFanOut));
}
+ fanOutStore.flush();

for (SystemStreamPartition ssp : deleteKeys.keySet()) {
for (TaskName taskName : deleteKeys.get(ssp)) {
@@ -314,6 +318,7 @@ public void removeFanOutForTask(TaskName taskName) {
Preconditions.checkNotNull(taskName, "TaskName cannot be null");

fanOutStore.delete(toFanOutStoreKey(taskName));
+ fanOutStore.flush();
}

/**
@@ -324,6 +329,7 @@ public void removeAllFanOuts() {
for (String key : fanOutKeys) {
fanOutStore.delete(key);
}
+ fanOutStore.flush();
}

@VisibleForTesting
===== changed file =====
@@ -102,6 +102,7 @@ public void writePartitionMapping(Map<TaskName, Integer> changelogEntries) {
metadataStore.delete(taskName);
}
}
+ metadataStore.flush();
}

/**
===== changed file =====
@@ -324,6 +324,7 @@ void loadMetadataResources(JobModel jobModel) {
byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
configStore.put(entry.getKey(), serializedValue);
}
+ configStore.flush();

// fan out the startpoints
StartpointManager startpointManager = createStartpointManager();
===== changed file =====
@@ -240,10 +240,12 @@ object JobModelManager extends Logging {
// taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()

+ val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = new util.HashMap[String, util.Map[String, TaskMode]]()
+
for (container <- jobModel.getContainers.values()) {
for ((taskName, taskModel) <- container.getTasks) {
info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId))
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
taskContainerMappings.putIfAbsent(container.getId, new util.HashMap[String, TaskMode]())
taskContainerMappings.get(container.getId).put(taskName.getTaskName, container.getTasks.get(taskName).getTaskMode)
for (partition <- taskModel.getSystemStreamPartitions) {
if (!sspToTaskNameMap.containsKey(partition)) {
sspToTaskNameMap.put(partition, new util.ArrayList[String]())
@@ -253,10 +255,8 @@ }
}
}

- for ((ssp, taskNames) <- sspToTaskNameMap) {
-   info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames))
-   taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames)
- }
+ taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings)
+ taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap)
}

/**
===== changed file =====
@@ -60,10 +60,13 @@ public void tearDown() {
@Test
public void testTaskAssignmentManager() {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
Map<String, Map<String, TaskMode>> taskContainerMappings = ImmutableMap.of(
"0", ImmutableMap.of("Task0", TaskMode.Active, "Task3", TaskMode.Active),
"1", ImmutableMap.of("Task1", TaskMode.Active, "Task4", TaskMode.Active),
"2", ImmutableMap.of("Task2", TaskMode.Active)
);

for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
}
taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);

Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();

@@ -75,10 +78,12 @@ public void testDeleteMappings() {
@Test
public void testDeleteMappings() {
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
Map<String, Map<String, TaskMode>> taskContainerMappings = ImmutableMap.of(
"0", ImmutableMap.of("Task0", TaskMode.Active),
"1", ImmutableMap.of("Task1", TaskMode.Active)
);

for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
}
taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings);

Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
assertEquals(expectedMap, localMap);