From ef1839599ca530d8f8492032ac13ab83b1667ca5 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 4 Jun 2024 10:22:49 -0600 Subject: [PATCH 1/3] KAFKA-15045: (KIP-924 pt. 17) State store computation fixed Fixed the calculation of the store name list based on the subtopology being accessed. Also added a new test to make sure this new functionality works as intended. --- .../internals/InternalTopologyBuilder.java | 50 +++++++++++++++++-- .../internals/StreamsPartitionAssignor.java | 3 +- .../processor/internals/TopologyMetadata.java | 4 ++ .../InternalTopologyBuilderTest.java | 24 +++++++++ 4 files changed, 75 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index c160655578ee6..7c14a5badb3a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -16,13 +16,17 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -144,8 +148,12 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { private String applicationId = null; + // keyed by subtopology id private Map> nodeGroups = null; + // keyed by subtopology id + private Map> subtopologyIdToStateStoreNames = null; + // The name of the topology this builder belongs to, or null if this is not a NamedTopology private final String topologyName; // TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it @@ -937,14 +945,15 @@ private int putNodeGroupName(final String nodeName, * @return the full topology minus any global state */ public synchronized ProcessorTopology buildTopology() { - final Set nodeGroup = new HashSet<>(); + final Set allNodes = new HashSet<>(); for (final Set value : nodeGroups().values()) { - nodeGroup.addAll(value); + allNodes.addAll(value); } - nodeGroup.removeAll(globalNodeGroups()); + allNodes.removeAll(globalNodeGroups()); initializeSubscription(); - return build(nodeGroup); + initializeSubtopologyIdToStateStoreNamesMap(); + return build(allNodes); } /** @@ -1500,6 +1509,39 @@ private boolean isGlobalSource(final String nodeName) { return false; } + public Set stateStoreNamesForSubtopology(final int subtopologyId) { + return subtopologyIdToStateStoreNames.get(subtopologyId); + } + + private void initializeSubtopologyIdToStateStoreNamesMap() { + if (subtopologyIdToStateStoreNames != null) { + log.error("Attempted to initialize subtopologyIdToStateStores map but it was not null"); + throw new StreamsException("Double initialization of subtopologyIdToStateStores map"); + } + + final ConcurrentMap> storeNames = new ConcurrentHashMap<>(); + + for (final Map.Entry> nodeGroup : makeNodeGroups().entrySet()) { + final Set subtopologyNodes = nodeGroup.getValue(); + final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(subtopologyNodes); + + if (!isNodeGroupOfGlobalStores) { + final int subtopologyId = nodeGroup.getKey(); + final Set subtopologyStoreNames = new HashSet<>(); + + for (final String nodeName : subtopologyNodes) { + final AbstractNode node = nodeFactories.get(nodeName).describe(); + if (node instanceof Processor) { + subtopologyStoreNames.addAll(((Processor) node).stores()); + } + } + + storeNames.put(subtopologyId, subtopologyStoreNames); + } + } + subtopologyIdToStateStoreNames = storeNames; + } + public TopologyDescription describe() { final TopologyDescription description = new TopologyDescription(topologyName); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 658ba1540da48..562f912311df7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -560,8 +560,7 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe Function.identity(), taskId -> { final Set stateStoreNames = topologyMetadata - .stateStoreNameToSourceTopicsForTopology(taskId.topologyName()) - .keySet(); + .stateStoreNamesForSubtopology(taskId.topologyName(), taskId.subtopology()); final Set topicPartitions = topicPartitionsForTask.get(taskId); return new DefaultTaskInfo( taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index 150a6be9c5aad..5a0e3407eba88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -521,6 +521,10 @@ public Map> stateStoreNameToSourceTopicsForTopology(final S return lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames(); } + public Set stateStoreNamesForSubtopology(final String topologyName, final int subtopologyId) { + return lookupBuilderForNamedTopology(topologyName).stateStoreNamesForSubtopology(subtopologyId); + } + public Map> stateStoreNameToSourceTopics() { final Map> stateStoreNameToSourceTopics = new HashMap<>(); applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index bbb4625d56440..1a0166523710a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -71,6 +71,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -568,6 +569,29 @@ public void testAddStateStore() { assertEquals(storeBuilder.name(), suppliers.get(0).name()); } + @Test + public void testStateStoreNamesForSubtopology() { + builder.addStateStore(storeBuilder); + builder.setApplicationId("X"); + + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1"); + builder.connectProcessorAndStateStores("processor-1", storeBuilder.name()); + + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2"); + + builder.buildTopology(); + final Set stateStoreNames = builder.stateStoreNamesForSubtopology(0); + assertThat(stateStoreNames, equalTo(mkSet(storeBuilder.name()))); + + final Set emptyStoreNames = builder.stateStoreNamesForSubtopology(1); + assertThat(emptyStoreNames, equalTo(mkSet())); + + final Set stateStoreNamesUnknownSubtopology = builder.stateStoreNamesForSubtopology(13); + assertThat(stateStoreNamesUnknownSubtopology, nullValue()); + } + @Test public void shouldAllowAddingSameStoreBuilderMultipleTimes() { builder.setApplicationId("X"); From 451c86132bd6a03f787acb1b279982c529e18526 Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Tue, 4 Jun 2024 14:09:55 -0600 Subject: [PATCH 2/3] build fix --- checkstyle/suppressions.xml | 2 +- .../streams/processor/internals/InternalTopologyBuilder.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index fc6995dadfe7e..46047a6c0ff76 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -200,7 +200,7 @@ files="StreamThread.java"/> + files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 7c14a5badb3a2..fe95aefdacdd1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -19,7 +19,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; From dccda66ea525e4aaa7a2c1b818e2e2698093994b Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 4 Jun 2024 22:15:33 -0700 Subject: [PATCH 3/3] Remove double-initialization check.java --- .../processor/internals/InternalTopologyBuilder.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index fe95aefdacdd1..b387760043181 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; @@ -25,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyConfig; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -1513,12 +1510,7 @@ public Set stateStoreNamesForSubtopology(final int subtopologyId) { } private void initializeSubtopologyIdToStateStoreNamesMap() { - if (subtopologyIdToStateStoreNames != null) { - log.error("Attempted to initialize subtopologyIdToStateStores map but it was not null"); - throw new StreamsException("Double initialization of subtopologyIdToStateStores map"); - } - - final ConcurrentMap> storeNames = new ConcurrentHashMap<>(); + final Map> storeNames = new HashMap<>(); for (final Map.Entry> nodeGroup : makeNodeGroups().entrySet()) { final Set subtopologyNodes = nodeGroup.getValue();