Skip to content

Commit

Permalink
MINOR: Prevent consumer protocol to be used in ZK mode (apache#16121)
Browse files Browse the repository at this point in the history
This patch disallows enabling the new consumer rebalance protocol in ZK mode.

Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
dajac authored May 30, 2024
1 parent 131ce0b commit 2a6078a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,11 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
}
if (protocols.contains(GroupType.CONSUMER)) {
if (processRoles.isEmpty) {
throw new ConfigException(s"The new '${GroupType.CONSUMER}' rebalance protocol is only supported in KRaft cluster.")
}
warn(s"The new '${GroupType.CONSUMER}' rebalance protocol is enabled along with the new group coordinator. " +
"This is part of the early access of KIP-848 and MUST NOT be used in production.")
"This is part of the preview of KIP-848 and MUST NOT be used in production.")
}
protocols
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,18 @@ public void testDefaults(ClusterInstance clusterInstance) {
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
}),
@ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
@ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
}, tags = {"disable-new-coordinator-and-enable-new-consumer-rebalance-coordinator"}),
Expand Down
6 changes: 6 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,12 @@ class KafkaConfigTest {
@Test
def testGroupCoordinatorRebalanceProtocols(): Unit = {
val props = new Properties()

// consumer cannot be enabled in ZK mode.
props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))

// Setting KRaft's properties.
props.putAll(kraftProps())

// Only classic and consumer are supported.
Expand Down

0 comments on commit 2a6078a

Please sign in to comment.