Skip to content

Commit

Permalink
KAFKA-14748: assert FK-JOIN-SUBSCRIPTION-STATE change / cleanup upon …
Browse files Browse the repository at this point in the history
…changing foreign key
  • Loading branch information
florin-akermann committed Dec 2, 2023
1 parent 76708d2 commit 2990988
Showing 1 changed file with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -34,18 +33,22 @@
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -684,7 +687,11 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer() {
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");

final String subscriptionStoreName = driver.getAllStateStores().entrySet().stream()
.filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE"))
.findAny().orElseThrow(() -> new RuntimeException("couldn't find store")).getKey();
final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getKeyValueStore(subscriptionStoreName);
final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
{
final Map<String, String> expected = mkMap(
Expand All @@ -694,6 +701,7 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer() {
if (materialized) {
assertThat(asMap(store), is(expected));
}
Assertions.assertNotNull(subscriptionStore.get(key));
}
left.pipeInput("lhs1", "lhsValue1|returnNull", baseTimestamp);
{
Expand All @@ -704,10 +712,22 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer() {
if (materialized) {
assertThat(asMap(store), is(expected));
}
Assertions.assertNull(subscriptionStore.get(key));
}
}
}

private static Bytes subscriptionStoreKey(final String lhs, final String rhs) {
final byte[] lhs1bytes = lhs.getBytes();
final byte[] rhs1bytes = rhs.getBytes();
final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + lhs1bytes.length + rhs1bytes.length);
buf.putInt(rhs1bytes.length);
buf.put(rhs1bytes);
buf.put(lhs1bytes);
final Bytes key = Bytes.wrap(buf.array());
return key;
}

protected static Map<String, String> asMap(final KeyValueStore<String, String> store) {
final HashMap<String, String> result = new HashMap<>();
store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
Expand Down

0 comments on commit 2990988

Please sign in to comment.