diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7ec147e9e3ced..76bfe7e91a149 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig {
* fetch.max.wait.ms
*/
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
- private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
+ private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " +
+ "answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " +
+ "fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " +
+ "time, please refer to 'remote.fetch.max.wait.ms' broker config";
public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;
/** metadata.max.age.ms
*/
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 00d6afb89ffe2..58a866aa4a63f 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -35,12 +35,13 @@ import scala.collection._
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
remoteFetchInfo: RemoteStorageFetchInfo,
+ remoteFetchMaxWaitMs: Long,
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
fetchParams: FetchParams,
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
- extends DelayedOperation(fetchParams.maxWaitMs) {
+ extends DelayedOperation(remoteFetchMaxWaitMs) {
if (fetchParams.isFromFollower) {
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a2a070bcd0331..74023dea1aad2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig,
return Some(createLogReadResult(e))
}
- val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
+ val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
+ val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index c35385bcbc8ca..ea1ffaf0b1179 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
private val fetchOffset = 500L
private val logStartOffset = 0L
private val currentLeaderEpoch = Optional.of[Integer](10)
+ private val remoteFetchMaxWaitMs = 500
private val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
@@ -64,8 +65,8 @@ class DelayedRemoteFetchTest {
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
- Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
@@ -100,8 +101,8 @@ class DelayedRemoteFetchTest {
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
- assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
- Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
+ assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
}
@Test
@@ -124,8 +125,8 @@ class DelayedRemoteFetchTest {
val logReadInfo = buildReadResult(Errors.NONE)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
- Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
// delayed remote fetch should still be able to complete
assertTrue(delayedRemoteFetch.tryComplete())
@@ -155,8 +156,8 @@ class DelayedRemoteFetchTest {
// build a read result with error
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
- Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
@@ -184,8 +185,8 @@ class DelayedRemoteFetchTest {
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
- val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
- Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1add933d788b6..e41cc011c313f 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig {
"implementation. For example this value can be `rlmm.config.`.";
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
-
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tiered storage functionality in a broker or not. Valid values " +
"are `true` or `false` and the default value is false. When it is true broker starts all the services required for the tiered storage functionality.";
@@ -185,167 +184,178 @@ public final class RemoteLogManagerConfig {
"The default value is 1 second.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
+ public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
+ public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
+ public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
+
public static final ConfigDef CONFIG_DEF = new ConfigDef();
static {
- CONFIG_DEF.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
- BOOLEAN,
- DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
- null,
- MEDIUM,
- REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
- .define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
- STRING,
- DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
- new ConfigDef.NonEmptyString(),
- MEDIUM,
- REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
- .define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
- STRING,
- DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
- new ConfigDef.NonEmptyString(),
- MEDIUM,
- REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
- .define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING,
- null,
- new ConfigDef.NonEmptyString(),
- MEDIUM,
- REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
- .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
- null,
- null,
- MEDIUM,
- REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
- .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
- STRING,
- DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
- new ConfigDef.NonEmptyString(),
- MEDIUM,
- REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)
- .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
- STRING,
- null,
- null,
- MEDIUM,
- REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
- .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
- null,
- new ConfigDef.NonEmptyString(),
- MEDIUM,
- REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
- .define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
- INT,
- DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
- atLeast(0),
- LOW,
- REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
- .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
- LONG,
- DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
- atLeast(1),
- LOW,
- REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC)
- .define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
- INT,
- DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
- .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+ CONFIG_DEF
+ .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
+ BOOLEAN,
+ DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
+ null,
+ MEDIUM,
+ REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
+ .define(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
+ STRING,
+ DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
+ new ConfigDef.NonEmptyString(),
+ MEDIUM,
+ REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
+ .define(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
+ STRING,
+ DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
+ new ConfigDef.NonEmptyString(),
+ MEDIUM,
+ REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)
+ .define(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, STRING,
+ null,
+ new ConfigDef.NonEmptyString(),
+ MEDIUM,
+ REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC)
+ .define(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP, STRING,
+ null,
+ null,
+ MEDIUM,
+ REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC)
+ .define(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+ STRING,
+ DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME,
+ new ConfigDef.NonEmptyString(),
+ MEDIUM,
+ REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC)
+ .define(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP,
+ STRING,
+ null,
+ null,
+ MEDIUM,
+ REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_DOC)
+ .define(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, STRING,
+ null,
+ new ConfigDef.NonEmptyString(),
+ MEDIUM,
+ REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+ .define(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+ atLeast(0),
+ LOW,
+ REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
+ .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
+ atLeast(1),
+ LOW,
+ REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC)
+ .define(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
+ .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
- .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+ .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
- .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
- LONG,
- DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
- atLeast(1),
- LOW,
- REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC)
- .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
- LONG,
- DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS,
- atLeast(1),
- LOW,
- REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC)
- .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
- LONG,
- DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS,
- atLeast(1), LOW,
- REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC)
- .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
- DOUBLE,
- DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER,
- between(0, 0.5),
- LOW,
- REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC)
- .define(REMOTE_LOG_READER_THREADS_PROP,
- INT,
- DEFAULT_REMOTE_LOG_READER_THREADS,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_READER_THREADS_DOC)
- .define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
- INT,
- DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC)
- .define(LOG_LOCAL_RETENTION_MS_PROP,
- LONG,
- DEFAULT_LOG_LOCAL_RETENTION_MS,
- atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS),
- MEDIUM,
- LOG_LOCAL_RETENTION_MS_DOC)
- .define(LOG_LOCAL_RETENTION_BYTES_PROP,
- LONG,
- DEFAULT_LOG_LOCAL_RETENTION_BYTES,
- atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES),
- MEDIUM,
- LOG_LOCAL_RETENTION_BYTES_DOC)
- .define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
- LONG,
- DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC)
- .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
- INT,
- DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC)
- .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
- INT,
- DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC)
- .define(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
- LONG,
- DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC)
- .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
- INT,
- DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC)
- .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
- INT,
- DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS,
- atLeast(1),
- MEDIUM,
- REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC);
+ .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
+ atLeast(1),
+ LOW,
+ REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC)
+ .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS,
+ atLeast(1),
+ LOW,
+ REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_DOC)
+ .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS,
+ atLeast(1), LOW,
+ REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_DOC)
+ .defineInternal(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP,
+ DOUBLE,
+ DEFAULT_REMOTE_LOG_MANAGER_TASK_RETRY_JITTER,
+ between(0, 0.5),
+ LOW,
+ REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_DOC)
+ .define(REMOTE_LOG_READER_THREADS_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_READER_THREADS,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_READER_THREADS_DOC)
+ .define(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_READER_MAX_PENDING_TASKS,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_READER_MAX_PENDING_TASKS_DOC)
+ .define(LOG_LOCAL_RETENTION_MS_PROP,
+ LONG,
+ DEFAULT_LOG_LOCAL_RETENTION_MS,
+ atLeast(DEFAULT_LOG_LOCAL_RETENTION_MS),
+ MEDIUM,
+ LOG_LOCAL_RETENTION_MS_DOC)
+ .define(LOG_LOCAL_RETENTION_BYTES_PROP,
+ LONG,
+ DEFAULT_LOG_LOCAL_RETENTION_BYTES,
+ atLeast(DEFAULT_LOG_LOCAL_RETENTION_BYTES),
+ MEDIUM,
+ LOG_LOCAL_RETENTION_BYTES_DOC)
+ .define(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC)
+ .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC)
+ .define(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC)
+ .define(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+ LONG,
+ DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC)
+ .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC)
+ .define(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC)
+ .define(REMOTE_FETCH_MAX_WAIT_MS_PROP,
+ INT,
+ DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
+ atLeast(1),
+ MEDIUM,
+ REMOTE_FETCH_MAX_WAIT_MS_DOC);
}
private final boolean enableRemoteStorageSystem;
@@ -375,6 +385,7 @@ public final class RemoteLogManagerConfig {
private final long remoteLogManagerFetchMaxBytesPerSecond;
private final int remoteLogManagerFetchNumQuotaSamples;
private final int remoteLogManagerFetchQuotaWindowSizeSeconds;
+ private final int remoteFetchMaxWaitMs;
public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -407,7 +418,8 @@ public RemoteLogManagerConfig(AbstractConfig config) {
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP),
config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP),
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP),
- config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP));
+ config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP),
+ config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP));
}
// Visible for testing
@@ -437,8 +449,8 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
int remoteLogManagerCopyQuotaWindowSizeSeconds,
long remoteLogManagerFetchMaxBytesPerSecond,
int remoteLogManagerFetchNumQuotaSamples,
- int remoteLogManagerFetchQuotaWindowSizeSeconds
- ) {
+ int remoteLogManagerFetchQuotaWindowSizeSeconds,
+ int remoteFetchMaxWaitMs) {
this.enableRemoteStorageSystem = enableRemoteStorageSystem;
this.remoteStorageManagerClassName = remoteStorageManagerClassName;
this.remoteStorageManagerClassPath = remoteStorageManagerClassPath;
@@ -466,6 +478,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond;
this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples;
this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds;
+ this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
}
public boolean enableRemoteStorageSystem() {
@@ -576,6 +589,9 @@ public int remoteLogManagerFetchQuotaWindowSizeSeconds() {
return remoteLogManagerFetchQuotaWindowSizeSeconds;
}
+ public int remoteFetchMaxWaitMs() {
+ return remoteFetchMaxWaitMs;
+ }
@Override
public boolean equals(Object o) {
@@ -608,20 +624,22 @@ public boolean equals(Object o) {
&& remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds
&& remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond
&& remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples
- && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds;
+ && remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds
+ && remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs;
}
@Override
public int hashCode() {
- return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
- remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
- remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
- remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
- remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
- remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
- remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
- remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond,
- remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds);
+ return Objects.hash(
+ enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
+ remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
+ remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
+ remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
+ remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
+ remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
+ remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
+ remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond,
+ remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs);
}
public static void main(String[] args) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index d04c42409b21a..058f81626d158 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -95,7 +95,8 @@ private static RemoteLogManagerConfig getRemoteLogManagerConfig(boolean useDefau
1,
Long.MAX_VALUE,
11,
- 1);
+ 1,
+ 500);
}
private Map extractProps(RemoteLogManagerConfig remoteLogManagerConfig) {
@@ -189,7 +190,8 @@ public void testHashCodeAndEquals_ForAllAndTwoFields() {
1,
Long.MAX_VALUE,
11,
- 1);
+ 1,
+ 500);
assertNotEquals(config1.hashCode(), config3.hashCode());
assertNotEquals(config1, config3);