diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 8802de24827c1..25e86928b00c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -39,8 +39,6 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; -import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler; -import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.internals.StreamsConfigUtils; @@ -556,11 +554,6 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; - /** {@code processing.exception.handler} */ - @SuppressWarnings("WeakerAccess") - public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler"; - public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface."; - /** {@code default.dsl.store} */ @Deprecated @SuppressWarnings("WeakerAccess") @@ -938,11 +931,6 @@ public class StreamsConfig extends AbstractConfig { DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) - .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, - Type.CLASS, - LogAndFailProcessingExceptionHandler.class.getName(), - Importance.MEDIUM, - PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, FailOnInvalidTimestamp.class.getName(), @@ -1937,10 +1925,6 @@ public ProductionExceptionHandler defaultProductionExceptionHandler() { return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); } - public ProcessingExceptionHandler processingExceptionHandler() { - return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); - } - /** * Override any client properties in the original configs with overrides * diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 28430a27e5d99..8c89132ae2f9c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -1586,33 +1586,6 @@ public void shouldDisableMetricCollectionOnMainConsumerOnly() { ); } - @Test - public void shouldGetDefaultValueProcessingExceptionHandler() { - final StreamsConfig streamsConfig = new StreamsConfig(props); - - assertEquals("org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName()); - } - - @Test - public void shouldOverrideDefaultProcessingExceptionHandler() { - props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler"); - final StreamsConfig streamsConfig = new StreamsConfig(props); - - assertEquals("org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName()); - } - - @Test - public void testInvalidProcessingExceptionHandler() { - props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler"); - final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props)); - - assertThat( - exception.getMessage(), - containsString("Invalid value org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler " + - "for configuration processing.exception.handler: Class org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler could not be found.") - ); - } - static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) {