Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16770; [2/2] Coalesce records into bigger batches #16215

Merged
merged 13 commits into from
Jun 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,18 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
return this.writeLimit >= estimatedBytesWritten() + recordSize;
}

/**
* Check if we have room for a given number of bytes.
*/
public boolean hasRoomFor(int estimatedRecordsSize) {
if (isFull()) return false;
return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize;
}

public int maxAllowedBytes() {
return this.writeLimit - this.batchHeaderSizeInBytes;
}

public boolean isClosed() {
return builtRecords != null;
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ class BrokerServer(
val serde = new CoordinatorRecordSerde
val groupCoordinatorConfig = new GroupCoordinatorConfig(
config.groupCoordinatorNumThreads,
config.groupCoordinatorAppendLingerMs,
config.consumerGroupSessionTimeoutMs,
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ object KafkaConfig {
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
// Internal configuration used by integration and system tests.
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)

Expand Down Expand Up @@ -948,6 +949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG)
val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)

/** Consumer group configs */
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class GroupCoordinatorConfig {
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
"The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString());
public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
dajac marked this conversation as resolved.
Show resolved Hide resolved
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
public final static int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 10;

public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator.";
Expand Down Expand Up @@ -164,6 +168,12 @@ public class GroupCoordinatorConfig {
*/
public final int numThreads;

/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
*/
public final int appendLingerMs;

/**
* The consumer group session timeout in milliseconds.
*/
Expand Down Expand Up @@ -259,6 +269,7 @@ public class GroupCoordinatorConfig {

public GroupCoordinatorConfig(
int numThreads,
int appendLingerMs,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
Expand All @@ -277,6 +288,7 @@ public GroupCoordinatorConfig(
CompressionType compressionType
) {
this.numThreads = numThreads;
this.appendLingerMs = appendLingerMs;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMaxSize = consumerGroupMaxSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ public GroupCoordinatorService build() {
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
.withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs))
.withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
.withCoordinatorMetrics(groupCoordinatorMetrics)
.withSerializer(new CoordinatorRecordSerde())
.withCompression(Compression.of(config.compressionType).build())
.withAppendLingerMs(config.appendLingerMs)
.build();

return new GroupCoordinatorService(
Expand Down
Loading