From db4727acd8bb7a3cc04cd6693dbc93c049376038 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Mon, 10 Jun 2024 20:42:12 +0530 Subject: [PATCH] KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203) Reviewers: Satish Duggana , Luke Chen --- .../kafka/server/DynamicBrokerConfig.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 ++ .../scala/kafka/server/ReplicaManager.scala | 3 +- .../server/DynamicBrokerConfigTest.scala | 33 +++++++++++++++++++ .../storage/RemoteLogManagerConfig.java | 4 --- 5 files changed, 38 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index c9bb2e3b4ffda..22576bdceb6fd 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1203,6 +1203,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( - RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP + RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, + RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6473b7a801280..4de4a3ee37f5a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1211,6 +1211,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) + def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 17e9a5c1b9e79..6ab67a84ec426 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,10 +1479,9 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() + val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) - delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) None } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 328d6e41b5722..2e5d77cdc6fae 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -792,6 +792,39 @@ class DynamicBrokerConfigTest { verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100) } + @Test + def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val config = KafkaConfig(props) + val kafkaBroker = mock(classOf[KafkaBroker]) + when(kafkaBroker.config).thenReturn(config) + assertEquals(500, config.remoteFetchMaxWaitMs) + + val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) + + val newProps = new Properties() + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000") + // update default config + config.dynamicConfig.validate(newProps, perBrokerConfig = false) + config.dynamicConfig.updateDefaultConfig(newProps) + assertEquals(30000, config.remoteFetchMaxWaitMs) + + // update per broker config + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") + config.dynamicConfig.validate(newProps, perBrokerConfig = true) + config.dynamicConfig.updateBrokerConfig(0, newProps) + assertEquals(10000, config.remoteFetchMaxWaitMs) + + // invalid values + for (maxWaitMs <- Seq(-1, 0)) { + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, maxWaitMs.toString) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) + } + } + @Test def testUpdateDynamicRemoteLogManagerConfig(): Unit = { val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 6323ba65bfec7..25f1334226ac8 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -471,10 +471,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); } - public int remoteFetchMaxWaitMs() { - return getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); - } - public static void main(String[] args) { System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config)); }