Skip to content

Commit

Permalink
KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure Delay…
Browse files Browse the repository at this point in the history
…edRemoteFetch timeout (#14778)

KIP-1018, part1, Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout

Reviewers: Luke Chen <[email protected]>
  • Loading branch information
kamalcph authored and satishd committed Jun 11, 2024
1 parent 69158f6 commit b6848d6
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig {
* <code>fetch.max.wait.ms</code>
*/
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " +
"answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " +
"fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " +
"time, please refer to 'remote.fetch.max.wait.ms' broker config";
public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;

/** <code>metadata.max.age.ms</code> */
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ import scala.collection._
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
remoteFetchInfo: RemoteStorageFetchInfo,
remoteFetchMaxWaitMs: Long,
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
fetchParams: FetchParams,
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
replicaManager: ReplicaManager,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(fetchParams.maxWaitMs) {
extends DelayedOperation(remoteFetchMaxWaitMs) {

if (fetchParams.isFromFollower) {
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig,
return Some(createLogReadResult(e))
}

val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)

delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
private val fetchOffset = 500L
private val logStartOffset = 0L
private val currentLeaderEpoch = Optional.of[Integer](10)
private val remoteFetchMaxWaitMs = 500

private val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
Expand All @@ -64,8 +65,8 @@ class DelayedRemoteFetchTest {
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
Expand Down Expand Up @@ -100,8 +101,8 @@ class DelayedRemoteFetchTest {
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
}

@Test
Expand All @@ -124,8 +125,8 @@ class DelayedRemoteFetchTest {

val logReadInfo = buildReadResult(Errors.NONE)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

// delayed remote fetch should still be able to complete
assertTrue(delayedRemoteFetch.tryComplete())
Expand Down Expand Up @@ -155,8 +156,8 @@ class DelayedRemoteFetchTest {
// build a read result with error
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)

val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
Expand Down Expand Up @@ -184,8 +185,8 @@ class DelayedRemoteFetchTest {
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)

val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)

when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
Expand Down
Loading

0 comments on commit b6848d6

Please sign in to comment.