From 1134520decfca990410acce2ec1a8cba5c8c2cb4 Mon Sep 17 00:00:00 2001 From: Ayoub Omari Date: Thu, 6 Jun 2024 00:05:04 +0200 Subject: [PATCH] KAFKA-16573: Specify node and store where serdes are needed (#15790) Reviewers: Matthias J. Sax , Bruno Cadonna , Anna Sophie Blee-Goldman --- .../apache/kafka/streams/StreamsConfig.java | 2 +- .../internals/WrappingNullableUtils.java | 13 +- .../streams/processor/internals/SinkNode.java | 15 +- .../processor/internals/SourceNode.java | 15 +- .../state/internals/MeteredKeyValueStore.java | 15 +- .../state/internals/MeteredSessionStore.java | 15 +- .../MeteredTimestampedKeyValueStore.java | 2 +- .../MeteredVersionedKeyValueStore.java | 18 +- .../state/internals/MeteredWindowStore.java | 13 +- .../internals/StoreSerdeInitializer.java | 76 +++++++++ .../internals/ProcessorNodeTest.java | 8 +- .../processor/internals/SinkNodeTest.java | 68 +++++++- .../processor/internals/SourceNodeTest.java | 83 ++++++++- .../internals/StoreSerdeInitializerTest.java | 157 ++++++++++++++++++ 14 files changed, 433 insertions(+), 67 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 25e86928b00c6..025a8a5b393ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1875,7 +1875,7 @@ public KafkaClientSupplier getKafkaClientSupplier() { @SuppressWarnings("WeakerAccess") public Serde defaultKeySerde() { final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); - if (keySerdeConfigSetting == null) { + if (keySerdeConfigSetting == null) { throw new ConfigException("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"); } try { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java index b904608c3d1a1..5dff888e621f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableUtils.java @@ -33,22 +33,19 @@ private static Deserializer prepareDeserializer(final Deserializer spe final Deserializer deserializerToUse; if (specificDeserializer == null) { - final Deserializer contextKeyDeserializer = context.keySerde().deserializer(); - final Deserializer contextValueDeserializer = context.valueSerde().deserializer(); - deserializerToUse = (Deserializer) (isKey ? contextKeyDeserializer : contextValueDeserializer); + deserializerToUse = (Deserializer) (isKey ? context.keySerde().deserializer() : context.valueSerde().deserializer()); } else { deserializerToUse = specificDeserializer; initNullableDeserializer(deserializerToUse, new SerdeGetter(context)); } return deserializerToUse; } + @SuppressWarnings("unchecked") private static Serializer prepareSerializer(final Serializer specificSerializer, final ProcessorContext context, final boolean isKey, final String name) { final Serializer serializerToUse; if (specificSerializer == null) { - final Serializer contextKeySerializer = context.keySerde().serializer(); - final Serializer contextValueSerializer = context.valueSerde().serializer(); - serializerToUse = (Serializer) (isKey ? contextKeySerializer : contextValueSerializer); + serializerToUse = (Serializer) (isKey ? context.keySerde().serializer() : context.valueSerde().serializer()); } else { serializerToUse = specificSerializer; initNullableSerializer(serializerToUse, new SerdeGetter(context)); @@ -60,7 +57,7 @@ private static Serializer prepareSerializer(final Serializer specificS private static Serde prepareSerde(final Serde specificSerde, final SerdeGetter getter, final boolean isKey) { final Serde serdeToUse; if (specificSerde == null) { - serdeToUse = (Serde) (isKey ? getter.keySerde() : getter.valueSerde()); + serdeToUse = (Serde) (isKey ? getter.keySerde() : getter.valueSerde()); } else { serdeToUse = specificSerde; } @@ -93,12 +90,14 @@ public static Serde prepareKeySerde(final Serde specificSerde, final S public static Serde prepareValueSerde(final Serde specificSerde, final SerdeGetter getter) { return prepareSerde(specificSerde, getter, false); } + @SuppressWarnings({"rawtypes", "unchecked"}) public static void initNullableSerializer(final Serializer specificSerializer, final SerdeGetter getter) { if (specificSerializer instanceof WrappingNullableSerializer) { ((WrappingNullableSerializer) specificSerializer).setIfUnset(getter); } } + @SuppressWarnings({"rawtypes", "unchecked"}) public static void initNullableDeserializer(final Deserializer specificDeserializer, final SerdeGetter getter) { if (specificDeserializer instanceof WrappingNullableDeserializer) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 6f508eff2792a..6e79616d30a9c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Record; @@ -58,8 +60,17 @@ public void addChild(final ProcessorNode child) { public void init(final InternalProcessorContext context) { super.init(context); this.context = context; - keySerializer = prepareKeySerializer(keySerializer, context, this.name()); - valSerializer = prepareValueSerializer(valSerializer, context, this.name()); + try { + keySerializer = prepareKeySerializer(keySerializer, context, this.name()); + } catch (ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e, context.taskId()); + } + + try { + valSerializer = prepareValueSerializer(valSerializer, context, this.name()); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e, context.taskId()); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 5d0c04b96a89f..2f53840acc94f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; @@ -74,8 +76,17 @@ public void init(final InternalProcessorContext context) { super.init(context); this.context = context; - keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); - valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); + try { + keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name()); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e, context.taskId()); + } + + try { + valDeserializer = prepareValueDeserializer(valDeserializer, context, name()); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e, context.taskId()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index fbe42b87065e8..f828c50287788 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -60,7 +60,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; @@ -187,21 +186,15 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se protected void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) - ); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); } protected void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerdeForStore(valueSerde, new SerdeGetter(context)) - ); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 731bc3145c181..233c2c00b8736 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; -import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; @@ -151,21 +150,15 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)), - WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context)) - ); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - WrappingNullableUtils.prepareKeySerde(keySerde, new SerdeGetter(context)), - WrappingNullableUtils.prepareValueSerde(valueSerde, new SerdeGetter(context)) - ); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 0b4702b9dbea5..61e6533fb8d14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -86,6 +86,7 @@ public class MeteredTimestampedKeyValueStore (query, positionBound, config, store) -> runTimestampedKeyQuery(query, positionBound, config) ) ); + @SuppressWarnings("unchecked") @Override protected Serde> prepareValueSerdeForStore(final Serde> valueSerde, final SerdeGetter getter) { @@ -96,7 +97,6 @@ protected Serde> prepareValueSerdeForStore(final Serde getWithBinary(final K key) { try { return maybeMeasureLatency(() -> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index af3b6b77dcfb1..3a12ca59f2953 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -18,18 +18,18 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import java.time.Instant; import java.util.Map; import java.util.Objects; + import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -303,11 +303,8 @@ protected void initStoreSerde(final ProcessorContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(plainValueSerde, new SerdeGetter(context)) - ); + plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde); } @Override @@ -317,11 +314,8 @@ protected void initStoreSerde(final StateStoreContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - plainValueSerdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(plainValueSerde, new SerdeGetter(context)) - ); + plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, plainValueSerde, WrappingNullableUtils::prepareValueSerde); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index a62e8c47563f2..bf6f749977fe2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -56,7 +56,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; @@ -170,19 +169,15 @@ private void registerMetrics() { private void initStoreSerde(final ProcessorContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context))); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerde); } private void initStoreSerde(final StateStoreContext context) { final String storeName = name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - serdes = new StateSerdes<>( - changelogTopic, - prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(valueSerde, new SerdeGetter(context))); + serdes = StoreSerdeInitializer.prepareStoreSerde( + context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java new file mode 100644 index 0000000000000..1a9aa02f3c555 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.StateSerdes; + + +public class StoreSerdeInitializer { + static StateSerdes prepareStoreSerde(final StateStoreContext context, + final String storeName, + final String changelogTopic, + final Serde keySerde, + final Serde valueSerde, + final PrepareFunc prepareValueSerdeFunc) { + return new StateSerdes<>( + changelogTopic, + prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true, context.taskId()), + prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false, context.taskId()) + ); + } + + static StateSerdes prepareStoreSerde(final ProcessorContext context, + final String storeName, + final String changelogTopic, + final Serde keySerde, + final Serde valueSerde, + final PrepareFunc prepareValueSerdeFunc) { + return new StateSerdes<>( + changelogTopic, + prepareSerde(WrappingNullableUtils::prepareKeySerde, storeName, keySerde, new SerdeGetter(context), true, context.taskId()), + prepareSerde(prepareValueSerdeFunc, storeName, valueSerde, new SerdeGetter(context), false, context.taskId()) + ); + } + + private static Serde prepareSerde(final PrepareFunc prepare, + final String storeName, + final Serde serde, + final SerdeGetter getter, + final Boolean isKey, + final TaskId taskId) { + + final String serdeType = isKey ? "key" : "value"; + try { + return prepare.prepareSerde(serde, getter); + } catch (final ConfigException | StreamsException e) { + throw new StreamsException(String.format("Failed to initialize %s serdes for store %s", serdeType, storeName), e, taskId); + } + } +} + +interface PrepareFunc { + Serde prepareSerde(Serde serde, SerdeGetter getter); +} + diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index dfa3f9e422a75..fdd5214385ad1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; @@ -154,10 +153,9 @@ public void testTopologyLevelConfigException() { .flatMapValues(value -> Collections.singletonList("")); final Topology topology = builder.build(); - final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology)); - final String msg = se.getMessage(); - assertTrue("Error about class cast with serdes", msg.contains("StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); - assertTrue("Error about class cast with serdes", msg.contains("specify a key serde")); + final StreamsException se = assertThrows(StreamsException.class, () -> new TopologyTestDriver(topology)); + assertThat(se.getMessage(), containsString("Failed to initialize key serdes for source node")); + assertThat(se.getCause().getMessage(), containsString("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); } private static class ClassCastProcessor extends ExceptionalProcessor { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 7e7f7b824b5c2..805f2fd5db46e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -16,18 +16,27 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; public class SinkNodeTest { private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); @@ -40,14 +49,21 @@ public class SinkNodeTest { // Used to verify that the correct exceptions are thrown if the compiler checks are bypassed @SuppressWarnings({"unchecked", "rawtypes"}) private final SinkNode illTypedSink = (SinkNode) sink; + private MockedStatic utilsMock; - @Before - public void before() { - sink.init(context); + @BeforeEach + public void setup() { + utilsMock = Mockito.mockStatic(WrappingNullableUtils.class); + } + + @AfterEach + public void cleanup() { + utilsMock.close(); } @Test public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { + sink.init(context); // When/Then context.setTime(-1); // ensures a negative timestamp is set for the record we send next try { @@ -58,4 +74,46 @@ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() { } } + @Test + public void shouldThrowStreamsExceptionOnUndefinedKeySerde() { + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); + + assertThat( + exception.getMessage(), + equalTo("Failed to initialize key serdes for sink node anyNodeName") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") + ); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedValueSerde() { + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); + + assertThat( + exception.getMessage(), + equalTo("Failed to initialize value serdes for sink node anyNodeName") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") + ); + } + + @Test + public void shouldThrowStreamsExceptionWithExplicitErrorMessage() { + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for sink node anyNodeName")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 03f22a3a917d6..f048f9948dece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; @@ -24,11 +25,17 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.StreamsTestUtils; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -39,9 +46,25 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; public class SourceNodeTest { + private MockedStatic utilsMock; + + @BeforeEach + public void setup() { + utilsMock = Mockito.mockStatic(WrappingNullableUtils.class); + } + + @AfterEach + public void cleanup() { + utilsMock.close(); + } + + @Test public void shouldProvideTopicHeadersAndDataToKeyDeserializer() { final SourceNode sourceNode = new MockSourceNode<>(new TheDeserializer(), new TheDeserializer()); @@ -106,4 +129,62 @@ public void shouldExposeProcessMetrics() { contains(sensorNamePrefix + ".s.process") ); } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedKeySerde() { + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); + + final SourceNode node = + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); + + assertThat( + exception.getMessage(), + equalTo("Failed to initialize key serdes for source node TESTING_NODE") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG") + ); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedValueSerde() { + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); + + final SourceNode node = + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); + + assertThat( + exception.getMessage(), + equalTo("Failed to initialize value serdes for source node TESTING_NODE") + ); + assertThat( + exception.getCause().getMessage(), + equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG") + ); + } + + @Test + public void shouldThrowStreamsExceptionWithExplicitErrorMessage() { + final InternalMockProcessorContext context = new InternalMockProcessorContext<>(); + + final SourceNode node = + new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer()); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for source node TESTING_NODE")); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java new file mode 100644 index 0000000000000..2a692f278e34c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializerTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.MockInternalNewProcessorContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; + +public class StoreSerdeInitializerTest { + + private MockedStatic utilsMock; + + @BeforeEach + public void setup() { + utilsMock = Mockito.mockStatic(WrappingNullableUtils.class); + } + + @AfterEach + public void cleanup() { + utilsMock.close(); + } + + @Test + public void shouldPrepareStoreSerdeForProcessorContext() { + final Serde keySerde = new Serdes.StringSerde(); + final Serde valueSerde = new Serdes.StringSerde(); + + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenReturn(keySerde); + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenReturn(valueSerde); + + final StateSerdes result = StoreSerdeInitializer.prepareStoreSerde( + (ProcessorContext) context, "myStore", "topic", keySerde, valueSerde, WrappingNullableUtils::prepareValueSerde); + + assertThat(result.keySerde(), equalTo(keySerde)); + assertThat(result.valueSerde(), equalTo(valueSerde)); + assertThat(result.topic(), equalTo("topic")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForProcessorContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForProcessorContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedKeySerdeForStateStoreContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); + } + + @Test + public void shouldThrowStreamsExceptionOnUndefinedValueSerdeForStateStoreContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())) + .thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); + + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + assertThat(exception.getCause().getMessage(), equalTo("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG")); + } + + @Test + public void shouldThrowStreamsExceptionWithExplicitErrorMessageForProcessorContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareKeySerde(any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((ProcessorContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize key serdes for store myStore")); + } + + @Test + public void shouldThrowStreamsExceptionWithExplicitErrorMessageForStateStoreContext() { + final MockInternalNewProcessorContext context = new MockInternalNewProcessorContext<>(); + + utilsMock.when(() -> WrappingNullableUtils.prepareValueSerde(any(), any())).thenThrow(new StreamsException("")); + + final Throwable exception = assertThrows(StreamsException.class, + () -> StoreSerdeInitializer.prepareStoreSerde((StateStoreContext) context, "myStore", "topic", + new Serdes.StringSerde(), new Serdes.StringSerde(), WrappingNullableUtils::prepareValueSerde)); + + assertThat(exception.getMessage(), equalTo("Failed to initialize value serdes for store myStore")); + } +}