Skip to content

Commit

Permalink
KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamic…
Browse files Browse the repository at this point in the history
…ally (#16203)

Reviewers: Satish Duggana <[email protected]>, Luke Chen <[email protected]>
  • Loading branch information
kamalcph authored and satishd committed Jun 11, 2024
1 parent 781b93b commit d94a28b
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 7 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w

object DynamicRemoteLogConfig {
val ReconfigurableConfigs = Set(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP
)
}
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)

def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)

validateValues()

@nowarn("cat=deprecation")
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1479,10 +1479,9 @@ class ReplicaManager(val config: KafkaConfig,
return Some(createLogReadResult(e))
}

val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)

delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,39 @@ class DynamicBrokerConfigTest {
verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
}

@Test
def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val config = KafkaConfig(props)
val kafkaBroker = mock(classOf[KafkaBroker])
when(kafkaBroker.config).thenReturn(config)
assertEquals(500, config.remoteFetchMaxWaitMs)

val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
config.dynamicConfig.initialize(None, None)
config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)

val newProps = new Properties()
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000")
// update default config
config.dynamicConfig.validate(newProps, perBrokerConfig = false)
config.dynamicConfig.updateDefaultConfig(newProps)
assertEquals(30000, config.remoteFetchMaxWaitMs)

// update per broker config
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000")
config.dynamicConfig.validate(newProps, perBrokerConfig = true)
config.dynamicConfig.updateBrokerConfig(0, newProps)
assertEquals(10000, config.remoteFetchMaxWaitMs)

// invalid values
for (maxWaitMs <- Seq(-1, 0)) {
newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, maxWaitMs.toString)
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true))
assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false))
}
}

@Test
def testUpdateDynamicRemoteLogManagerConfig(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() {
return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP);
}

public int remoteFetchMaxWaitMs() {
return getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP);
}

public static void main(String[] args) {
System.out.println(configDef().toHtml(4, config -> "remote_log_manager_" + config));
}
Expand Down

0 comments on commit d94a28b

Please sign in to comment.