Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16770; [2/2] Coalesce records into bigger batches #16215

Merged
merged 13 commits into from
Jun 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ boolean canTransitionFrom(CoordinatorState state) {
FAILED {
@Override
boolean canTransitionFrom(CoordinatorState state) {
return state == LOADING;
return state == LOADING || state == ACTIVE;
}
};

Expand Down Expand Up @@ -761,7 +761,10 @@ private void flushCurrentBatch() {
transitionTo(CoordinatorState.FAILED);
// Transition to LOADING to trigger the restoration of the state.
transitionTo(CoordinatorState.LOADING);
dajac marked this conversation as resolved.
Show resolved Hide resolved
return;
// Thrown NotCoordinatorException to fail the operation that
// triggered the write. We use NotCoordinatorException to be
// consistent with the transition to FAILED.
throw Errors.NOT_COORDINATOR.exception();
}

// Add all the pending deferred events to the deferred event queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3664,6 +3664,87 @@ public void testScheduleTransactionalWriteOperationWithBatching() throws Executi
assertNull(complete1.get(5, TimeUnit.SECONDS));
}

@Test
public void testStateMachineIsReloadedWhenOutOfSync() {
MockTimer timer = new MockTimer();
MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
MockPartitionWriter writer = new MockPartitionWriter() {
@Override
public long append(
TopicPartition tp,
VerificationGuard verificationGuard,
MemoryRecords batch
) {
// Add 1 to the returned offsets.
return super.append(tp, verificationGuard, batch) + 1;
}
};

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(loader)
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
assertNull(ctx.currentBatch);

// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();

// Create records with a quarter of the max batch size each. Keep in mind that
// each batch has a header so it is not possible to have those four records
// in one single batch.
List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());

// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));

// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));

// Write #3.
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));

// Write #4. This write cannot make it in the current batch. So the current batch
// is flushed. It will fail. So we expect all writes to fail.
CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));

// Verify the futures.
assertFutureThrows(write1, NotCoordinatorException.class);
dajac marked this conversation as resolved.
Show resolved Hide resolved
assertFutureThrows(write2, NotCoordinatorException.class);
assertFutureThrows(write3, NotCoordinatorException.class);
// Write #4 is also expected to fail.
assertFutureThrows(write4, NotCoordinatorException.class);

// Verify that the state machine was loaded twice.
verify(loader, times(2)).load(eq(TP), any());
}

private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
Expand Down