From 6165f532e3491ff2206dd57078aa616892a8580e Mon Sep 17 00:00:00 2001
From: Florin Akermann
Date: Wed, 6 Dec 2023 01:04:32 +0100
Subject: [PATCH] KAFKA-14748: Relax non-null FK left-join requirement (#14107)

Relax the non-null foreign-key requirement for left joins: a left-join
record whose foreignKeyExtractor result is null is now emitted with a
null right-hand side instead of being dropped.

Testing strategy: inject an extractor that returns null on the first or
second element.

Reviewers: Walker Carlson
---
 docs/streams/developer-guide/dsl-api.html     |   4 -
 docs/streams/upgrade-guide.html               |  31 ++---
 .../apache/kafka/streams/kstream/KTable.java  |  22 ++-
 .../internals/foreignkeyjoin/CombinedKey.java |   1 -
 .../SubscriptionJoinProcessorSupplier.java    |   5 +-
 .../SubscriptionReceiveProcessorSupplier.java |  56 ++++----
 .../SubscriptionSendProcessorSupplier.java    | 122 +++++++++---------
 ...leKTableForeignKeyJoinIntegrationTest.java | 103 ++++++++++++++-
 8 files changed, 225 insertions(+), 119 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 88f7d6f4ae208..cc729ee25c26a 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -2542,10 +2542,6 @@
KTable-KTable Foreign-Key
    -
  • - Records for which the foreignKeyExtractor produces null are ignored and do not trigger a join. - If you want to join with null foreign keys, use a suitable sentinel value to do so (i.e. "NULL" for a String field, or -1 for an auto-incrementing integer field). -
  • Input records with a null value are interpreted as tombstones diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 1f122b6e3660e..4f905a98d1db8 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -136,21 +136,7 @@

    <

    Streams API changes in 3.7.0

    IQv2 supports RangeQuery that allows to specify unbounded, bounded, or half-open key-ranges, which return data in ascending (byte[]-lexicographical) order (per partition). - KIP-985 extends this functionality by adding .withDescendingKeys() to allow user to receive data in descending order. -

    - -

    Streams API changes in 3.6.0

    -

    - Rack aware task assignment was introduced in KIP-925. - Rack aware task assignment can be enabled for StickyTaskAssignor or HighAvailabilityTaskAssignor to compute task assignments which can minimize cross rack traffic under certain conditions. - For more information, including how it can be enabled and further configured, see the Kafka Streams Developer Guide. -

    - -

-    IQv2 supports a RangeQuery that allows to specify unbounded, bounded, or half-open key-ranges. Users have to use withUpperBound(K), withLowerBound(K),
-    or withNoBounds() to specify half-open or unbounded ranges, but cannot use withRange(K lower, K upper) for the same.
-    KIP-941 closes this gap by allowing to pass in null
-    as upper and lower bound (with semantics "no bound") to simplify the usage of the RangeQuery class.
+    KIP-985 extends this functionality by adding .withDescendingKeys() to allow users to receive data in descending order.

    @@ -198,6 +184,21 @@

    Streams API

    + +

    Streams API changes in 3.6.0

    +

+    Rack-aware task assignment was introduced in KIP-925.
+    It can be enabled for the StickyTaskAssignor or HighAvailabilityTaskAssignor to compute task assignments that can minimize cross-rack traffic under certain conditions.
+    For more information, including how it can be enabled and further configured, see the Kafka Streams Developer Guide.
+

    + +

+    IQv2 supports a RangeQuery that allows specifying unbounded, bounded, or half-open key-ranges. Users have to use withUpperBound(K), withLowerBound(K),
+    or withNoBounds() to specify half-open or unbounded ranges, but cannot use withRange(K lower, K upper) for this purpose.
+    KIP-941 closes this gap by allowing users to pass in null
+    as upper and lower bound (with the semantics "no bound") to simplify the usage of the RangeQuery class.
+

    +

    Streams API changes in 3.5.0

A new state store type, versioned key-value stores, was introduced in

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index e530769be4f64..0222da703400e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -2237,7 +2237,7 @@ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
-    *                            result is null, the update is ignored as invalid.
+    *                            extracted key is null, the right-hand side of the join result will be null.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
@@ -2254,8 +2254,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-    *                            result is null, the update is ignored as invalid.
+    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+    *                            extracted key is null, the right-hand side of the join result will be null.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
@@ -2279,9 +2279,9 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
     *

     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
-    * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-    *                            result is null, the update is ignored as invalid.
+    * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
+    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+    *                            extracted key is null, the right-hand side of the join result will be null.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
     * @param <VR>                the value type of the result {@code KTable}
@@ -2301,7 +2301,7 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
-    *                            result is null, the update is ignored as invalid.
+    *                            extracted key is null, the right-hand side of the join result will be null.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
@@ -2321,8 +2321,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-    *                            result is null, the update is ignored as invalid.
+    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+    *                            extracted key is null, the right-hand side of the join result will be null.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
@@ -2350,8 +2350,8 @@ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-    *                            result is null, the update is ignored as invalid.
+    * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+    *                            extracted key is null, the right-hand side of the join result will be null.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index ae5f20fe53300..01e94da345a43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -24,7 +24,6 @@ public class CombinedKey<KF, KP> {
     private final KP primaryKey;
 
     CombinedKey(final KF foreignKey, final KP primaryKey) {
-        Objects.requireNonNull(foreignKey, "foreignKey can't be null");
         Objects.requireNonNull(primaryKey, "primaryKey can't be null");
         this.foreignKey = foreignKey;
         this.primaryKey = primaryKey;

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 7d31ef4422345..a8677ce2958ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -77,7 +77,10 @@ public void process(final Record<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> record) {
-            final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(record.key().getForeignKey());
+            final ValueAndTimestamp<VO> foreignValueAndTime =
+                record.key().getForeignKey() == null ?
+                    null :
+                    foreignValues.get(record.key().getForeignKey());
 
             final long resultTimestamp =
                 foreignValueAndTime == null ?

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index cf88aec6f9a09..90d70bdf3308a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -75,20 +75,8 @@ public void init(final ProcessorContext<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> context) {
 
         @Override
         public void process(final Record<KO, SubscriptionWrapper<K>> record) {
-            if (record.key() == null) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record due to null foreign key. "
-                            + "topic=[{}] partition=[{}] offset=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction())) {
+                dropRecord();
                 return;
             }
             if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
@@ -97,7 +85,22 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
                 //from older SubscriptionWrapper versions to newer versions.
                 throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
             }
+            context().forward(
+                record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
+                    .withValue(inferChange(record))
+                    .withTimestamp(record.timestamp())
+            );
+        }
 
+        private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferChange(final Record<KO, SubscriptionWrapper<K>> record) {
+            if (record.key() == null) {
+                return new Change<>(ValueAndTimestamp.make(record.value(), record.timestamp()), null);
+            } else {
+                return inferBasedOnState(record);
+            }
+        }
+
+        private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
             final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().getPrimaryKey());
 
             final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
@@ -110,14 +113,23 @@ public void process(final Record<KO, SubscriptionWrapper<K>> record) {
             } else {
                 store.put(subscriptionKey, newValue);
             }
-            final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change = new Change<>(newValue, oldValue);
-            // note: key is non-nullable
-            // note: newValue is non-nullable
-            context().forward(
-                record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
-                    .withValue(change)
-                    .withTimestamp(newValue.timestamp())
-            );
+            return new Change<>(newValue, oldValue);
+        }
+
+        private void dropRecord() {
+            if (context().recordMetadata().isPresent()) {
+                final RecordMetadata recordMetadata = context().recordMetadata().get();
+                LOG.warn(
+                    "Skipping record due to null foreign key. "
+                        + "topic=[{}] partition=[{}] offset=[{}]",
+                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                );
+            } else {
+                LOG.warn(
+                    "Skipping record due to null foreign key. Topic, partition, and offset not known."
+                );
+            }
+            droppedRecordsSensor.record();
         }
     };
 }

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 8a6298c28de45..09bd35339abed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -37,6 +37,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
@@ -86,6 +87,7 @@ private class UnbindChangeProcessor extends ContextualProcessor<K, Change<V>, KO, SubscriptionWrapper<K>> {
         private Sensor droppedRecordsSensor;
         private String foreignKeySerdeTopic;
         private String valueSerdeTopic;
+        private long[] recordHash;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -109,108 +111,102 @@ public void init(final ProcessorContext<KO, SubscriptionWrapper<K>> context) {
 
         @Override
         public void process(final Record<K, Change<V>> record) {
+            // clear cached hash from previous record
+            recordHash = null;
             // drop out-of-order records from versioned tables (cf. KIP-914)
             if (useVersionedSemantics && !record.value().isLatest) {
                 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
                 droppedRecordsSensor.record();
                 return;
             }
+            if (leftJoin) {
+                leftJoinInstructions(record);
+            } else {
+                defaultJoinInstructions(record);
+            }
+        }
 
-            final long[] currentHash = record.value().newValue == null ?
-                null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-            final int partition = context().recordMetadata().get().partition();
+        private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+                if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                }
+                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            } else if (record.value().newValue != null) {
+                final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            }
+        }
+
+        private void defaultJoinInstructions(final Record<K, Change<V>> record) {
+            if (record.value().oldValue != null) {
+                final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
                     final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
                     if (newForeignKey == null) {
                         logSkippedRecordDueToNullForeignKey();
                         return;
                     }
-
-                    final byte[] serialOldForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-                    final byte[] serialNewForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-                    if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+                    if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
                         //Different Foreign Key - delete the old key value and propagate the new one.
                         //Delete it from the oldKey's state store
-                        context().forward(
-                            record.withKey(oldForeignKey)
-                                .withValue(new SubscriptionWrapper<>(
-                                    currentHash,
-                                    DELETE_KEY_NO_PROPAGATE,
-                                    record.key(),
-                                    partition
-                                )));
-                        //Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-                        //since we must "unset" any output set by the previous FK-join. This is true for both INNER
-                        //and LEFT join.
+                        forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                     }
+                    //Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+                    //since we must "unset" any output set by the previous FK-join. This is true for both INNER
+                    //and LEFT join.
+                    forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
                 } else {
                     //A simple propagatable delete. Delete from the state store and propagate the delete onwards.
-                    context().forward(
-                        record.withKey(oldForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                DELETE_KEY_AND_PROPAGATE,
-                                record.key(),
-                                partition
-                            )));
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             } else if (record.value().newValue != null) {
-                //change.oldValue is null, which means it was deleted at least once before, or it is brand new.
-                //In either case, we only need to propagate if the FK_VAL is available, as the null from the delete would
-                //have been propagated otherwise.
-
-                final SubscriptionWrapper.Instruction instruction;
-                if (leftJoin) {
-                    //Want to send info even if RHS is null.
-                    instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
-                } else {
-                    instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
-                }
                 final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
                 if (newForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                 } else {
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                instruction,
-                                record.key(),
-                                partition)));
+                    forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
                 }
             }
         }
 
+        private byte[] serialize(final KO key) {
+            return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+        }
+
+        private void forward(final Record<K, Change<V>> record, final KO foreignKey, final Instruction instruction) {
+            final SubscriptionWrapper<K> wrapper = new SubscriptionWrapper<>(
+                hash(record),
+                instruction,
+                record.key(),
+                context().recordMetadata().get().partition()
+            );
+            context().forward(record.withKey(foreignKey).withValue(wrapper));
+        }
+
+        private long[] hash(final Record<K, Change<V>> record) {
+            if (recordHash == null) {
+                recordHash = record.value().newValue == null
+                    ? null
+                    : Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
+            }
+            return recordHash;
+        }
+
         private void logSkippedRecordDueToNullForeignKey() {
             if (context().recordMetadata().isPresent()) {
                 final RecordMetadata recordMetadata = context().recordMetadata().get();
                 LOG.warn(
-                    "Skipping record due to null foreign key. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
+                    "Skipping record due to null foreign key. topic=[{}] partition=[{}] offset=[{}]",
                     recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
                 );
             } else {
-                LOG.warn(
-                    "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                );
+                LOG.warn("Skipping record due to null foreign key. Topic, partition, and offset not known.");
Topic, partition, and offset not known."); } droppedRecordsSensor.record(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index b76b5ddc0dbf5..2bb7b6ea1909c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration; -import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -34,6 +33,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; @@ -41,11 +41,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.Assertions; import org.junit.rules.TestName; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -646,6 +649,85 @@ public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated() { } } + @Test + public void shouldEmitRecordOnNullForeignKeyForLeftJoins() { + final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, true, rejoin, leftVersioned, rightVersioned, value -> null); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { + final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); + final KeyValueStore store = driver.getKeyValueStore("store"); + + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); + { + final Map expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,null)") + ); + assertThat(outputTopic.readKeyValuesToMap(), is(expected)); + if (materialized) { + assertThat(asMap(store), is(expected)); + } + } + } + } + + @Test + public void shouldEmitRecordWhenOldAndNewFkDiffer() { + final Function foreignKeyExtractor = value -> { + final String split = value.split("\\|")[1]; + if (split.equals("returnNull")) { + //new fk + return null; + } else { + //old fk + return split; + } + }; + final Topology topology = getTopology(streamsConfig, materialized ? 
"store" : null, true, rejoin, leftVersioned, rightVersioned, foreignKeyExtractor); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { + final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); + final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); + final KeyValueStore store = driver.getKeyValueStore("store"); + final String subscriptionStoreName = driver.getAllStateStores().entrySet().stream() + .filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE")) + .findAny().orElseThrow(() -> new RuntimeException("couldn't find store")).getKey(); + final KeyValueStore> subscriptionStore = driver.getKeyValueStore(subscriptionStoreName); + final Bytes key = subscriptionStoreKey("lhs1", "rhs1"); + left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); + { + final Map expected = mkMap( + mkEntry("lhs1", "(lhsValue1|rhs1,null)") + ); + assertThat(outputTopic.readKeyValuesToMap(), is(expected)); + if (materialized) { + assertThat(asMap(store), is(expected)); + } + Assertions.assertNotNull(subscriptionStore.get(key)); + } + left.pipeInput("lhs1", "lhsValue1|returnNull", baseTimestamp); + { + final Map expected = mkMap( + mkEntry("lhs1", "(lhsValue1|returnNull,null)") + ); + assertThat(outputTopic.readKeyValuesToMap(), is(expected)); + if (materialized) { + assertThat(asMap(store), is(expected)); + } + Assertions.assertNull(subscriptionStore.get(key)); + } + } + } + + private static Bytes subscriptionStoreKey(final String lhs, final String rhs) { + final byte[] lhs1bytes = lhs.getBytes(); + final byte[] rhs1bytes = rhs.getBytes(); + final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + lhs1bytes.length + rhs1bytes.length); + buf.putInt(rhs1bytes.length); + buf.put(rhs1bytes); + buf.put(lhs1bytes); + final Bytes key = Bytes.wrap(buf.array()); + return key; + } + protected static Map asMap(final KeyValueStore store) { final HashMap result = new HashMap<>(); store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); @@ -658,6 +740,24 @@ protected static Topology getTopology(final Properties streamsConfig, final boolean rejoin, final boolean leftVersioned, final boolean rightVersioned) { + return getTopology( + streamsConfig, + queryableStoreName, + leftJoin, + rejoin, + leftVersioned, + rightVersioned, + value -> value.split("\\|")[1] + ); + } + + protected static Topology getTopology(final Properties streamsConfig, + final String queryableStoreName, + final boolean leftJoin, + final boolean rejoin, + final boolean leftVersioned, + final boolean rightVersioned, + final Function extractor) { final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final StreamsBuilder builder = new StreamsBuilder(); @@ -693,7 +793,6 @@ protected static Topology getTopology(final Properties streamsConfig, ); } - final Function extractor = value -> value.split("\\|")[1]; final ValueJoiner joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")"; final ValueJoiner rejoiner = rejoin ? (value1, value2) -> "rejoin(" + value1 + "," + value2 + ")" : null;