diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 6a14b37cfffe1..aa07ce89203dc 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -314,7 +314,7 @@ private void updateLeaderEndOffsetAndTimestamp( ) { final LogOffsetMetadata endOffsetMetadata = log.endOffset(); - if (state.updateLocalState(endOffsetMetadata)) { + if (state.updateLocalState(endOffsetMetadata, partitionState.lastVoterSet().voterIds())) { onUpdateLeaderHighWatermark(state, currentTimeMs); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index df4cc1315ac52..13a6d106345d8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -243,8 +244,9 @@ private boolean maybeUpdateHighWatermark() { ); return true; } else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) { - log.error("The latest computed high watermark {} is smaller than the current " + - "value {}, which suggests that one of the voters has lost committed data. " + + log.info("The latest computed high watermark {} is smaller than the current " + + "value {}, which should only happen when voter set membership changes. If the voter " + + "set has not changed this suggests that one of the voters has lost committed data. " + "Full voter replication state: {}", highWatermarkUpdateOffset, currentHighWatermarkMetadata.offset, voterStates.values()); return false; @@ -296,10 +298,12 @@ private void logHighWatermarkUpdate( * Update the local replica state. * * @param endOffsetMetadata updated log end offset of local replica + * @param lastVoterSet the up-to-date voter set * @return true if the high watermark is updated as a result of this call */ public boolean updateLocalState( - LogOffsetMetadata endOffsetMetadata + LogOffsetMetadata endOffsetMetadata, + Set lastVoterSet ) { ReplicaState state = getOrCreateReplicaState(localId); state.endOffset.ifPresent(currentEndOffset -> { @@ -308,7 +312,8 @@ public boolean updateLocalState( "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); } }); - state.updateLeaderState(endOffsetMetadata); + state.updateLeaderEndOffset(endOffsetMetadata); + updateVoterAndObserverStates(lastVoterSet); return maybeUpdateHighWatermark(); } @@ -341,9 +346,7 @@ public boolean updateReplicaState( state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset); } }); - - Optional leaderEndOffsetOpt = - voterStates.get(localId).endOffset; + Optional leaderEndOffsetOpt = getOrCreateReplicaState(localId).endOffset; state.updateFollowerState( currentTimeMs, @@ -435,9 +438,13 @@ private DescribeQuorumResponseData.ReplicaState describeReplicaState( } + /** + * Clear observer states that have not been active for a while and are not the leader. + */ private void clearInactiveObservers(final long currentTimeMs) { observerStates.entrySet().removeIf(integerReplicaStateEntry -> - currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS + currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS && + integerReplicaStateEntry.getKey() != localId ); } @@ -445,6 +452,26 @@ private boolean isVoter(int remoteNodeId) { return voterStates.containsKey(remoteNodeId); } + private void updateVoterAndObserverStates(Set lastVoterSet) { + // Move any replica that is not in the last voter set from voterStates to observerStates + for (Iterator> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry replica = iter.next(); + if (!lastVoterSet.contains(replica.getKey())) { + observerStates.put(replica.getKey(), replica.getValue()); + iter.remove(); + } + } + + // Add replicas that are in the last voter set and not in voterStates to voterStates (from observerStates + // if they exist) + for (int voterId : lastVoterSet) { + if (!voterStates.containsKey(voterId)) { + Optional existingObserverState = Optional.ofNullable(observerStates.remove(voterId)); + voterStates.put(voterId, existingObserverState.orElse(new ReplicaState(voterId, false))); + } + } + } + private static class ReplicaState implements Comparable { final int nodeId; Optional endOffset; @@ -462,7 +489,7 @@ public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) { this.hasAcknowledgedLeader = hasAcknowledgedLeader; } - void updateLeaderState( + void updateLeaderEndOffset( LogOffsetMetadata endOffsetMetadata ) { // For the leader, we only update the end offset. The remaining fields diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index e8fd1bb9ff365..6d31154ccf233 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -105,25 +105,27 @@ public void testNonFollowerAcknowledgement() { @Test public void testUpdateHighWatermarkQuorumSizeOne() { - LeaderState state = newLeaderState(singleton(localId), 15L); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voterSet)); assertEquals(emptySet(), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(20))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(20), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } @Test public void testNonMonotonicLocalEndOffsetUpdate() { - LeaderState state = newLeaderState(singleton(localId), 15L); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); assertThrows(IllegalStateException.class, - () -> state.updateLocalState(new LogOffsetMetadata(15L))); + () -> state.updateLocalState(new LogOffsetMetadata(15L), voterSet)); } @Test @@ -133,14 +135,15 @@ public void testLastCaughtUpTimeVoters() { int currentTime = 1000; int fetchTime = 0; int caughtUpTime = -1; - LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); + Set voterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(voterSet, 10L); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(10L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), voterSet)); assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); // Node 1 falls behind - assertFalse(state.updateLocalState(new LogOffsetMetadata(11L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(11L), voterSet)); assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); @@ -152,14 +155,14 @@ public void testLastCaughtUpTimeVoters() { assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); // Node 1 falls behind - assertFalse(state.updateLocalState(new LogOffsetMetadata(100L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(100L), voterSet)); assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); assertEquals(caughtUpTime, describeVoterState(state, node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to the last fetch offset int prevFetchTime = fetchTime; - assertFalse(state.updateLocalState(new LogOffsetMetadata(200L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(200L), voterSet)); assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); caughtUpTime = prevFetchTime; assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); @@ -167,7 +170,7 @@ public void testLastCaughtUpTimeVoters() { // Node2 has never caught up to leader assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(300L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(300L), voterSet)); assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); assertEquals(-1L, describeVoterState(state, node2, currentTime).lastCaughtUpTimestamp()); assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); @@ -180,12 +183,13 @@ public void testLastCaughtUpTimeObserver() { int currentTime = 1000; int fetchTime = 0; int caughtUpTime = -1; - LeaderState state = newLeaderState(singleton(localId), 5L); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 5L); assertEquals(Optional.empty(), state.highWatermark()); assertEquals(emptySet(), state.nonAcknowledgingVoters()); // Node 1 falls behind - assertTrue(state.updateLocalState(new LogOffsetMetadata(11L))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), voterSet)); assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); @@ -197,14 +201,14 @@ public void testLastCaughtUpTimeObserver() { assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); // Node 1 falls behind - assertTrue(state.updateLocalState(new LogOffsetMetadata(100L))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(100L), voterSet)); assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); assertEquals(caughtUpTime, describeObserverState(state, node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to the last fetch offset int prevFetchTime = fetchTime; - assertTrue(state.updateLocalState(new LogOffsetMetadata(200L))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(200L), voterSet)); assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); caughtUpTime = prevFetchTime; assertEquals(currentTime, describeVoterState(state, localId, currentTime).lastCaughtUpTimestamp()); @@ -219,32 +223,35 @@ public void testLastCaughtUpTimeObserver() { @Test public void testIdempotentEndOffsetUpdate() { - LeaderState state = newLeaderState(singleton(localId), 15L); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateLocalState(new LogOffsetMetadata(16L))); - assertFalse(state.updateLocalState(new LogOffsetMetadata(16L))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); } @Test public void testUpdateHighWatermarkMetadata() { - LeaderState state = newLeaderState(singleton(localId), 15L); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, 15L); assertEquals(Optional.empty(), state.highWatermark()); LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("bar"))); - assertTrue(state.updateLocalState(initialHw)); + assertTrue(state.updateLocalState(initialHw, voterSet)); assertEquals(Optional.of(initialHw), state.highWatermark()); LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, Optional.of(new MockOffsetMetadata("baz"))); - assertTrue(state.updateLocalState(updateHw)); + assertTrue(state.updateLocalState(updateHw, voterSet)); assertEquals(Optional.of(updateHw), state.highWatermark()); } @Test public void testUpdateHighWatermarkQuorumSizeTwo() { int otherNodeId = 1; - LeaderState state = newLeaderState(mkSet(localId, otherNodeId), 10L); - assertFalse(state.updateLocalState(new LogOffsetMetadata(13L))); + Set voterSet = mkSet(localId, otherNodeId); + LeaderState state = newLeaderState(voterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), voterSet)); assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); @@ -260,8 +267,9 @@ public void testUpdateHighWatermarkQuorumSizeTwo() { public void testUpdateHighWatermarkQuorumSizeThree() { int node1 = 1; int node2 = 2; - LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); - assertFalse(state.updateLocalState(new LogOffsetMetadata(15L))); + Set voterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(voterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), voterSet)); assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); @@ -272,7 +280,7 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.empty(), state.highWatermark()); assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertFalse(state.updateLocalState(new LogOffsetMetadata(20L))); + assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); @@ -280,12 +288,135 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } + @Test + public void testHighWatermarkDoesIncreaseFromNewVoter() { + int node1 = 1; + int node2 = 2; + Set originalVoterSet = mkSet(localId, node1); + LeaderState state = newLeaderState(originalVoterSet, 5L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); + + // updating replica state of node2 before it joins voterSet should not increase HW to 15L + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); + + // adding node2 to voterSet will cause HW to increase to 15L + Set voterSetWithNode2 = mkSet(localId, node1, node2); + assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), voterSetWithNode2)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will not update to 16L until a majority reaches it + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode2)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + + @Test + public void testHighWatermarkDoesNotDecreaseFromNewVoter() { + int node1 = 1; + int node2 = 2; + int node3 = 3; + // start with three voters with HW at 15L + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 5L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + + // updating replica state of node3 before it joins voterSet + assertFalse(state.updateReplicaState(node3, 0, new LogOffsetMetadata(10L))); + + // adding node3 to voterSet should not cause HW to decrease even if majority is < HW + Set voterSetWithNode3 = mkSet(localId, node1, node2, node3); + assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), voterSetWithNode3)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will not decrease if calculated HW is anything lower than the last HW + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(13L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node3, 0, new LogOffsetMetadata(13L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will update to 16L once a majority of the voterSet is at least 16L + assertTrue(state.updateReplicaState(node3, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + + @Test + public void testUpdateHighWatermarkRemovingFollowerFromVoterStates() { + int node1 = 1; + int node2 = 2; + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // removing node1 should not decrement HW to 10L + Set voterSetWithoutNode1 = mkSet(localId, node2); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutNode1)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW cannot change until after node2 catches up to last HW + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), voterSetWithoutNode1)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(18L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW should update to 16L + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + + @Test + public void testUpdateHighWatermarkQuorumRemovingLeaderFromVoterStates() { + int node1 = 1; + int node2 = 2; + Set originalVoterSet = mkSet(localId, node1, node2); + LeaderState state = newLeaderState(originalVoterSet, 10L); + assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), originalVoterSet)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(15L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // removing leader should not decrement HW to 10L + Set voterSetWithoutLeader = mkSet(node1, node2); + assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), voterSetWithoutLeader)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW cannot change until node2 catches up to last HW + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), voterSetWithoutLeader)); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(14L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); + + // HW will not update to 16L until majority of remaining voterSet (node1, node2) are at least 16L + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(16L))); + assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark()); + } + @Test public void testNonMonotonicHighWatermarkUpdate() { MockTime time = new MockTime(); int node1 = 1; - LeaderState state = newLeaderState(mkSet(localId, node1), 0L); - state.updateLocalState(new LogOffsetMetadata(10L)); + Set voterSet = mkSet(localId, node1); + LeaderState state = newLeaderState(voterSet, 0L); + state.updateLocalState(new LogOffsetMetadata(10L), voterSet); state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L)); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); @@ -315,7 +446,8 @@ public void testDescribeQuorumWithSingleVoter() { long leaderStartOffset = 10L; long leaderEndOffset = 15L; - LeaderState state = newLeaderState(mkSet(localId), leaderStartOffset); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, leaderStartOffset); // Until we have updated local state, high watermark should be uninitialized assertEquals(Optional.empty(), state.highWatermark()); @@ -334,7 +466,7 @@ public void testDescribeQuorumWithSingleVoter() { // Now update the high watermark and verify the describe output - assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); + assertTrue(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); time.sleep(500); @@ -361,8 +493,9 @@ public void testDescribeQuorumWithMultipleVoters() { long leaderStartOffset = 10L; long leaderEndOffset = 15L; - LeaderState state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset); - assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset))); + Set voterSet = mkSet(localId, activeFollowerId, inactiveFollowerId); + LeaderState state = newLeaderState(voterSet, leaderStartOffset); + assertFalse(state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet)); assertEquals(Optional.empty(), state.highWatermark()); long activeFollowerFetchTimeMs = time.milliseconds(); @@ -412,8 +545,9 @@ private LeaderState setUpLeaderAndFollowers(int follower1, int follower2, long leaderStartOffset, long leaderEndOffset) { - LeaderState state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset); - state.updateLocalState(new LogOffsetMetadata(leaderEndOffset)); + Set voterSet = mkSet(localId, follower1, follower2); + LeaderState state = newLeaderState(voterSet, leaderStartOffset); + state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), voterSet); assertEquals(Optional.empty(), state.highWatermark()); state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset)); state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset)); @@ -426,8 +560,9 @@ public void testDescribeQuorumWithObservers() { int observerId = 10; long epochStartOffset = 10L; - LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); - assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1))); + Set voterSet = singleton(localId); + LeaderState state = newLeaderState(voterSet, epochStartOffset); + assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet)); assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); time.sleep(500); @@ -455,6 +590,110 @@ public void testDescribeQuorumWithObservers() { observerState); } + @Test + public void testDescribeQuorumWithVotersAndObservers() { + MockTime time = new MockTime(); + int leader = localId; + int node1 = 1; + int node2 = 2; + long epochStartOffset = 10L; + + Set voterSet = mkSet(leader, node1, node2); + LeaderState state = newLeaderState(voterSet, epochStartOffset); + assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet)); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(epochStartOffset + 1))); + assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); + + // node1 becomes an observer + long fetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(node1, fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); + Set voterSetWithoutNode1 = mkSet(leader, node2); + state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5), voterSetWithoutNode1); + + + time.sleep(500); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + DescribeQuorumResponseData.ReplicaState observer = partitionData.observers().get(0); + assertEquals(node1, observer.replicaId()); + assertEquals(epochStartOffset + 1, observer.logEndOffset()); + assertEquals(2, partitionData.currentVoters().size()); + + // node1 catches up with leader, HW should not change + time.sleep(500); + fetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(node1, fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 5))); + assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); + + // node1 becomes a voter again, HW should change + assertTrue(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 10), voterSet)); + + time.sleep(500); + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 5, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + assertEquals(3, partitionData.currentVoters().size()); + DescribeQuorumResponseData.ReplicaState node1State = partitionData.currentVoters().stream() + .filter(replicaState -> replicaState.replicaId() == node1) + .findFirst().get(); + assertEquals(epochStartOffset + 5, node1State.logEndOffset()); + assertEquals(fetchTimeMs, node1State.lastFetchTimestamp()); + } + + @Test + public void testClearInactiveObserversIgnoresLeader() { + MockTime time = new MockTime(); + int followerId = 1; + int observerId = 10; + long epochStartOffset = 10L; + + Set voterSet = mkSet(localId, followerId); + LeaderState state = newLeaderState(voterSet, epochStartOffset); + assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 1), voterSet)); + assertTrue(state.updateReplicaState(followerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1))); + + // observer is returned since its lastFetchTimestamp is within OBSERVER_SESSION_TIMEOUT_MS + time.sleep(500); + long observerFetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); + + time.sleep(500); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(2, partitionData.currentVoters().size()); + assertEquals(1, partitionData.observers().size()); + assertEquals(observerId, partitionData.observers().get(0).replicaId()); + + // observer is not returned once its lastFetchTimestamp surpasses OBSERVER_SESSION_TIMEOUT_MS + time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS); + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(2, partitionData.currentVoters().size()); + assertEquals(0, partitionData.observers().size()); + + // leader becomes observer + Set voterSetWithoutLeader = singleton(followerId); + assertFalse(state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 10), voterSetWithoutLeader)); + + // leader should be returned in describe quorum output + time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS); + long describeQuorumCalledTime = time.milliseconds(); + partitionData = state.describeQuorum(describeQuorumCalledTime); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(1, partitionData.observers().size()); + DescribeQuorumResponseData.ReplicaState observer = partitionData.observers().get(0); + assertEquals(localId, observer.replicaId()); + assertEquals(describeQuorumCalledTime, observer.lastFetchTimestamp()); + } + @Test public void testCheckQuorum() { int node1 = 1; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 240a55d44030a..ce59d587b9dd5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -147,7 +147,7 @@ public void shouldRecordVoterQuorumState(short kraftVersion) { assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue()); assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); - state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L)); + state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), voters.voterIds()); state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue());