
KAFKA-16448: Remove source raw key and source raw value from ErrorHandlerContext
loicgreffier committed Jul 29, 2024
1 parent 4e69bc0 commit b6aac75
Showing 15 changed files with 12 additions and 105 deletions.
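In practical terms, this commit removes the sourceRawKey()/sourceRawValue() accessors from ErrorHandlerContext, together with the raw-record plumbing that fed them, so an exception handler now reads the failed key and value from the Record argument rather than from raw bytes on the context. Below is a minimal sketch of a ProcessingExceptionHandler written against the KIP-1033 handler interface as it stands after this change; the class name and log format are illustrative, not part of the commit.

import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // Record metadata is still available from the context.
        System.err.printf("Processing error in node %s (task %s) at %s-%d@%d: %s%n",
            context.processorNodeId(), context.taskId(),
            context.topic(), context.partition(), context.offset(),
            exception.getMessage());
        // The key and value now come from the record passed to the handler;
        // context.sourceRawKey()/sourceRawValue() no longer exist after this commit.
        System.err.println("Failed record key=" + record.key() + ", value=" + record.value());
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}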
@@ -89,34 +89,6 @@ public interface ErrorHandlerContext {
*/
Headers headers();

/**
* Return the non-deserialized byte[] of the input message key if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent
* to the repartition topic.
*
* @return the raw byte of the key of the source message
*/
byte[] sourceRawKey();

/**
* Return the non-deserialized byte[] of the input message value if the context has been triggered by a message.
*
* <p> If this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, it will return {@code null}.
*
* <p> If this method is invoked in a sub-topology due to a repartition, the returned value would be one sent
* to the repartition topic.
*
* @return the raw byte of the value of the source message
*/
byte[] sourceRawValue();

/**
* Return the current processor node ID.
*
@@ -28,25 +28,19 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext {
private final int partition;
private final long offset;
private final Headers headers;
private final byte[] sourceRawKey;
private final byte[] sourceRawValue;
private final String processorNodeId;
private final TaskId taskId;

public DefaultErrorHandlerContext(final String topic,
final int partition,
final long offset,
final Headers headers,
final byte[] sourceRawKey,
final byte[] sourceRawValue,
final String processorNodeId,
final TaskId taskId) {
this.topic = topic;
this.partition = partition;
this.offset = offset;
this.headers = headers;
this.sourceRawKey = sourceRawKey;
this.sourceRawValue = sourceRawValue;
this.processorNodeId = processorNodeId;
this.taskId = taskId;
}
@@ -71,16 +65,6 @@ public Headers headers() {
return headers;
}

@Override
public byte[] sourceRawKey() {
return sourceRawKey;
}

@Override
public byte[] sourceRawValue() {
return sourceRawValue;
}

@Override
public String processorNodeId() {
return processorNodeId;
@@ -28,7 +28,7 @@
public class CorruptedRecord extends StampedRecord {

CorruptedRecord(final ConsumerRecord<byte[], byte[]> rawRecord) {
super(rawRecord, ConsumerRecord.NO_TIMESTAMP, rawRecord);
super(rawRecord, ConsumerRecord.NO_TIMESTAMP);
}

@Override
@@ -307,8 +307,7 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
record.offset(),
record.partition(),
record.topic(),
record.headers(),
record);
record.headers());
globalProcessorContext.setRecordContext(recordContext);

try {
@@ -113,8 +113,7 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {
deserialized.offset(),
deserialized.partition(),
deserialized.topic(),
deserialized.headers(),
record);
deserialized.headers());
processorContext.setRecordContext(recordContext);
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
final Record<Object, Object> toProcess = new Record<>(
@@ -66,8 +66,7 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers(),
processorRecordContext.rawRecord()
record.headers()
));
delegate.process(record.key(), record.value());
} finally {
@@ -261,8 +261,7 @@ public <K, V> void forward(final Record<K, V> record, final String childName) {
recordContext.offset(),
recordContext.partition(),
recordContext.topic(),
record.headers(),
recordContext.rawRecord());
record.headers());
}

if (childName == null) {
@@ -209,8 +209,6 @@ public void process(final Record<KIn, VIn> record) {
internalProcessorContext.partition(),
internalProcessorContext.offset(),
internalProcessorContext.headers(),
internalProcessorContext.recordContext().rawRecord().key(),
internalProcessorContext.recordContext().rawRecord().value(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -38,28 +37,17 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
private final String topic;
private final int partition;
private final Headers headers;
private final ConsumerRecord<byte[], byte[]> rawRecord;

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers) {
this(timestamp, offset, partition, topic, headers, null);
}

public ProcessorRecordContext(final long timestamp,
final long offset,
final int partition,
final String topic,
final Headers headers,
final ConsumerRecord<byte[], byte[]> rawRecord) {
this.timestamp = timestamp;
this.offset = offset;
this.topic = topic;
this.partition = partition;
this.headers = Objects.requireNonNull(headers);
this.rawRecord = rawRecord;
}

@Override
@@ -87,10 +75,6 @@ public Headers headers() {
return headers;
}

public ConsumerRecord<byte[], byte[]> rawRecord() {
return rawRecord;
}

public long residentMemorySizeEstimate() {
long size = 0;
size += Long.BYTES; // value.context.timestamp
@@ -189,7 +173,7 @@ public static ProcessorRecordContext deserialize(final ByteBuffer buffer) {
headers = new RecordHeaders(headerArr);
}

return new ProcessorRecordContext(timestamp, offset, partition, topic, headers, null);
return new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
}

@Override
@@ -230,7 +230,7 @@ private void updateHead() {
droppedRecordsSensor.record();
continue;
}
headRecord = new StampedRecord(deserialized, timestamp, raw);
headRecord = new StampedRecord(deserialized, timestamp);
headRecordSizeInBytes = consumerRecordSizeInBytes(raw);
}

@@ -88,8 +88,7 @@ public void process(final Record<KIn, VIn> record) {
context.offset(),
context.partition(),
context.topic(),
record.headers(),
context.recordContext().rawRecord()
record.headers()
);

final String topic = topicExtractor.extract(key, value, contextForExtraction);
@@ -20,11 +20,8 @@
import org.apache.kafka.common.header.Headers;

public class StampedRecord extends Stamped<ConsumerRecord<?, ?>> {
private final ConsumerRecord<byte[], byte[]> rawRecord;

public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp, final ConsumerRecord<byte[], byte[]> rawRecord) {
public StampedRecord(final ConsumerRecord<?, ?> record, final long timestamp) {
super(record, timestamp);
this.rawRecord = rawRecord;
}

public String topic() {
@@ -51,20 +48,6 @@ public Headers headers() {
return value.headers();
}

public ConsumerRecord<byte[], byte[]> rawRecord() {
return rawRecord;
}

@Override
public boolean equals(final Object other) {
return super.equals(other);
}

@Override
public int hashCode() {
return super.hashCode();
}

@Override
public String toString() {
return value.toString() + ", timestamp = " + timestamp;
@@ -844,8 +844,7 @@ private void doProcess(final long wallClockTime) {
record.offset(),
record.partition(),
record.topic(),
record.headers(),
record.rawRecord()
record.headers()
);
updateProcessorContext(currNode, wallClockTime, recordContext);

@@ -906,8 +905,7 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
-1L,
-1,
null,
new RecordHeaders(),
null
new RecordHeaders()
);
updateProcessorContext(node, time.milliseconds(), recordContext);

@@ -176,8 +176,6 @@ public void configure(final Map<String, ?> configs) {
}

private static void assertProcessingExceptionHandlerInputs(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains(new String(context.sourceRawKey())));
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains(new String(context.sourceRawValue())));
assertTrue(Arrays.asList("ID123-2-ERR", "ID123-5-ERR").contains((String) record.key()));
assertTrue(Arrays.asList("ID123-A2", "ID123-A5").contains((String) record.value()));
assertEquals("TOPIC_NAME", context.topic());
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -58,7 +57,6 @@
import java.util.Set;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -315,8 +313,7 @@ private InternalProcessorContext<Object, Object> mockInternalProcessorContext()
OFFSET,
PARTITION,
TOPIC,
new RecordHeaders(),
new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, KEY.getBytes(), VALUE.getBytes())));
new RecordHeaders()));
when(internalProcessorContext.currentNode()).thenReturn(new ProcessorNode<>(NAME));

return internalProcessorContext;
@@ -337,8 +334,6 @@ public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHa
assertEquals(internalProcessorContext.topic(), context.topic());
assertEquals(internalProcessorContext.partition(), context.partition());
assertEquals(internalProcessorContext.offset(), context.offset());
assertArrayEquals(internalProcessorContext.recordContext().rawRecord().key(), context.sourceRawKey());
assertArrayEquals(internalProcessorContext.recordContext().rawRecord().value(), context.sourceRawValue());
assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
assertEquals(KEY, record.key());

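For reference, the corresponding internal call-site pattern after this change, sketched with placeholder values. ProcessorRecordContext is a Streams-internal class; it is shown here only to illustrate the reduced constructor, and the same shape applies to the other call sites in the hunks above.

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

public class RecordContextSketch {
    public static void main(final String[] args) {
        // The trailing raw ConsumerRecord argument (and the delegating overload) are gone;
        // a single five-argument constructor carries only the record metadata.
        final ProcessorRecordContext recordContext = new ProcessorRecordContext(
            1722240000000L,       // timestamp
            42L,                  // offset
            0,                    // partition
            "input-topic",        // topic (placeholder name)
            new RecordHeaders()   // headers; must be non-null
        );
        // StampedRecord and DefaultErrorHandlerContext shrink the same way: their
        // constructors no longer take the raw source record or raw key/value bytes.
        System.out.println(recordContext);
    }
}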