Skip to content

Commit

Permalink
Revert "KAFKA-16448: Add ProcessingExceptionHandler in Streams config…
Browse files Browse the repository at this point in the history
…uration (apache#16092)" (apache#16141)

This reverts commit 3f70c46.

Reviewer: Lucas Brutschy <[email protected]>
  • Loading branch information
cadonna authored and chiacyu committed Jun 1, 2024
1 parent 1c00ce1 commit 88bb080
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 43 deletions.
16 changes: 0 additions & 16 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
Expand Down Expand Up @@ -556,11 +554,6 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";

/** {@code processing.exception.handler} */
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";

/** {@code default.dsl.store} */
@Deprecated
@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -938,11 +931,6 @@ public class StreamsConfig extends AbstractConfig {
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
.define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Type.CLASS,
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
FailOnInvalidTimestamp.class.getName(),
Expand Down Expand Up @@ -1937,10 +1925,6 @@ public ProductionExceptionHandler defaultProductionExceptionHandler() {
return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
}

public ProcessingExceptionHandler processingExceptionHandler() {
return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
}

/**
* Override any client properties in the original configs with overrides
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1586,33 +1586,6 @@ public void shouldDisableMetricCollectionOnMainConsumerOnly() {
);
}

@Test
public void shouldGetDefaultValueProcessingExceptionHandler() {
final StreamsConfig streamsConfig = new StreamsConfig(props);

assertEquals("org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName());
}

@Test
public void shouldOverrideDefaultProcessingExceptionHandler() {
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler");
final StreamsConfig streamsConfig = new StreamsConfig(props);

assertEquals("org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler", streamsConfig.processingExceptionHandler().getClass().getName());
}

@Test
public void testInvalidProcessingExceptionHandler() {
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler");
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));

assertThat(
exception.getMessage(),
containsString("Invalid value org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler " +
"for configuration processing.exception.handler: Class org.apache.kafka.streams.errors.InvalidProcessingExceptionHandler could not be found.")
);
}

static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
Expand Down

0 comments on commit 88bb080

Please sign in to comment.