From b7efe8db5b5a878c84eaf52e16f0d15ffafe7518 Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Mon, 27 Jan 2020 11:37:54 -0800 Subject: [PATCH 1/7] Remove flush operation out of put functions Signed-off-by: Alan Zhang --- .../metadatastore/CoordinatorStreamStore.java | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java index e01a4c6d73..1df4f7017c 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java @@ -126,19 +126,6 @@ public byte[] get(String namespacedKey) { @Override public void put(String namespacedKey, byte[] value) { - putWithoutFlush(namespacedKey, value); - flush(); - } - - @Override - public void putAll(Map entries) { - for (Map.Entry entry : entries.entrySet()) { - putWithoutFlush(entry.getKey(), entry.getValue()); - } - flush(); - } - - private void putWithoutFlush(String namespacedKey, byte[] value) { // 1. Store the namespace and key into correct fields of the CoordinatorStreamKey and convert the key to bytes. CoordinatorMessageKey coordinatorMessageKey = deserializeCoordinatorMessageKeyFromJson(namespacedKey); CoordinatorStreamKeySerde keySerde = new CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace()); @@ -149,6 +136,13 @@ private void putWithoutFlush(String namespacedKey, byte[] value) { systemProducer.send(SOURCE, envelope); } + @Override + public void putAll(Map entries) { + for (Map.Entry entry : entries.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + @Override public void delete(String namespacedKey) { // Since kafka doesn't support individual message deletion, store value as null for a namespacedKey to delete. 
From 413c834857f6f7a17d8563fe15cb92863560cb91 Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Mon, 27 Jan 2020 13:45:01 -0800 Subject: [PATCH 2/7] Explicitly call flush method after calling put/putAll/delete methods Signed-off-by: Alan Zhang --- .../placement/ContainerPlacementMetadataStore.java | 5 +++++ .../java/org/apache/samza/container/LocalityManager.java | 1 + .../samza/container/grouper/task/TaskAssignmentManager.java | 4 ++++ .../grouper/task/TaskPartitionAssignmentManager.java | 2 ++ .../java/org/apache/samza/coordinator/RunIdGenerator.java | 1 + .../java/org/apache/samza/execution/LocalJobPlanner.java | 1 + .../main/java/org/apache/samza/job/model/JobModelUtil.java | 1 + .../java/org/apache/samza/startpoint/StartpointManager.java | 6 ++++++ .../org/apache/samza/storage/ChangelogStreamManager.java | 1 + .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 1 + 10 files changed, 23 insertions(+) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java index 2eb3afa7cc..02f698d780 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementMetadataStore.java @@ -119,6 +119,7 @@ public UUID writeContainerPlacementRequestMessage(String deploymentId, String pr try { containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()), objectMapper.writeValueAsBytes(message)); + containerPlacementMessageStore.flush(); } catch (Exception ex) { throw new SamzaException( String.format("ContainerPlacementRequestMessage might have been not written to metastore %s", message), ex); @@ -137,6 +138,7 @@ void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage 
me try { containerPlacementMessageStore.put(toContainerPlacementMessageKey(message.getUuid(), message.getClass()), objectMapper.writeValueAsBytes(message)); + containerPlacementMessageStore.flush(); } catch (Exception ex) { throw new SamzaException( String.format("ContainerPlacementResponseMessage might have been not written to metastore %s", message), ex); @@ -199,6 +201,7 @@ public void deleteContainerPlacementRequestMessage(UUID uuid) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(uuid, "uuid cannot be null"); containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementRequestMessage.class)); + containerPlacementMessageStore.flush(); } /** @@ -209,6 +212,7 @@ public void deleteContainerPlacementResponseMessage(UUID uuid) { Preconditions.checkState(!stopped, "Underlying metadata store not available"); Preconditions.checkNotNull(uuid, "uuid cannot be null"); containerPlacementMessageStore.delete(toContainerPlacementMessageKey(uuid, ContainerPlacementResponseMessage.class)); + containerPlacementMessageStore.flush(); } /** @@ -230,6 +234,7 @@ public void deleteAllContainerPlacementMessages() { for (String key : requestKeys) { containerPlacementMessageStore.delete(key); } + containerPlacementMessageStore.flush(); } static String toContainerPlacementMessageKey(UUID uuid, Class messageType) { diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index 05f2e8b787..864b558ef7 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -94,6 +94,7 @@ public void writeContainerToHostMapping(String containerId, String hostName) { } metadataStore.put(containerId, valueSerde.toBytes(hostName)); + metadataStore.flush(); } public void close() { diff --git 
a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index 16f8a5190e..b74f626b71 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -119,6 +119,8 @@ public void writeTaskContainerMapping(String taskName, String containerId, TaskM taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString())); taskNameToContainerId.put(taskName, containerId); } + taskContainerMappingMetadataStore.flush(); + taskModeMappingMetadataStore.flush(); } /** @@ -132,6 +134,8 @@ public void deleteTaskContainerMappings(Iterable taskNames) { taskModeMappingMetadataStore.delete(taskName); taskNameToContainerId.remove(taskName); } + taskContainerMappingMetadataStore.flush(); + taskModeMappingMetadataStore.flush(); } public void close() { diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java index 7e32f0a08e..2d6e6d8ed0 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java @@ -88,6 +88,7 @@ public void writeTaskPartitionAssignment(SystemStreamPartition partition, List systemStreamPartitions) { String serializedSSPAsJson = serializeSSPToJson(systemStreamPartition); metadataStore.delete(serializedSSPAsJson); } + metadataStore.flush(); } /** diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java index 284c0bf894..ddcc72bc04 100644 --- 
a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java @@ -84,6 +84,7 @@ public Optional getRunId() { String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); LOG.info("Writing the run id for this run as {}", runId); metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8")); + metadataStore.flush(); } else { runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY)); LOG.info("Read the run id for this run as {}", runId); diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java index 000e55a4e0..f55f02fd92 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java @@ -190,6 +190,7 @@ private void checkAndCreateStreams(String lockId, List intStreams, S streamManager.createStreams(intStreams); String streamCreatedMessage = "Streams created by processor " + processorId; metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), streamCreatedMessage.getBytes("UTF-8")); + metadataStore.flush(); distributedLock.unlock(); break; } else { diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java index e230b1a02c..377fa94e9f 100644 --- a/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java +++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModelUtil.java @@ -80,6 +80,7 @@ public static void writeJobModel(JobModel jobModel, String jobModelVersion, Meta byte[] jobModelSerializedAsBytes = jobModelSerializedAsString.getBytes(UTF_8); String metadataStoreKey = getJobModelKey(jobModelVersion); metadataStore.put(metadataStoreKey, jobModelSerializedAsBytes); + 
metadataStore.flush(); } catch (Exception e) { throw new SamzaException(String.format("Exception occurred when storing JobModel: %s with version: %s.", jobModel, jobModelVersion), e); } diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java index ba5acb7bb3..9a196c3da5 100644 --- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java +++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointManager.java @@ -145,6 +145,7 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp try { readWriteStore.put(toReadWriteStoreKey(ssp, taskName), objectMapper.writeValueAsBytes(startpoint)); + readWriteStore.flush(); } catch (Exception ex) { throw new SamzaException(String.format( "Startpoint for SSP: %s and task: %s may not have been written to the metadata store.", ssp, taskName), ex); @@ -208,6 +209,7 @@ public void deleteStartpoint(SystemStreamPartition ssp, TaskName taskName) { Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null"); readWriteStore.delete(toReadWriteStoreKey(ssp, taskName)); + readWriteStore.flush(); } /** @@ -218,6 +220,7 @@ public void deleteAllStartpoints() { for (String key : readWriteKeys) { readWriteStore.delete(key); } + readWriteStore.flush(); } /** @@ -273,6 +276,7 @@ public Map> fanOut(Map changelogEntries) { metadataStore.delete(taskName); } } + metadataStore.flush(); } /** diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index dfd03044e8..feaabba34b 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -324,6 +324,7 @@ void loadMetadataResources(JobModel jobModel) { byte[] serializedValue = jsonSerde.toBytes(entry.getValue()); configStore.put(entry.getKey(), 
serializedValue); } + configStore.flush(); // fan out the startpoints StartpointManager startpointManager = createStartpointManager(); From 4f57592d6cac0596ce970448ece2f37e0699dea7 Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Mon, 27 Jan 2020 13:52:31 -0800 Subject: [PATCH 3/7] Check flush call in unit tests Signed-off-by: Alan Zhang --- .../java/org/apache/samza/coordinator/TestRunIdGenerator.java | 1 + .../org/apache/samza/runtime/TestLocalApplicationRunner.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java index 39bc583c63..ef24541842 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java @@ -51,6 +51,7 @@ public void testSingleProcessorWriteRunId() throws Exception { verify(membership, Mockito.times(1)).registerProcessor(); verify(membership, Mockito.times(1)).getNumberOfProcessors(); verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); + verify(metadataStore, Mockito.times(1)).flush(); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 6c7fcb49cc..5a6f332683 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -469,6 +469,7 @@ public void testRunIdForBatch() throws Exception { verify(coordinationUtils, Mockito.times(1)).getLock(CoordinationConstants.RUNID_LOCK_ID); verify(clusterMembership, Mockito.times(1)).getNumberOfProcessors(); verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); + 
verify(metadataStore, Mockito.times(1)).flush(); } /** @@ -496,6 +497,7 @@ public void testRunIdForStream() throws Exception { verify(coordinationUtils, Mockito.times(0)).getClusterMembership(); verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors(); verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); + verify(metadataStore, Mockito.times(1)).flush(); } private void prepareTestForRunId() throws Exception { From 7466ca31b30248e756b618da707064d49b43c2e1 Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Mon, 27 Jan 2020 15:09:50 -0800 Subject: [PATCH 4/7] Improve performance with batch update 1. Batch write task partition assignments information to metadata store. 2. Batch write task container information to metadata store. Signed-off-by: Alan Zhang --- .../grouper/task/TaskAssignmentManager.java | 44 ++++++++++--------- .../task/TaskPartitionAssignmentManager.java | 42 ++++++++++-------- .../samza/coordinator/JobModelManager.scala | 12 ++--- .../task/TestTaskAssignmentManager.java | 17 ++++--- .../TestTaskPartitionAssignmentManager.java | 25 +++++------ .../coordinator/TestJobModelManager.java | 18 ++++---- 6 files changed, 85 insertions(+), 73 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index b74f626b71..e9fcadb489 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -96,28 +96,32 @@ public Map readTaskModes() { } /** - * Method to write task container info to {@link MetadataStore}. 
- * - * @param taskName the task name - * @param containerId the SamzaContainer ID or {@code null} to delete the mapping - * @param taskMode the mode of the task + * Method to batch write task container info to {@link MetadataStore}. + * @param mappings the task and container mappings: (ContainerId, (TaskName, TaskMode)) */ - public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) { - String existingContainerId = taskNameToContainerId.get(taskName); - if (existingContainerId != null && !existingContainerId.equals(containerId)) { - LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId}); - } else { - LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId); - } + public void writeTaskContainerMappings(Map> mappings) { + for (String containerId : mappings.keySet()) { + Map tasks = mappings.get(containerId); + for (String taskName : tasks.keySet()) { + TaskMode taskMode = tasks.get(taskName); + LOG.info("Storing task: {} and container ID: {} into metadata store", taskName, containerId); + String existingContainerId = taskNameToContainerId.get(taskName); + if (existingContainerId != null && !existingContainerId.equals(containerId)) { + LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId}); + } else { + LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId); + } - if (containerId == null) { - taskContainerMappingMetadataStore.delete(taskName); - taskModeMappingMetadataStore.delete(taskName); - taskNameToContainerId.remove(taskName); - } else { - taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId)); - taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString())); - taskNameToContainerId.put(taskName, containerId); + if 
(containerId == null) { + taskContainerMappingMetadataStore.delete(taskName); + taskModeMappingMetadataStore.delete(taskName); + taskNameToContainerId.remove(taskName); + } else { + taskContainerMappingMetadataStore.put(taskName, containerIdSerde.toBytes(containerId)); + taskModeMappingMetadataStore.put(taskName, taskModeSerde.toBytes(taskMode.toString())); + taskNameToContainerId.put(taskName, containerId); + } + } } taskContainerMappingMetadataStore.flush(); taskModeMappingMetadataStore.flush(); diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java index 2d6e6d8ed0..484a824b4b 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java @@ -66,26 +66,30 @@ public TaskPartitionAssignmentManager(MetadataStore metadataStore) { } /** - * Stores the task to partition assignments to the metadata store. - * @param partition the system stream partition. - * @param taskNames the task names to which the partition is assigned to. + * Stores the task names to {@link SystemStreamPartition} assignments to the metadata store. + * @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task name list is empty, + * then the entry is deleted from the metadata store. */ - public void writeTaskPartitionAssignment(SystemStreamPartition partition, List taskNames) { - // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a - // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here - // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka. 
- String serializedSSPAsJson = serializeSSPToJson(partition); - if (taskNames == null || taskNames.isEmpty()) { - LOG.info("Deleting the key: {} from the metadata store.", partition); - metadataStore.delete(serializedSSPAsJson); - } else { - try { - String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames); - byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString); - LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames); - metadataStore.put(serializedSSPAsJson, taskNamesAsBytes); - } catch (Exception e) { - throw new SamzaException("Exception occurred when writing task to partition assignment.", e); + public void writeTaskPartitionAssignments(Map> sspToTaskNameMapping) { + for (SystemStreamPartition partition: sspToTaskNameMapping.keySet()) { + List taskNames = sspToTaskNameMapping.get(partition); + LOG.info("Storing ssp: {} and task: {} into metadata store", partition, taskNames); + // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a + // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here + // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka. 
+ String serializedSSPAsJson = serializeSSPToJson(partition); + if (taskNames == null || taskNames.isEmpty()) { + LOG.info("Deleting the key: {} from the metadata store.", partition); + metadataStore.delete(serializedSSPAsJson); + } else { + try { + String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames); + byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString); + LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames); + metadataStore.put(serializedSSPAsJson, taskNamesAsBytes); + } catch (Exception e) { + throw new SamzaException("Exception occurred when writing task to partition assignment.", e); + } } } metadataStore.flush(); diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index b8c18fe768..e2dbf3f8f1 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -240,10 +240,12 @@ object JobModelManager extends Logging { // taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}. 
val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]() + val taskContainerMappings: util.Map[String, util.Map[String, TaskMode]] = new util.HashMap[String, util.Map[String, TaskMode]]() + for (container <- jobModel.getContainers.values()) { for ((taskName, taskModel) <- container.getTasks) { - info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId)) - taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode) + taskContainerMappings.putIfAbsent(container.getId, new util.HashMap[String, TaskMode]()) + taskContainerMappings.get(container.getId).put(taskName.getTaskName, container.getTasks.get(taskName).getTaskMode) for (partition <- taskModel.getSystemStreamPartitions) { if (!sspToTaskNameMap.containsKey(partition)) { sspToTaskNameMap.put(partition, new util.ArrayList[String]()) @@ -253,10 +255,8 @@ object JobModelManager extends Logging { } } - for ((ssp, taskNames) <- sspToTaskNameMap) { - info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames)) - taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames) - } + taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings) + taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap); } /** diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java index 357e8ae4fb..34909e3b35 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java @@ -60,10 +60,13 @@ public void tearDown() { @Test public void testTaskAssignmentManager() { Map expectedMap = 
ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1"); + Map> taskContainerMappings = ImmutableMap.of( + "0", ImmutableMap.of("Task0", TaskMode.Active, "Task3", TaskMode.Active), + "1", ImmutableMap.of("Task1", TaskMode.Active, "Task4", TaskMode.Active), + "2", ImmutableMap.of("Task2", TaskMode.Active) + ); - for (Map.Entry entry : expectedMap.entrySet()) { - taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active); - } + taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings); Map localMap = taskAssignmentManager.readTaskAssignment(); @@ -75,10 +78,12 @@ public void testTaskAssignmentManager() { @Test public void testDeleteMappings() { Map expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1"); + Map> taskContainerMappings = ImmutableMap.of( + "0", ImmutableMap.of("Task0", TaskMode.Active), + "1", ImmutableMap.of("Task1", TaskMode.Active) + ); - for (Map.Entry entry : expectedMap.entrySet()) { - taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active); - } + taskAssignmentManager.writeTaskContainerMappings(taskContainerMappings); Map localMap = taskAssignmentManager.readTaskAssignment(); assertEquals(expectedMap, localMap); diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java index 1a7f0c9c84..964f6ea774 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.samza.config.Config; @@ -63,7 
+64,7 @@ public void tearDown() { @Test public void testReadAfterWrite() { List testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames)); Map> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); @@ -74,7 +75,7 @@ public void testReadAfterWrite() { @Test public void testDeleteAfterWrite() { List testTaskNames = ImmutableList.of("test-task1", "test-task2", "test-task3"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames)); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); Assert.assertEquals(1, actualMapping.size()); @@ -94,12 +95,13 @@ public void testReadPartitionAssignments() { SystemStreamPartition testSystemStreamPartition3 = new SystemStreamPartition(TEST_SYSTEM, "stream-3", PARTITION); List testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition1, testTaskNames1); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition2, testTaskNames2); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition3, testTaskNames3); + taskPartitionAssignmentManager.writeTaskPartitionAssignments( + ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2, + testSystemStreamPartition3, testTaskNames3)); - Map> expectedMapping = ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, - testSystemStreamPartition2, 
testTaskNames2, testSystemStreamPartition3, testTaskNames3); + Map> expectedMapping = + ImmutableMap.of(testSystemStreamPartition1, testTaskNames1, testSystemStreamPartition2, testTaskNames2, + testSystemStreamPartition3, testTaskNames3); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); Assert.assertEquals(expectedMapping, actualMapping); @@ -108,14 +110,11 @@ public void testReadPartitionAssignments() { @Test public void testMultipleUpdatesReturnsTheMostRecentValue() { List testTaskNames1 = ImmutableList.of("test-task1", "test-task2", "test-task3"); - - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames1); - + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames1)); List testTaskNames2 = ImmutableList.of("test-task4", "test-task5"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames2); - + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames2)); List testTaskNames3 = ImmutableList.of("test-task6", "test-task7", "test-task8"); - taskPartitionAssignmentManager.writeTaskPartitionAssignment(testSystemStreamPartition, testTaskNames3); + taskPartitionAssignmentManager.writeTaskPartitionAssignments(ImmutableMap.of(testSystemStreamPartition, testTaskNames3)); Map> expectedMapping = ImmutableMap.of(testSystemStreamPartition, testTaskNames3); Map> actualMapping = taskPartitionAssignmentManager.readTaskPartitionAssignments(); diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java index d58cf18d64..fe94e83ffc 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java @@ -270,7 
+270,7 @@ public void testUpdateTaskAssignments() { when(mockJobModel.getContainers()).thenReturn(containerMapping); when(mockGrouperMetadata.getPreviousTaskToProcessorAssignment()).thenReturn(new HashMap<>()); - Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMapping(Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.doNothing().when(mockTaskAssignmentManager).writeTaskContainerMappings(Mockito.any()); JobModelManager.updateTaskAssignments(mockJobModel, mockTaskAssignmentManager, mockTaskPartitionAssignmentManager, mockGrouperMetadata); @@ -289,18 +289,18 @@ public void testUpdateTaskAssignments() { // Verifications Mockito.verify(mockJobModel, atLeast(1)).getContainers(); Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any()); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-1", "test-container-id", TaskMode.Active); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-2", "test-container-id", TaskMode.Active); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-3", "test-container-id", TaskMode.Active); - Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMapping("task-4", "test-container-id", TaskMode.Active); + Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id", + ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, "task-3", TaskMode.Active, "task-4", TaskMode.Active))); // Verify that the old, stale partition mappings had been purged in the coordinator stream. Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions); // Verify that the new task to partition assignment is stored in the coordinator stream. 
- Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition1, ImmutableList.of("task-1")); - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition2, ImmutableList.of("task-2")); - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition3, ImmutableList.of("task-3")); - Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignment(testSystemStreamPartition4, ImmutableList.of("task-4")); + Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of( + testSystemStreamPartition1, ImmutableList.of("task-1"), + testSystemStreamPartition2, ImmutableList.of("task-2"), + testSystemStreamPartition3, ImmutableList.of("task-3"), + testSystemStreamPartition4, ImmutableList.of("task-4") + )); } } From a6696017fb3320f09a0b966dbb7c7c6ae9ae9fb0 Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Mon, 27 Jan 2020 15:15:08 -0800 Subject: [PATCH 5/7] Fix checkstyle issue Signed-off-by: Alan Zhang --- .../grouper/task/TestTaskPartitionAssignmentManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java index 964f6ea774..1a0dc48e10 100644 --- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java +++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskPartitionAssignmentManager.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.samza.config.Config; From e34e917e499c143841ca0bb588dc1323de9dbb17 Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Mon, 27 
Jan 2020 15:29:03 -0800 Subject: [PATCH 6/7] Fix unit test failures Signed-off-by: Alan Zhang --- .../coordinator/metadatastore/TestCoordinatorStreamStore.java | 1 - .../org/apache/samza/runtime/TestLocalApplicationRunner.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java index e073a0e12f..5865de951b 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java @@ -105,7 +105,6 @@ public void testPutAll() { Assert.assertEquals(value3, spyCoordinatorStreamStore.get(key3)); Assert.assertEquals(value4, spyCoordinatorStreamStore.get(key4)); Assert.assertEquals(value5, spyCoordinatorStreamStore.get(key5)); - Mockito.verify(spyCoordinatorStreamStore).flush(); // verify flush called only once during putAll } @Test diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 5a6f332683..1fddf04e38 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -497,7 +497,7 @@ public void testRunIdForStream() throws Exception { verify(coordinationUtils, Mockito.times(0)).getClusterMembership(); verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors(); verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); - verify(metadataStore, Mockito.times(1)).flush(); + verify(metadataStore, Mockito.times(0)).flush(); } private void prepareTestForRunId() throws Exception { From 
dfe49bcbd59eff0df67b15880e45b8d82fbf1ceb Mon Sep 17 00:00:00 2001 From: Alan Zhang Date: Tue, 28 Jan 2020 13:07:48 -0800 Subject: [PATCH 7/7] Remove duplicated codes and useless log Signed-off-by: Alan Zhang --- .../grouper/task/TaskPartitionAssignmentManager.java | 1 - .../coordinator/metadatastore/CoordinatorStreamStore.java | 7 ------- 2 files changed, 8 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java index 484a824b4b..9b9c71e47a 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java @@ -73,7 +73,6 @@ public TaskPartitionAssignmentManager(MetadataStore metadataStore) { public void writeTaskPartitionAssignments(Map> sspToTaskNameMapping) { for (SystemStreamPartition partition: sspToTaskNameMapping.keySet()) { List taskNames = sspToTaskNameMapping.get(partition); - LOG.info("Storing ssp: {} and task: {} into metadata store", partition, taskNames); // For broadcast streams, a input system stream partition will be mapped to more than one tasks in a // SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here // systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka. 
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java index 1df4f7017c..24ce4576f6 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java @@ -136,13 +136,6 @@ public void put(String namespacedKey, byte[] value) { systemProducer.send(SOURCE, envelope); } - @Override - public void putAll(Map entries) { - for (Map.Entry entry : entries.entrySet()) { - put(entry.getKey(), entry.getValue()); - } - } - @Override public void delete(String namespacedKey) { // Since kafka doesn't support individual message deletion, store value as null for a namespacedKey to delete.