Skip to content

Commit

Permalink
KAFKA-16770; [2/2] Coalesce records into bigger batches (apache#16215)
Browse files Browse the repository at this point in the history
This patch is the continuation of apache#15964. It introduces the records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to chose the linger time or disable it with zero. The new configuration defaults to 10ms.

Reviewers: Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
dajac authored and gongxuanzhang committed Jun 12, 2024
1 parent 007f480 commit ef3158c
Show file tree
Hide file tree
Showing 10 changed files with 1,263 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,18 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}

/**
* Check if we have room for a given number of bytes.
*/
public boolean hasRoomFor(int estimatedRecordsSize) {
if (isFull()) return false;
return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize;
}

public int maxAllowedBytes() {
return this.writeLimit - this.batchHeaderSizeInBytes;
}

public boolean isClosed() {
return builtRecords != null;
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ class BrokerServer(
val serde = new CoordinatorRecordSerde
val groupCoordinatorConfig = new GroupCoordinatorConfig(
config.groupCoordinatorNumThreads,
config.groupCoordinatorAppendLingerMs,
config.consumerGroupSessionTimeoutMs,
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ object KafkaConfig {
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
// Internal configuration used by integration and system tests.
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)

Expand Down Expand Up @@ -965,6 +966,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)

/** Consumer group configs */
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class GroupCoordinatorConfig {
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
public final static int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 10;

public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator.";
Expand Down Expand Up @@ -164,6 +168,12 @@ public class GroupCoordinatorConfig {
*/
public final int numThreads;

/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
public final int appendLingerMs;

/**
* The consumer group session timeout in milliseconds.
*/
Expand Down Expand Up @@ -259,6 +269,7 @@ public class GroupCoordinatorConfig {

public GroupCoordinatorConfig(
int numThreads,
int appendLingerMs,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
Expand All @@ -277,6 +288,7 @@ public GroupCoordinatorConfig(
CompressionType compressionType
) {
this.numThreads = numThreads;
this.appendLingerMs = appendLingerMs;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMaxSize = consumerGroupMaxSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ public GroupCoordinatorService build() {
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.withSerializer(new CoordinatorRecordSerde())
.withCompression(Compression.of(config.compressionType).build())
.withAppendLingerMs(config.appendLingerMs)
.build();

return new GroupCoordinatorService(
Expand Down
Loading

0 comments on commit ef3158c

Please sign in to comment.