From b6aac751e31d2bc69864dd10f1406305fd49b1d5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Greffier?=
Date: Mon, 29 Jul 2024 15:34:24 +0200
Subject: [PATCH] KAFKA-16448: Remove source raw key and source raw value from
 ErrorHandlerContext

---
 .../streams/errors/ErrorHandlerContext.java   | 28 -------------------
 .../internals/DefaultErrorHandlerContext.java | 16 -----------
 .../processor/internals/CorruptedRecord.java  |  2 +-
 .../internals/GlobalStateManagerImpl.java     |  3 +-
 .../internals/GlobalStateUpdateTask.java      |  3 +-
 .../processor/internals/ProcessorAdapter.java |  3 +-
 .../internals/ProcessorContextImpl.java       |  3 +-
 .../processor/internals/ProcessorNode.java    |  2 --
 .../internals/ProcessorRecordContext.java     | 18 +-----------
 .../processor/internals/RecordQueue.java      |  2 +-
 .../streams/processor/internals/SinkNode.java |  3 +-
 .../processor/internals/StampedRecord.java    | 19 +------------
 .../processor/internals/StreamTask.java       |  6 ++--
 ...essingExceptionHandlerIntegrationTest.java |  2 --
 .../internals/ProcessorNodeTest.java          |  7 +----
 15 files changed, 12 insertions(+), 105 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index 0c50547549027..6c5e4f19596c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -89,34 +89,6 @@ public interface ErrorHandlerContext {
      */
     Headers headers();
 
-    /**
-     * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
-     *
-     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
-     * punctuation callback}, or while processing a record that was forwarded by a punctuation
-     * callback, it will return {@code null}.
-     *
-     * <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
-     * to the repartition topic.
-     *
-     * @return the raw byte of the key of the source message
-     */
-    byte[] sourceRawKey();
-
-    /**
-     * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
-     *
-     * <p> If this method is invoked within a {@link Punctuator#punctuate(long)
-     * punctuation callback}, or while processing a record that was forwarded by a punctuation
-     * callback, it will return {@code null}.
-     *
-     * <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
-     * to the repartition topic.
-     *
-     * @return the raw byte of the value of the source message
-     */
-    byte[] sourceRawValue();
-
     /**
      * Return the current processor node ID.
      *
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
index ff79860d77e30..aa066fb6da96c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java
@@ -28,8 +28,6 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
     private final int partition;
     private final long offset;
     private final Headers headers;
-    private final byte[] sourceRawKey;
-    private final byte[] sourceRawValue;
     private final String processorNodeId;
     private final TaskId taskId;
 
@@ -37,16 +35,12 @@ public DefaultErrorHandlerContext(final String topic,
                                       final int partition,
                                       final long offset,
                                       final Headers headers,
-                                      final byte[] sourceRawKey,
-                                      final byte[] sourceRawValue,
                                       final String processorNodeId,
                                       final TaskId taskId) {
         this.topic = topic;
         this.partition = partition;
         this.offset = offset;
         this.headers = headers;
-        this.sourceRawKey = sourceRawKey;
-        this.sourceRawValue = sourceRawValue;
         this.processorNodeId = processorNodeId;
         this.taskId = taskId;
     }
@@ -71,16 +65,6 @@ public Headers headers() {
         return headers;
     }
 
-    @Override
-    public byte[] sourceRawKey() {
-        return sourceRawKey;
-    }
-
-    @Override
-    public byte[] sourceRawValue() {
-        return sourceRawValue;
-    }
-
     @Override
     public String processorNodeId() {
         return processorNodeId;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
index 1bc8cb51092c6..d31a29883cabe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CorruptedRecord.java
@@ -28,7 +28,7 @@ public class CorruptedRecord extends StampedRecord {
 
     CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
-        super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord);
+        super(rawRecord, ConsumerRecord.NO_TIMESTAMP);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index b1263ddc58df6..6b7214a9ed185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -307,8 +307,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                         record.offset(),
                         record.partition(),
                         record.topic(),
-                        record.headers(),
-                        record);
+                        record.headers());
                 globalProcessorContext.setRecordContext(recordContext);
 
                 try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 1713efb52a9bd..12a6beedbcd98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -113,8 +113,7 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
             deserialized.offset(),
             deserialized.partition(),
             deserialized.topic(),
-            deserialized.headers(),
-            record);
+            deserialized.headers());
         processorContext.setRecordContext(recordContext);
         processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
         final Record<Object, Object> toProcess = new Record<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index a5d88f5a7f8a9..79db3847cfb06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -66,8 +66,7 @@ public void process(final Record<KIn, VIn> record) {
                 context.offset(),
                 context.partition(),
                 context.topic(),
-                record.headers(),
-                processorRecordContext.rawRecord()
+                record.headers()
             ));
             delegate.process(record.key(), record.value());
         } finally {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 6a79434b622ab..b484d26f0fe87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -261,8 +261,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
                 recordContext.offset(),
                 recordContext.partition(),
                 recordContext.topic(),
-                record.headers(),
-                recordContext.rawRecord());
+                record.headers());
         }
 
         if (childName == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index eabf9e3d5c471..c30d42ba7461d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -209,8 +209,6 @@ public void process(final Record<KIn, VIn> record) {
                 internalProcessorContext.partition(),
                 internalProcessorContext.offset(),
                 internalProcessorContext.headers(),
-                internalProcessorContext.recordContext().rawRecord().key(),
-                internalProcessorContext.recordContext().rawRecord().value(),
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index 3d1ce0529e678..839baaad87528 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
@@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
     private final String topic;
     private final int partition;
     private final Headers headers;
-    private final ConsumerRecord<byte[], byte[]> rawRecord;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
                                   final int partition,
                                   final String topic,
                                   final Headers headers) {
-        this(timestamp, offset, partition, topic, headers, null);
-    }
-
-    public ProcessorRecordContext(final long timestamp,
-                                  final long offset,
-                                  final int partition,
-                                  final String topic,
-                                  final Headers headers,
-                                  final ConsumerRecord<byte[], byte[]> rawRecord) {
         this.timestamp = timestamp;
         this.offset = offset;
         this.topic = topic;
         this.partition = partition;
         this.headers = Objects.requireNonNull(headers);
-        this.rawRecord = rawRecord;
     }
 
     @Override
@@ -87,10 +75,6 @@ public Headers headers() {
         return headers;
     }
 
-    public ConsumerRecord<byte[], byte[]> rawRecord() {
-        return rawRecord;
-    }
-
     public long residentMemorySizeEstimate() {
         long size = 0;
         size += Long.BYTES; // value.context.timestamp
@@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
             headers = new RecordHeaders(headerArr);
         }
 
-        return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null);
+        return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index a3c9ea67f067d..a6b30a07ef96d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -230,7 +230,7 @@ private void updateHead() {
                 droppedRecordsSensor.record();
                 continue;
             }
-            headRecord = new StampedRecord(deserialized, timestamp, raw);
+            headRecord = new StampedRecord(deserialized, timestamp);
             headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 871cb2284ee4d..6e79616d30a9c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -88,8 +88,7 @@ public void process(final Record<KIn, VIn> record) {
             context.offset(),
             context.partition(),
             context.topic(),
-            record.headers(),
-            context.recordContext().rawRecord()
+            record.headers()
         );
 
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
index d82cd98ed7eec..d50fbee68260f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
@@ -20,11 +20,8 @@
 import org.apache.kafka.common.header.Headers;
 
 public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
-    private final ConsumerRecord<byte[], byte[]> rawRecord;
-
-    public StampedRecord(final ConsumerRecord<Object, Object> record, final long timestamp, final ConsumerRecord<byte[], byte[]> rawRecord) {
+    public StampedRecord(final ConsumerRecord<Object, Object> record, final long timestamp) {
         super(record, timestamp);
-        this.rawRecord = rawRecord;
     }
 
     public String topic() {
@@ -51,20 +48,6 @@ public Headers headers() {
         return value.headers();
     }
 
-    public ConsumerRecord<byte[], byte[]> rawRecord() {
-        return rawRecord;
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        return super.equals(other);
-    }
-
-    @Override
-    public int hashCode() {
-        return super.hashCode();
-    }
-
     @Override
     public String toString() {
         return value.toString() + ", timestamp = " + timestamp;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 30b9038aa6a67..8cbe5780b90bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -844,8 +844,7 @@ private void doProcess(final long wallClockTime) {
             record.offset(),
             record.partition(),
             record.topic(),
-            record.headers(),
-            record.rawRecord()
+            record.headers()
         );
 
         updateProcessorContext(currNode, wallClockTime, recordContext);
@@ -906,8 +905,7 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
             -1L,
             -1,
             null,
-            new RecordHeaders(),
-            null
+            new RecordHeaders()
         );
 
         updateProcessorContext(node, time.milliseconds(), recordContext);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index bb29ac64f02e0..6c1a64344ec76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -176,8 +176,6 @@ public void configure(final Map<String, ?> configs) {
     }
 
     private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
-        assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains(new String(context.sourceRawKey())));
-        assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains(new String(context.sourceRawValue())));
         assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key()));
         assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
         assertEquals("TOPIC_NAME", context.topic());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 6be033cb2dc08..df3f927686362 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -58,7 +57,6 @@
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -315,8 +313,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
             OFFSET,
             PARTITION,
             TOPIC,
-            new RecordHeaders(),
-            new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, KEY.getBytes(), VALUE.getBytes())));
+            new RecordHeaders()));
         when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME));
 
         return internalProcessorContext;
@@ -337,8 +334,6 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa
             assertEquals(internalProcessorContext.topic(), context.topic());
             assertEquals(internalProcessorContext.partition(), context.partition());
             assertEquals(internalProcessorContext.offset(), context.offset());
-            assertArrayEquals(internalProcessorContext.recordContext().rawRecord().key(), context.sourceRawKey());
-            assertArrayEquals(internalProcessorContext.recordContext().rawRecord().value(), context.sourceRawValue());
             assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
             assertEquals(internalProcessorContext.taskId(), context.taskId());
             assertEquals(KEY, record.key());