Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15045: (KIP-924 pt. 17) State store computation fixed #16194

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@
files="StreamThread.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
files="(InternalTopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl).java"/>

<suppress checks="CyclomaticComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|PartitionGroup|SubscriptionWrapperSerde|AssignorConfiguration).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {

private String applicationId = null;

// keyed by subtopology id
private Map<Integer, Set<String>> nodeGroups = null;

// keyed by subtopology id
private Map<Integer, Set<String>> subtopologyIdToStateStoreNames = null;

// The name of the topology this builder belongs to, or null if this is not a NamedTopology
private final String topologyName;
// TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it
Expand Down Expand Up @@ -937,14 +941,15 @@ private int putNodeGroupName(final String nodeName,
* @return the full topology minus any global state
*/
public synchronized ProcessorTopology buildTopology() {
final Set<String> nodeGroup = new HashSet<>();
final Set<String> allNodes = new HashSet<>();
for (final Set<String> value : nodeGroups().values()) {
nodeGroup.addAll(value);
allNodes.addAll(value);
}
nodeGroup.removeAll(globalNodeGroups());
allNodes.removeAll(globalNodeGroups());

initializeSubscription();
return build(nodeGroup);
initializeSubtopologyIdToStateStoreNamesMap();
return build(allNodes);
}

/**
Expand Down Expand Up @@ -1500,6 +1505,34 @@ private boolean isGlobalSource(final String nodeName) {
return false;
}

public Set<String> stateStoreNamesForSubtopology(final int subtopologyId) {
return subtopologyIdToStateStoreNames.get(subtopologyId);
}

private void initializeSubtopologyIdToStateStoreNamesMap() {
final Map<Integer, Set<String>> storeNames = new HashMap<>();

for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
final Set<String> subtopologyNodes = nodeGroup.getValue();
final boolean isNodeGroupOfGlobalStores = nodeGroupContainsGlobalSourceNode(subtopologyNodes);

if (!isNodeGroupOfGlobalStores) {
final int subtopologyId = nodeGroup.getKey();
final Set<String> subtopologyStoreNames = new HashSet<>();

for (final String nodeName : subtopologyNodes) {
final AbstractNode node = nodeFactories.get(nodeName).describe();
if (node instanceof Processor) {
subtopologyStoreNames.addAll(((Processor) node).stores());
}
}

storeNames.put(subtopologyId, subtopologyStoreNames);
}
}
subtopologyIdToStateStoreNames = storeNames;
}

public TopologyDescription describe() {
final TopologyDescription description = new TopologyDescription(topologyName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,7 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
Function.identity(),
taskId -> {
final Set<String> stateStoreNames = topologyMetadata
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
.keySet();
.stateStoreNamesForSubtopology(taskId.topologyName(), taskId.subtopology());
final Set<TaskTopicPartition> topicPartitions = topicPartitionsForTask.get(taskId);
return new DefaultTaskInfo(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ public Map<String, List<String>> stateStoreNameToSourceTopicsForTopology(final S
return lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames();
}

public Set<String> stateStoreNamesForSubtopology(final String topologyName, final int subtopologyId) {
return lookupBuilderForNamedTopology(topologyName).stateStoreNamesForSubtopology(subtopologyId);
}

public Map<String, List<String>> stateStoreNameToSourceTopics() {
final Map<String, List<String>> stateStoreNameToSourceTopics = new HashMap<>();
applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -568,6 +569,29 @@ public void testAddStateStore() {
assertEquals(storeBuilder.name(), suppliers.get(0).name());
}

@Test
public void testStateStoreNamesForSubtopology() {
builder.addStateStore(storeBuilder);
builder.setApplicationId("X");

builder.addSource(null, "source-1", null, null, null, "topic-1");
builder.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
builder.connectProcessorAndStateStores("processor-1", storeBuilder.name());

builder.addSource(null, "source-2", null, null, null, "topic-2");
builder.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-2");

builder.buildTopology();
final Set<String> stateStoreNames = builder.stateStoreNamesForSubtopology(0);
assertThat(stateStoreNames, equalTo(mkSet(storeBuilder.name())));

final Set<String> emptyStoreNames = builder.stateStoreNamesForSubtopology(1);
assertThat(emptyStoreNames, equalTo(mkSet()));

final Set<String> stateStoreNamesUnknownSubtopology = builder.stateStoreNamesForSubtopology(13);
assertThat(stateStoreNamesUnknownSubtopology, nullValue());
}

@Test
public void shouldAllowAddingSameStoreBuilderMultipleTimes() {
builder.setApplicationId("X");
Expand Down