From 90025cf9aa03961e4b8c98f9d5c4f74112587445 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Thu, 16 Nov 2023 15:28:52 +0530 Subject: [PATCH 1/5] KAFKA-15776: Added remote.fetch.max.wait.ms to configure DelayedRemoteFetch timeout KAFKA-15776: Use the FETCH request timeout as the delay timeout for DelayedRemoteFetch DelayedRemoteFetch uses `fetch.max.wait.ms` config as a delay timeout for DelayedRemoteFetchPurgatory. `fetch.max.wait.ms` purpose is to wait for the given amount of time when there is no data available to serve the FETCH request. ``` The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. ``` Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the user on how to configure optimal value for each purpose. Moreover, the config is of LOW importance and most of the users won't configure it and use the default value of 500 ms. Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to higher number of expired delayed remote fetch requests when the remote storage have any degradation. --- .../kafka/server/DelayedRemoteFetch.scala | 3 +- .../scala/kafka/server/ReplicaManager.scala | 3 +- .../kafka/server/DelayedRemoteFetchTest.scala | 21 +- .../storage/RemoteLogManagerConfig.java | 342 +++++++++--------- .../storage/RemoteLogManagerConfigTest.java | 6 +- 5 files changed, 199 insertions(+), 176 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala index 00d6afb89ffe2..58a866aa4a63f 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala @@ -35,12 +35,13 @@ import scala.collection._ class DelayedRemoteFetch(remoteFetchTask: Future[Void], remoteFetchResult: CompletableFuture[RemoteLogReadResult], remoteFetchInfo: RemoteStorageFetchInfo, + remoteFetchMaxWaitMs: Long, fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], fetchParams: FetchParams, localReadResults: Seq[(TopicIdPartition, LogReadResult)], replicaManager: ReplicaManager, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(fetchParams.maxWaitMs) { + extends DelayedOperation(remoteFetchMaxWaitMs) { if (fetchParams.isFromFollower) { throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a2a070bcd0331..74023dea1aad2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, + val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() + val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala index c35385bcbc8ca..ea1ffaf0b1179 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest { private val fetchOffset = 500L private val logStartOffset = 0L private val currentLeaderEpoch = Optional.of[Integer](10) + private val remoteFetchMaxWaitMs = 500 private val fetchStatus = FetchPartitionStatus( startOffsetMetadata = new LogOffsetMetadata(fetchOffset), @@ -64,8 +65,8 @@ class DelayedRemoteFetchTest { val leaderLogStartOffset = 10 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenReturn(mock(classOf[Partition])) @@ -100,8 +101,8 @@ class DelayedRemoteFetchTest { val leaderLogStartOffset = 10 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500) - assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback)) + assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)) } @Test @@ -124,8 +125,8 @@ class DelayedRemoteFetchTest { val logReadInfo = buildReadResult(Errors.NONE) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) // delayed remote fetch should still be able to complete assertTrue(delayedRemoteFetch.tryComplete()) @@ -155,8 +156,8 @@ class DelayedRemoteFetchTest { // build a read result with error val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) assertTrue(delayedRemoteFetch.tryComplete()) assertTrue(delayedRemoteFetch.isCompleted) @@ -184,8 +185,8 @@ class DelayedRemoteFetchTest { val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false) val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenReturn(mock(classOf[Partition])) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 1add933d788b6..e41cc011c313f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig { "implementation. For example this value can be `rlmm.config.`."; public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config."; - public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable"; public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tiered storage functionality in a broker or not. Valid values " + "are `true` or `false` and the default value is false. When it is true broker starts all the services required for the tiered storage functionality."; @@ -185,167 +184,178 @@ public final class RemoteLogManagerConfig { "The default value is 1 second."; public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1; + public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms"; + public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; + public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; + public static final ConfigDef CONFIG_DEF = new ConfigDef(); static { - CONFIG_DEF.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, - BOOLEAN, - DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, - null, - MEDIUM, - REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC) - .define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, - STRING, - DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX, - new ConfigDef.NonEmptyString(), - MEDIUM, - REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC) - .define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, - STRING, - DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX, - new ConfigDef.NonEmptyString(), - MEDIUM, - REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC) - .define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING, - null, - new ConfigDef.NonEmptyString(), - MEDIUM, - REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC) - .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING, - null, - null, - MEDIUM, - REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC) - .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, - STRING, - DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME, - new ConfigDef.NonEmptyString(), - MEDIUM, - REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC) - .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, - STRING, - null, - null, - MEDIUM, - REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC) - .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING, - null, - new ConfigDef.NonEmptyString(), - MEDIUM, - REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC) - .define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP, - INT, - DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES, - atLeast(0), - LOW, - REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC) - .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, - LONG, - DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, - atLeast(1), - LOW, - REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC) - .define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, - INT, - DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC) - .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, + CONFIG_DEF + .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, + BOOLEAN, + DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, + null, + MEDIUM, + REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC) + .define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, + STRING, + DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX, + new ConfigDef.NonEmptyString(), + MEDIUM, + REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC) + .define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, + STRING, + DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX, + new ConfigDef.NonEmptyString(), + MEDIUM, + REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC) + .define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING, + null, + new ConfigDef.NonEmptyString(), + MEDIUM, + REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC) + .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING, + null, + null, + MEDIUM, + REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC) + .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + STRING, + DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME, + new ConfigDef.NonEmptyString(), + MEDIUM, + REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC) + .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP, + STRING, + null, + null, + MEDIUM, + REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC) + .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING, + null, + new ConfigDef.NonEmptyString(), + MEDIUM, + REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC) + .define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP, + INT, + DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES, + atLeast(0), + LOW, + REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC) + .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, + LONG, + DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, + atLeast(1), + LOW, + REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC) + .define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP, + INT, + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC) + .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, INT, DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, atLeast(1), MEDIUM, REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC) - .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, + .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, INT, DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, atLeast(1), MEDIUM, REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC) - .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, - LONG, - DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS, - atLeast(1), - LOW, - REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC) - .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP, - LONG, - DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS, - atLeast(1), - LOW, - REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC) - .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP, - LONG, - DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS, - atLeast(1), LOW, - REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC) - .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP, - DOUBLE, - DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER, - between(0, 0.5), - LOW, - REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC) - .define(REMOTE_LOG_READER_THREADS_PROP, - INT, - DEFAULT_REMOTE_LOG_READER_THREADS, - atLeast(1), - MEDIUM, - REMOTE_LOG_READER_THREADS_DOC) - .define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP, - INT, - DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS, - atLeast(1), - MEDIUM, - REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC) - .define(LOG_LOCAL_RETENTION_MS_PROP, - LONG, - DEFAULT_LOG_LOCAL_RETENTION_MS, - atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS), - MEDIUM, - LOG_LOCAL_RETENTION_MS_DOC) - .define(LOG_LOCAL_RETENTION_BYTES_PROP, - LONG, - DEFAULT_LOG_LOCAL_RETENTION_BYTES, - atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES), - MEDIUM, - LOG_LOCAL_RETENTION_BYTES_DOC) - .define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, - LONG, - DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC) - .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, - INT, - DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC) - .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, - INT, - DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC) - .define(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, - LONG, - DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC) - .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, - INT, - DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC) - .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, - INT, - DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, - atLeast(1), - MEDIUM, - REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC); + .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, + LONG, + DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS, + atLeast(1), + LOW, + REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC) + .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP, + LONG, + DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS, + atLeast(1), + LOW, + REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC) + .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP, + LONG, + DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS, + atLeast(1), LOW, + REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC) + .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP, + DOUBLE, + DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER, + between(0, 0.5), + LOW, + REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC) + .define(REMOTE_LOG_READER_THREADS_PROP, + INT, + DEFAULT_REMOTE_LOG_READER_THREADS, + atLeast(1), + MEDIUM, + REMOTE_LOG_READER_THREADS_DOC) + .define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP, + INT, + DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS, + atLeast(1), + MEDIUM, + REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC) + .define(LOG_LOCAL_RETENTION_MS_PROP, + LONG, + DEFAULT_LOG_LOCAL_RETENTION_MS, + atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS), + MEDIUM, + LOG_LOCAL_RETENTION_MS_DOC) + .define(LOG_LOCAL_RETENTION_BYTES_PROP, + LONG, + DEFAULT_LOG_LOCAL_RETENTION_BYTES, + atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES), + MEDIUM, + LOG_LOCAL_RETENTION_BYTES_DOC) + .define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, + LONG, + DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC) + .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, + INT, + DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC) + .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, + INT, + DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC) + .define(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, + LONG, + DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC) + .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, + INT, + DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC) + .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, + INT, + DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, + atLeast(1), + MEDIUM, + REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC) + .define(REMOTE_FETCH_MAX_WAIT_MS_PROP, + INT, + DEFAULT_REMOTE_FETCH_MAX_WAIT_MS, + atLeast(1), + MEDIUM, + REMOTE_FETCH_MAX_WAIT_MS_DOC); } private final boolean enableRemoteStorageSystem; @@ -375,6 +385,7 @@ public final class RemoteLogManagerConfig { private final long remoteLogManagerFetchMaxBytesPerSecond; private final int remoteLogManagerFetchNumQuotaSamples; private final int remoteLogManagerFetchQuotaWindowSizeSeconds; + private final int remoteFetchMaxWaitMs; public RemoteLogManagerConfig(AbstractConfig config) { this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), @@ -407,7 +418,8 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP)); + config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), + config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); } // Visible for testing @@ -437,8 +449,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, int remoteLogManagerCopyQuotaWindowSizeSeconds, long remoteLogManagerFetchMaxBytesPerSecond, int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds - ) { + int remoteLogManagerFetchQuotaWindowSizeSeconds, + int remoteFetchMaxWaitMs) { this.enableRemoteStorageSystem = enableRemoteStorageSystem; this.remoteStorageManagerClassName = remoteStorageManagerClassName; this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; @@ -466,6 +478,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; + this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; } public boolean enableRemoteStorageSystem() { @@ -576,6 +589,9 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return remoteLogManagerFetchQuotaWindowSizeSeconds; } + public int remoteFetchMaxWaitMs() { + return remoteFetchMaxWaitMs; + } @Override public boolean equals(Object o) { @@ -608,20 +624,22 @@ public boolean equals(Object o) { && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds; + && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds + && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; } @Override public int hashCode() { - return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath, - remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName, - remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, - remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs, - remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter, - remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, - remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, - remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds); + return Objects.hash( + enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath, + remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName, + remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, + remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs, + remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter, + remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, + remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, + remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, + remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index d04c42409b21a..058f81626d158 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -95,7 +95,8 @@ private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefau 1, Long.MAX_VALUE, 11, - 1); + 1, + 500); } private Map extractProps(RemoteLogManagerConfig remoteLogManagerConfig) { @@ -189,7 +190,8 @@ public void testHashCodeAndEquals_ForAllAndTwoFields() { 1, Long.MAX_VALUE, 11, - 1); + 1, + 500); assertNotEquals(config1.hashCode(), config3.hashCode()); assertNotEquals(config1, config3); From e8d2c6a0ff0b25b9eff358883f36f89d6325e38e Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Tue, 4 Jun 2024 12:59:34 +0530 Subject: [PATCH 2/5] Updated the FETCH_MAX_WAIT_MS_DOC --- .../java/org/apache/kafka/clients/consumer/ConsumerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7ec147e9e3ced..ddde535b1290c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,7 +198,7 @@ public class ConsumerConfig extends AbstractConfig { * fetch.max.wait.ms */ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; - private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; + private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request when it is reading near to the tail of the partition (high-watermark) and there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; /** metadata.max.age.ms */ From b66d67677a9b3dbe3915c9900a0996890945dcfb Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Tue, 4 Jun 2024 18:24:41 +0530 Subject: [PATCH 3/5] Addressed the review comments. --- .../org/apache/kafka/clients/consumer/ConsumerConfig.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index ddde535b1290c..76bfe7e91a149 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig { * fetch.max.wait.ms */ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; - private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request when it is reading near to the tail of the partition (high-watermark) and there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; + private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " + + "answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " + + "fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " + + "time, please refer to 'remote.fetch.max.wait.ms' broker config"; public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; /** metadata.max.age.ms */ From 9965ff8a5420d51f71f001d1cf2179ba46e63960 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Tue, 4 Jun 2024 20:27:29 +0530 Subject: [PATCH 4/5] Support added to update the remote.fetch.max.wait.ms config dynamically. --- .../kafka/server/DynamicBrokerConfig.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 ++ .../scala/kafka/server/ReplicaManager.scala | 3 +- .../server/DynamicBrokerConfigTest.scala | 30 +++++++++++++++++++ .../storage/RemoteLogManagerConfig.java | 17 +++-------- .../storage/RemoteLogManagerConfigTest.java | 6 ++-- 6 files changed, 41 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index c9bb2e3b4ffda..22576bdceb6fd 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1203,6 +1203,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( - RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP + RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, + RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 45ec15b1008f8..08e8a0d5272c2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1211,6 +1211,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) + def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 74023dea1aad2..8c4c243214379 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,10 +1479,9 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() + val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.asInstanceOf[Long] val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) - delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) None } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 328d6e41b5722..e685f133fe21c 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -792,6 +792,36 @@ class DynamicBrokerConfigTest { verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100) } + @Test + def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val config = KafkaConfig(props) + val kafkaBroker = mock(classOf[KafkaBroker]) + when(kafkaBroker.config).thenReturn(config) + + val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) + config.dynamicConfig.initialize(None, None) + config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) + + val newProps = new Properties() + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000") + // update default config + config.dynamicConfig.validate(newProps, perBrokerConfig = false) + config.dynamicConfig.updateDefaultConfig(newProps) + assertEquals(30000, config.remoteFetchMaxWaitMs) + + // update per broker config + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") + config.dynamicConfig.validate(newProps, perBrokerConfig = true) + config.dynamicConfig.updateBrokerConfig(0, newProps) + assertEquals(10000, config.remoteFetchMaxWaitMs) + + // invalid value + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "-1") + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) + } + @Test def testUpdateDynamicRemoteLogManagerConfig(): Unit = { val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index e41cc011c313f..bf91b884f838c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -385,7 +385,6 @@ public final class RemoteLogManagerConfig { private final long remoteLogManagerFetchMaxBytesPerSecond; private final int remoteLogManagerFetchNumQuotaSamples; private final int remoteLogManagerFetchQuotaWindowSizeSeconds; - private final int remoteFetchMaxWaitMs; public RemoteLogManagerConfig(AbstractConfig config) { this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), @@ -418,8 +417,7 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), - config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); + config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP)); } // Visible for testing @@ -449,8 +447,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, int remoteLogManagerCopyQuotaWindowSizeSeconds, long remoteLogManagerFetchMaxBytesPerSecond, int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds, - int remoteFetchMaxWaitMs) { + int remoteLogManagerFetchQuotaWindowSizeSeconds) { this.enableRemoteStorageSystem = enableRemoteStorageSystem; this.remoteStorageManagerClassName = remoteStorageManagerClassName; this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; @@ -478,7 +475,6 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; - this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; } public boolean enableRemoteStorageSystem() { @@ -589,10 +585,6 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return remoteLogManagerFetchQuotaWindowSizeSeconds; } - public int remoteFetchMaxWaitMs() { - return remoteFetchMaxWaitMs; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -624,8 +616,7 @@ public boolean equals(Object o) { && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds - && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; + && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds; } @Override @@ -639,7 +630,7 @@ public int hashCode() { remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); + remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 058f81626d158..d04c42409b21a 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -95,8 +95,7 @@ private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefau 1, Long.MAX_VALUE, 11, - 1, - 500); + 1); } private Map extractProps(RemoteLogManagerConfig remoteLogManagerConfig) { @@ -190,8 +189,7 @@ public void testHashCodeAndEquals_ForAllAndTwoFields() { 1, Long.MAX_VALUE, 11, - 1, - 500); + 1); assertNotEquals(config1.hashCode(), config3.hashCode()); assertNotEquals(config1, config3); From 6667b68e61b0125784c5ad65f0c124e9355bd34d Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 5 Jun 2024 07:41:04 +0530 Subject: [PATCH 5/5] Revert "Support added to update the remote.fetch.max.wait.ms config dynamically." This reverts commit 9965ff8a5420d51f71f001d1cf2179ba46e63960. --- .../kafka/server/DynamicBrokerConfig.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 2 -- .../scala/kafka/server/ReplicaManager.scala | 3 +- .../server/DynamicBrokerConfigTest.scala | 30 ------------------- .../storage/RemoteLogManagerConfig.java | 17 ++++++++--- .../storage/RemoteLogManagerConfigTest.java | 6 ++-- 6 files changed, 20 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 22576bdceb6fd..c9bb2e3b4ffda 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1203,7 +1203,6 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( - RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, - RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP + RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 08e8a0d5272c2..45ec15b1008f8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1211,8 +1211,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) - def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) - validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8c4c243214379..74023dea1aad2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,9 +1479,10 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.asInstanceOf[Long] + val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) + delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) None } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index e685f133fe21c..328d6e41b5722 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -792,36 +792,6 @@ class DynamicBrokerConfigTest { verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100) } - @Test - def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) - val config = KafkaConfig(props) - val kafkaBroker = mock(classOf[KafkaBroker]) - when(kafkaBroker.config).thenReturn(config) - - val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) - config.dynamicConfig.initialize(None, None) - config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) - - val newProps = new Properties() - newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "30000") - // update default config - config.dynamicConfig.validate(newProps, perBrokerConfig = false) - config.dynamicConfig.updateDefaultConfig(newProps) - assertEquals(30000, config.remoteFetchMaxWaitMs) - - // update per broker config - newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") - config.dynamicConfig.validate(newProps, perBrokerConfig = true) - config.dynamicConfig.updateBrokerConfig(0, newProps) - assertEquals(10000, config.remoteFetchMaxWaitMs) - - // invalid value - newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "-1") - assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) - assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = false)) - } - @Test def testUpdateDynamicRemoteLogManagerConfig(): Unit = { val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index bf91b884f838c..e41cc011c313f 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -385,6 +385,7 @@ public final class RemoteLogManagerConfig { private final long remoteLogManagerFetchMaxBytesPerSecond; private final int remoteLogManagerFetchNumQuotaSamples; private final int remoteLogManagerFetchQuotaWindowSizeSeconds; + private final int remoteFetchMaxWaitMs; public RemoteLogManagerConfig(AbstractConfig config) { this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), @@ -417,7 +418,8 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP)); + config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), + config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); } // Visible for testing @@ -447,7 +449,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, int remoteLogManagerCopyQuotaWindowSizeSeconds, long remoteLogManagerFetchMaxBytesPerSecond, int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds) { + int remoteLogManagerFetchQuotaWindowSizeSeconds, + int remoteFetchMaxWaitMs) { this.enableRemoteStorageSystem = enableRemoteStorageSystem; this.remoteStorageManagerClassName = remoteStorageManagerClassName; this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; @@ -475,6 +478,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; + this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; } public boolean enableRemoteStorageSystem() { @@ -585,6 +589,10 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return remoteLogManagerFetchQuotaWindowSizeSeconds; } + public int remoteFetchMaxWaitMs() { + return remoteFetchMaxWaitMs; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -616,7 +624,8 @@ public boolean equals(Object o) { && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds; + && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds + && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; } @Override @@ -630,7 +639,7 @@ public int hashCode() { remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds); + remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index d04c42409b21a..058f81626d158 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -95,7 +95,8 @@ private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefau 1, Long.MAX_VALUE, 11, - 1); + 1, + 500); } private Map extractProps(RemoteLogManagerConfig remoteLogManagerConfig) { @@ -189,7 +190,8 @@ public void testHashCodeAndEquals_ForAllAndTwoFields() { 1, Long.MAX_VALUE, 11, - 1); + 1, + 500); assertNotEquals(config1.hashCode(), config3.hashCode()); assertNotEquals(config1, config3);