diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index fc6995dadfe7e..46047a6c0ff76 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -200,7 +200,7 @@
files="StreamThread.java"/>
+ files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index c160655578ee6..b387760043181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -144,8 +144,12 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
private String applicationId = null;
+ // keyed by subtopology id
private Map> nodeGroups = null;
+ // keyed by subtopology id
+ private Map> subtopologyIdToStateStoreNames = null;
+
// The name of the topology this builder belongs to, or null if this is not a NamedTopology
private final String topologyName;
// TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it
@@ -937,14 +941,15 @@ private int putNodeGroupName(final String nodeName,
* @return the full topology minus any global state
*/
public synchronized ProcessorTopology buildTopology() {
- final Set nodeGroup = new HashSet<>();
+ final Set allNodes = new HashSet<>();
for (final Set value : nodeGroups().values()) {
- nodeGroup.addAll(value);
+ allNodes.addAll(value);
}
- nodeGroup.removeAll(globalNodeGroups());
+ allNodes.removeAll(globalNodeGroups());
initializeSubscription();
- return build(nodeGroup);
+ initializeSubtopologyIdToStateStoreNamesMap();
+ return build(allNodes);
}
/**
@@ -1500,6 +1505,34 @@ private boolean isGlobalSource(final String nodeName) {
return false;
}
+ public Set stateStoreNamesForSubtopology(final int subtopologyId) {
+ return subtopologyIdToStateStoreNames.get(subtopologyId);
+ }
+
+ private void initializeSubtopologyIdToStateStoreNamesMap() {
+ final Map> storeNames = new HashMap<>();
+
+ for (final Map.Entry> nodeGroup : makeNodeGroups().entrySet()) {
+ final Set subtopologyNodes = nodeGroup.getValue();
+ final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(subtopologyNodes);
+
+ if (!isNodeGroupOfGlobalStores) {
+ final int subtopologyId = nodeGroup.getKey();
+ final Set subtopologyStoreNames = new HashSet<>();
+
+ for (final String nodeName : subtopologyNodes) {
+ final AbstractNode node = nodeFactories.get(nodeName).describe();
+ if (node instanceof Processor) {
+ subtopologyStoreNames.addAll(((Processor) node).stores());
+ }
+ }
+
+ storeNames.put(subtopologyId, subtopologyStoreNames);
+ }
+ }
+ subtopologyIdToStateStoreNames = storeNames;
+ }
+
public TopologyDescription describe() {
final TopologyDescription description = new TopologyDescription(topologyName);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 658ba1540da48..562f912311df7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -560,8 +560,7 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
Function.identity(),
taskId -> {
final Set stateStoreNames = topologyMetadata
- .stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
- .keySet();
+ .stateStoreNamesForSubtopology(taskId.topologyName(), taskId.subtopology());
final Set topicPartitions = topicPartitionsForTask.get(taskId);
return new DefaultTaskInfo(
taskId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 150a6be9c5aad..5a0e3407eba88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -521,6 +521,10 @@ public Map> stateStoreNameToSourceTopicsForTopology(final S
return lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames();
}
+ public Set stateStoreNamesForSubtopology(final String topologyName, final int subtopologyId) {
+ return lookupBuilderForNamedTopology(topologyName).stateStoreNamesForSubtopology(subtopologyId);
+ }
+
public Map> stateStoreNameToSourceTopics() {
final Map> stateStoreNameToSourceTopics = new HashMap<>();
applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index bbb4625d56440..1a0166523710a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -71,6 +71,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -568,6 +569,29 @@ public void testAddStateStore() {
assertEquals(storeBuilder.name(), suppliers.get(0).name());
}
+ @Test
+ public void testStateStoreNamesForSubtopology() {
+ builder.addStateStore(storeBuilder);
+ builder.setApplicationId("X");
+
+ builder.addSource(null, "source-1", null, null, null, "topic-1");
+ builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
+ builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());
+
+ builder.addSource(null, "source-2", null, null, null, "topic-2");
+ builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2");
+
+ builder.buildTopology();
+ final Set stateStoreNames = builder.stateStoreNamesForSubtopology(0);
+ assertThat(stateStoreNames, equalTo(mkSet(storeBuilder.name())));
+
+ final Set emptyStoreNames = builder.stateStoreNamesForSubtopology(1);
+ assertThat(emptyStoreNames, equalTo(mkSet()));
+
+ final Set stateStoreNamesUnknownSubtopology = builder.stateStoreNamesForSubtopology(13);
+ assertThat(stateStoreNamesUnknownSubtopology, nullValue());
+ }
+
@Test
public void shouldAllowAddingSameStoreBuilderMultipleTimes() {
builder.setApplicationId("X");