diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7ec147e9e3ced..76bfe7e91a149 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig { * fetch.max.wait.ms */ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; - private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; + private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " + + "answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " + + "fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " + + "time, please refer to 'remote.fetch.max.wait.ms' broker config"; public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; /** metadata.max.age.ms */ diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala index 00d6afb89ffe2..58a866aa4a63f 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala @@ -35,12 +35,13 @@ import scala.collection._ class DelayedRemoteFetch(remoteFetchTask: Future[Void], remoteFetchResult: CompletableFuture[RemoteLogReadResult], remoteFetchInfo: RemoteStorageFetchInfo, + remoteFetchMaxWaitMs: Long, fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], fetchParams: FetchParams, localReadResults: Seq[(TopicIdPartition, LogReadResult)], replicaManager: ReplicaManager, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(fetchParams.maxWaitMs) { + extends DelayedOperation(remoteFetchMaxWaitMs) { if (fetchParams.isFromFollower) { throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6908ec096d179..17e9a5c1b9e79 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, + val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() + val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala index c35385bcbc8ca..ea1ffaf0b1179 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest { private val fetchOffset = 500L private val logStartOffset = 0L private val currentLeaderEpoch = Optional.of[Integer](10) + private val remoteFetchMaxWaitMs = 500 private val fetchStatus = FetchPartitionStatus( startOffsetMetadata = new LogOffsetMetadata(fetchOffset), @@ -64,8 +65,8 @@ class DelayedRemoteFetchTest { val leaderLogStartOffset = 10 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenReturn(mock(classOf[Partition])) @@ -100,8 +101,8 @@ class DelayedRemoteFetchTest { val leaderLogStartOffset = 10 val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500) - assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback)) + assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)) } @Test @@ -124,8 +125,8 @@ class DelayedRemoteFetchTest { val logReadInfo = buildReadResult(Errors.NONE) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) // delayed remote fetch should still be able to complete assertTrue(delayedRemoteFetch.tryComplete()) @@ -155,8 +156,8 @@ class DelayedRemoteFetchTest { // build a read result with error val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH) - val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) assertTrue(delayedRemoteFetch.tryComplete()) assertTrue(delayedRemoteFetch.isCompleted) @@ -184,8 +185,8 @@ class DelayedRemoteFetchTest { val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false) val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams, - Seq(topicIdPartition -> logReadInfo), replicaManager, callback) + val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs, + Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback) when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) .thenReturn(mock(classOf[Partition])) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 6ea752886a992..f50d11db263d3 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig { "implementation. For example this value can be `rlmm.config.`."; public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config."; - public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable"; public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tiered storage functionality in a broker or not. Valid values " + "are `true` or `false` and the default value is false. When it is true broker starts all the services required for the tiered storage functionality."; @@ -175,6 +174,10 @@ public final class RemoteLogManagerConfig { "The default value is 1 second."; public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1; + public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms"; + public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; + public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; + public static final ConfigDef CONFIG_DEF = new ConfigDef(); static { @@ -323,7 +326,13 @@ public final class RemoteLogManagerConfig { DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, atLeast(1), MEDIUM, - REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC); + REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC) + .define(REMOTE_FETCH_MAX_WAIT_MS_PROP, + INT, + DEFAULT_REMOTE_FETCH_MAX_WAIT_MS, + atLeast(1), + MEDIUM, + REMOTE_FETCH_MAX_WAIT_MS_DOC); } private final boolean enableRemoteStorageSystem; @@ -351,6 +360,7 @@ public final class RemoteLogManagerConfig { private final long remoteLogManagerFetchMaxBytesPerSecond; private final int remoteLogManagerFetchNumQuotaSamples; private final int remoteLogManagerFetchQuotaWindowSizeSeconds; + private final int remoteFetchMaxWaitMs; public RemoteLogManagerConfig(AbstractConfig config) { this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP), @@ -381,7 +391,8 @@ public RemoteLogManagerConfig(AbstractConfig config) { config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP), config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP), config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP), - config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP)); + config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP), + config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP)); } // Visible for testing @@ -409,8 +420,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, int remoteLogManagerCopyQuotaWindowSizeSeconds, long remoteLogManagerFetchMaxBytesPerSecond, int remoteLogManagerFetchNumQuotaSamples, - int remoteLogManagerFetchQuotaWindowSizeSeconds - ) { + int remoteLogManagerFetchQuotaWindowSizeSeconds, + int remoteFetchMaxWaitMs) { this.enableRemoteStorageSystem = enableRemoteStorageSystem; this.remoteStorageManagerClassName = remoteStorageManagerClassName; this.remoteStorageManagerClassPath = remoteStorageManagerClassPath; @@ -436,6 +447,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem, this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond; this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples; this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds; + this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs; } public boolean enableRemoteStorageSystem() { @@ -538,6 +550,9 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() { return remoteLogManagerFetchQuotaWindowSizeSeconds; } + public int remoteFetchMaxWaitMs() { + return remoteFetchMaxWaitMs; + } @Override public boolean equals(Object o) { @@ -568,7 +583,8 @@ public boolean equals(Object o) { && remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds && remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond && remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples - && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds; + && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds + && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs; } @Override @@ -580,7 +596,7 @@ public int hashCode() { remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond, - remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds); + remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 45fd6669e7d4f..b119571d8e151 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -49,7 +49,7 @@ public void testValidConfigs(boolean useDefaultRemoteLogMetadataManagerClass) { remoteLogMetadataManagerClass, "dummy.remote.log.metadata.class.path", "listener.name", 1024 * 1024L, 1, 60000L, 100L, 60000L, 0.3, 10, 100, 100, rsmPrefix, rsmProps, rlmmPrefix, rlmmProps, Long.MAX_VALUE, 11, 1, - Long.MAX_VALUE, 11, 1); + Long.MAX_VALUE, 11, 1, 500); Map props = extractProps(expectedRemoteLogManagerConfig); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));