Skip to content

Commit

Permalink
KAFKA-15045: (KIP-924 pt. 17) State store computation fixed (apache#1…
Browse files Browse the repository at this point in the history
…6194)

Fixed the calculation of the store name list based on the subtopology being accessed.

Also added a new test to make sure this new functionality works as intended.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
apourchet authored and gongxuanzhang committed Jun 12, 2024
1 parent 2ca14b0 commit bd05222
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 7 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@
files="StreamThread.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/>

<suppress checks="CyclomaticComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {

private String applicationId = null;

// keyed by subtopology id
private Map<Integer, Set<String>> nodeGroups = null;

// keyed by subtopology id
private Map<Integer, Set<String>> subtopologyIdToStateStoreNames = null;

// The name of the topology this builder belongs to, or null if this is not a NamedTopology
private final String topologyName;
// TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it
Expand Down Expand Up @@ -937,14 +941,15 @@ private int putNodeGroupName(final String nodeName,
* @return the full topology minus any global state
*/
public synchronized ProcessorTopology buildTopology() {
final Set<String> nodeGroup = new HashSet<>();
final Set<String> allNodes = new HashSet<>();
for (final Set<String> value : nodeGroups().values()) {
nodeGroup.addAll(value);
allNodes.addAll(value);
}
nodeGroup.removeAll(globalNodeGroups());
allNodes.removeAll(globalNodeGroups());

initializeSubscription();
return build(nodeGroup);
initializeSubtopologyIdToStateStoreNamesMap();
return build(allNodes);
}

/**
Expand Down Expand Up @@ -1500,6 +1505,34 @@ private boolean isGlobalSource(final String nodeName) {
return false;
}

public Set<String> stateStoreNamesForSubtopology(final int subtopologyId) {
return subtopologyIdToStateStoreNames.get(subtopologyId);
}

private void initializeSubtopologyIdToStateStoreNamesMap() {
final Map<Integer, Set<String>> storeNames = new HashMap<>();

for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
final Set<String> subtopologyNodes = nodeGroup.getValue();
final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(subtopologyNodes);

if (!isNodeGroupOfGlobalStores) {
final int subtopologyId = nodeGroup.getKey();
final Set<String> subtopologyStoreNames = new HashSet<>();

for (final String nodeName : subtopologyNodes) {
final AbstractNode node = nodeFactories.get(nodeName).describe();
if (node instanceof Processor) {
subtopologyStoreNames.addAll(((Processor) node).stores());
}
}

storeNames.put(subtopologyId, subtopologyStoreNames);
}
}
subtopologyIdToStateStoreNames = storeNames;
}

public TopologyDescription describe() {
final TopologyDescription description = new TopologyDescription(topologyName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,7 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
Function.identity(),
taskId -> {
final Set<String> stateStoreNames = topologyMetadata
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
.keySet();
.stateStoreNamesForSubtopology(taskId.topologyName(), taskId.subtopology());
final Set<TaskTopicPartition> topicPartitions = topicPartitionsForTask.get(taskId);
return new DefaultTaskInfo(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ public Map<String, List<String>> stateStoreNameToSourceTopicsForTopology(final S
return lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames();
}

public Set<String> stateStoreNamesForSubtopology(final String topologyName, final int subtopologyId) {
return lookupBuilderForNamedTopology(topologyName).stateStoreNamesForSubtopology(subtopologyId);
}

public Map<String, List<String>> stateStoreNameToSourceTopics() {
final Map<String, List<String>> stateStoreNameToSourceTopics = new HashMap<>();
applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -568,6 +569,29 @@ public void testAddStateStore() {
assertEquals(storeBuilder.name(), suppliers.get(0).name());
}

@Test
public void testStateStoreNamesForSubtopology() {
builder.addStateStore(storeBuilder);
builder.setApplicationId("X");

builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());

builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2");

builder.buildTopology();
final Set<String> stateStoreNames = builder.stateStoreNamesForSubtopology(0);
assertThat(stateStoreNames, equalTo(mkSet(storeBuilder.name())));

final Set<String> emptyStoreNames = builder.stateStoreNamesForSubtopology(1);
assertThat(emptyStoreNames, equalTo(mkSet()));

final Set<String> stateStoreNamesUnknownSubtopology = builder.stateStoreNamesForSubtopology(13);
assertThat(stateStoreNamesUnknownSubtopology, nullValue());
}

@Test
public void shouldAllowAddingSameStoreBuilderMultipleTimes() {
builder.setApplicationId("X");
Expand Down

0 comments on commit bd05222

Please sign in to comment.