KAFKA-18295: Remove deprecated function Partitioner#onNewBatch #18282

Open · wants to merge 2 commits into base: trunk
@@ -946,15 +946,6 @@ private void throwIfProducerClosed() {
throw new IllegalStateException("Cannot perform operation after producer has been closed");
}

/**
* Call deprecated {@link Partitioner#onNewBatch}
*/
@SuppressWarnings("deprecation")
private void onNewBatch(String topic, Cluster cluster, int prevPartition) {
assert partitioner != null;
partitioner.onNewBatch(topic, cluster, prevPartition);
}

/**
* Implementation of asynchronously send a record to a topic.
*/
@@ -1009,32 +1000,15 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

// A custom partitioner may take advantage on the onNewBatch callback.
boolean abortOnNewBatch = partitioner != null;

// Append the record to the accumulator. Note, that the actual partition may be
// calculated there and can be accessed via appendCallbacks.topicPartition.
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

if (result.abortForNewBatch) {
int prevPartition = partition;
// IMPORTANT NOTE: the following onNewBatch and partition calls should not interrupted to allow
// the custom partitioner to correctly track its state
onNewBatch(record.topic(), cluster, prevPartition);
Member:
we don't need the "abortOnNewBatch" path anymore, right? If so, could you please refactor it a bit?

Member Author:
Thanks for the suggestion. Updated it.

partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
}

// Add the partition to the transaction (if in progress) after it has been successfully
// appended to the accumulator. We cannot do it before because the partition may be
// unknown or the initially selected partition may be changed when the batch is closed
// (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
// unknown. Note that the `Sender` will refuse to dequeue
// batches from the accumulator until they have been added to the transaction.
if (transactionManager != null) {
transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
@@ -42,22 +42,4 @@ public interface Partitioner extends Configurable, Closeable {
* This is called when partitioner is closed.
*/
void close();

/**
* Note this method is only implemented in DefaultPartitioner and UniformStickyPartitioner which
* are now deprecated. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner">KIP-794</a> for more info.
* <p>
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
* this method can change the chosen sticky partition for the new batch.
* <p>
* After onNewBatch, the {@link #partition(String, Object, byte[], Object, byte[], Cluster)} method is called again
* which allows the implementation to "redirect" the message on new batch creation.
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new batch
* @deprecated Since 3.3.0
*/
@Deprecated
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
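After this removal, the Partitioner contract is reduced to configure(), partition(), and close(); there is no longer a hook that fires when a new batch is created, since with KIP-794 the uniform sticky behaviour lives inside the accumulator instead. A minimal implementation against the trimmed interface might look like the following sketch (the class name and the random partition choice are illustrative only, not part of this patch):

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative sketch only: a trivial custom partitioner written against the
// interface as it looks after Partitioner#onNewBatch is removed.
public class RandomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Pick any partition of the topic at random; a real partitioner would
        // usually hash keyBytes or keep per-topic state instead.
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```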
@@ -78,11 +78,5 @@ private int nextValue(String topic) {
return counter.getAndIncrement();
}

@SuppressWarnings("deprecation")
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
previousPartition.set(new TopicPartition(topic, prevPartition));
}

public void close() {}
}
@@ -277,8 +277,6 @@ private boolean partitionChanged(String topic,
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the partitioner's onNewBatch method before trying to append again
* @param nowMs The current time, in milliseconds
* @param cluster The cluster metadata
*/
@@ -290,7 +288,6 @@ public RecordAppendResult append(String topic,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
@@ -336,12 +333,6 @@ public RecordAppendResult append(String topic,
}
}

// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
}

if (buffer == null) {
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
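For callers, dropping the abortOnNewBatch flag means doSend now performs a single append instead of aborting the first attempt and re-running the partitioner. A rough sketch of the resulting call shape, using the variable names visible in the KafkaProducer diff above (not the complete method):

```java
// Illustrative sketch of the single-pass append after this change. The local
// variables are assumed to be prepared earlier in doSend, as in the diff above.
RecordAccumulator.RecordAppendResult result = accumulator.append(
    record.topic(),
    partition,          // may still be RecordMetadata.UNKNOWN_PARTITION here
    timestamp,
    serializedKey,
    serializedValue,
    headers,
    appendCallbacks,
    remainingWaitMs,
    nowMs,
    cluster);

// The accumulator resolves an unknown partition itself (via its built-in
// partitioner), so there is no second append and no onNewBatch callback before
// the batch is handed to the Sender.
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
```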
@@ -2315,40 +2315,6 @@ public void testPartitionAddedToTransaction() throws Exception {
}
}

@SuppressWarnings("deprecation")
@Test
public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);

String topic = "foo";
TopicPartition topicPartition0 = new TopicPartition(topic, 0);
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
Cluster cluster = TestUtils.singletonCluster(topic, 2);

when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);

long timestamp = ctx.time.milliseconds();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, "key", "value");

FutureRecordMetadata future = expectAppendWithAbortForNewBatch(
ctx,
record,
topicPartition0,
topicPartition1,
cluster
);

try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
assertEquals(future, producer.send(record));
assertFalse(future.isDone());
verify(ctx.partitioner).onNewBatch(topic, cluster, 0);
verify(ctx.transactionManager, never()).maybeAddPartition(topicPartition0);
verify(ctx.transactionManager).maybeAddPartition(topicPartition1);
}
}

private <T> FutureRecordMetadata expectAppend(
KafkaProducerTestContext<T> ctx,
ProducerRecord<T, T> record,
@@ -2387,7 +2353,6 @@ private <T> FutureRecordMetadata expectAppend(
eq(Record.EMPTY_HEADERS), // 5
any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(),
eq(true),
anyLong(),
any()
)).thenAnswer(invocation -> {
@@ -2405,89 +2370,6 @@ private <T> FutureRecordMetadata expectAppend(
return futureRecordMetadata;
}

private <T> FutureRecordMetadata expectAppendWithAbortForNewBatch(
KafkaProducerTestContext<T> ctx,
ProducerRecord<T, T> record,
TopicPartition initialSelectedPartition,
TopicPartition retrySelectedPartition,
Cluster cluster
) throws InterruptedException {
byte[] serializedKey = ctx.serializer.serialize(topic, record.key());
byte[] serializedValue = ctx.serializer.serialize(topic, record.value());
long timestamp = record.timestamp() == null ? ctx.time.milliseconds() : record.timestamp();

ProduceRequestResult requestResult = new ProduceRequestResult(retrySelectedPartition);
FutureRecordMetadata futureRecordMetadata = new FutureRecordMetadata(
requestResult,
0,
timestamp,
serializedKey.length,
serializedValue.length,
ctx.time
);

when(ctx.partitioner.partition(
initialSelectedPartition.topic(),
record.key(),
serializedKey,
record.value(),
serializedValue,
cluster
)).thenReturn(initialSelectedPartition.partition())
.thenReturn(retrySelectedPartition.partition());

when(ctx.accumulator.append(
eq(initialSelectedPartition.topic()), // 0
eq(initialSelectedPartition.partition()), // 1
eq(timestamp), // 2
eq(serializedKey), // 3
eq(serializedValue), // 4
eq(Record.EMPTY_HEADERS), // 5
any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(),
eq(true), // abortOnNewBatch
anyLong(),
any()
)).thenAnswer(invocation -> {
RecordAccumulator.AppendCallbacks callbacks =
(RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
callbacks.setPartition(initialSelectedPartition.partition());
return new RecordAccumulator.RecordAppendResult(
null,
false,
false,
true,
0);
});

when(ctx.accumulator.append(
eq(retrySelectedPartition.topic()), // 0
eq(retrySelectedPartition.partition()), // 1
eq(timestamp), // 2
eq(serializedKey), // 3
eq(serializedValue), // 4
eq(Record.EMPTY_HEADERS), // 5
any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(),
eq(false), // abortOnNewBatch
anyLong(),
any()
)).thenAnswer(invocation -> {
RecordAccumulator.AppendCallbacks callbacks =
(RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
callbacks.setPartition(retrySelectedPartition.partition());
return new RecordAccumulator.RecordAppendResult(
futureRecordMetadata,
false,
true,
false,
0);
});

return futureRecordMetadata;
}


private static final List<String> CLIENT_IDS = new ArrayList<>();

public static class SerializerForClientId implements Serializer<byte[]> {
@@ -96,77 +96,4 @@ public void testRoundRobinWithKeyBytes() {
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}

@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytes() {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 30; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
// Simulate single-message batches
partitioner.onNewBatch(topicA, testCluster, partition);
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
partitionCount.put(partition, count + 1);

if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}

assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}

@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() {
final String topicA = "topicA";
final String topicB = "topicB";

List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new PartitionInfo(topicA, 3, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());

final Map<Integer, Integer> partitionCount = new HashMap<>();

Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 40; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
// Simulate single-message batches
partitioner.onNewBatch(topicA, testCluster, partition);
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
partitionCount.put(partition, count + 1);

if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}

assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
assertEquals(10, partitionCount.get(3).intValue());
}
}