Skip to content

Commit

Permalink
MINOR: Raft module Cleanup (apache#16205)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
sjhajharia authored and TaiJuWu committed Jun 8, 2024
1 parent ba8e7e4 commit fc91e93
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private QuorumStateData readStateFromFile(File file) {
if (dataVersion < LOWEST_SUPPORTED_VERSION || dataVersion > HIGHEST_SUPPORTED_VERSION) {
throw new IllegalStateException(
String.format(
"data_version (%d) is not within the min (%d) and max ($d) supported version",
"data_version (%d) is not within the min (%d) and max (%d) supported version",
dataVersion,
LOWEST_SUPPORTED_VERSION,
HIGHEST_SUPPORTED_VERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface LogHistory<T> {
*/
void clear();

final static class Entry<T> {
final class Entry<T> {
private final long offset;
private final T value;

Expand All @@ -101,9 +101,7 @@ public boolean equals(Object o) {
Entry<?> that = (Entry<?>) o;

if (offset != that.offset) return false;
if (!Objects.equals(value, that.value)) return false;

return true;
return Objects.equals(value, that.value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ public boolean equals(Object o) {
ReplicaKey that = (ReplicaKey) o;

if (id != that.id) return false;
if (!Objects.equals(directoryId, that.directoryId)) return false;

return true;
return Objects.equals(directoryId, that.directoryId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ public boolean hasOverlappingMajority(VoterSet that) {
.collect(Collectors.toSet());

if (Utils.diff(HashSet::new, thisReplicaKeys, thatReplicaKeys).size() > 1) return false;
if (Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() > 1) return false;

return true;
return Utils.diff(HashSet::new, thatReplicaKeys, thisReplicaKeys).size() <= 1;
}

@Override
Expand Down Expand Up @@ -314,9 +312,7 @@ public boolean equals(Object o) {

if (!Objects.equals(voterKey, that.voterKey)) return false;
if (!Objects.equals(supportedKRaftVersion, that.supportedKRaftVersion)) return false;
if (!Objects.equals(listeners, that.listeners)) return false;

return true;
return Objects.equals(listeners, that.listeners);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
Expand Down Expand Up @@ -122,7 +123,7 @@ public void testLeaderListenerNotified(boolean entireLog) throws Exception {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -164,7 +165,7 @@ public void testFollowerListenerNotified(boolean entireLog) throws Exception {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -210,7 +211,7 @@ public void testSecondListenerNotified(boolean entireLog) throws Exception {
// Check that the second listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = secondListener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -245,7 +246,7 @@ public void testListenerRenotified() throws Exception {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}

// Generate a new snapshot
Expand All @@ -264,7 +265,7 @@ public void testListenerRenotified() throws Exception {
// Check that listener was notified of the second snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
assertEquals(secondSnapshotId, snapshot.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(), snapshot);
}
}

Expand Down Expand Up @@ -660,7 +661,7 @@ public void testFetchSnapshotRequestAsLeader() throws Exception {
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.build();

context.becomeLeader();
Expand Down Expand Up @@ -712,7 +713,7 @@ public void testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajo
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.build();

int resignLeadershipTimeout = context.checkQuorumTimeoutMs;
Expand Down Expand Up @@ -909,7 +910,7 @@ public void testFetchSnapshotRequestWithInvalidPosition() throws Exception {
List<String> records = Arrays.asList("foo", "bar");

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a"))
.appendToLog(snapshotId.epoch(), Collections.singletonList("a"))
.build();

context.becomeLeader();
Expand Down Expand Up @@ -1136,12 +1137,12 @@ public void testFetchResponseWithSnapshotId() throws Exception {
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot);

// Check that listener was notified of the new snapshot
try (SnapshotReader<String> reader = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, reader.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader);
}
}

Expand Down Expand Up @@ -1239,12 +1240,12 @@ public void testFetchSnapshotResponsePartialData() throws Exception {
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), snapshot);

// Check that listener was notified of the new snapshot
try (SnapshotReader<String> reader = context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, reader.snapshotId());
SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), reader);
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records), reader);
}
}

Expand Down
22 changes: 11 additions & 11 deletions raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ public void testResignInOlderEpochIgnored() throws Exception {
context.client.poll();

// Ensure we are still leader even after expiration of the election timeout.
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);
context.client.poll();
context.assertElectedLeader(currentEpoch, localId);
}
Expand Down Expand Up @@ -607,7 +607,7 @@ public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
resignedEpoch, OptionalInt.of(localId));

// After the election timer, we should become a candidate.
context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntil(context.client.quorum()::isCandidate);
assertEquals(resignedEpoch + 1, context.currentEpoch());
assertEquals(new LeaderAndEpoch(OptionalInt.empty(), resignedEpoch + 1),
Expand Down Expand Up @@ -693,7 +693,7 @@ public void testInitializeAsCandidateAndBecomeLeader() throws Exception {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

context.assertUnknownLeader(0);
context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());

context.pollUntilRequest();
context.assertVotedCandidate(1, localId);
Expand Down Expand Up @@ -737,7 +737,7 @@ public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree() throws Excep
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();

context.assertUnknownLeader(0);
context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());

context.pollUntilRequest();
context.assertVotedCandidate(1, localId);
Expand Down Expand Up @@ -1118,7 +1118,7 @@ public void testVoteRequestTimeout() throws Exception {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
context.assertUnknownLeader(0);

context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);

Expand Down Expand Up @@ -1361,7 +1361,7 @@ public void testRetryElection() throws Exception {

context.assertUnknownLeader(0);

context.time.sleep(2 * context.electionTimeoutMs());
context.time.sleep(2L * context.electionTimeoutMs());
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);

Expand Down Expand Up @@ -2090,7 +2090,7 @@ public void testVoteResponseIgnoredAfterBecomingFollower() throws Exception {
context.assertUnknownLeader(epoch - 1);

// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);

// Wait until the vote requests are inflight
context.pollUntilRequest();
Expand Down Expand Up @@ -2696,7 +2696,7 @@ public void testFollowerLogReconciliation() throws Exception {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, otherNodeId)
.appendToLog(lastEpoch, Arrays.asList("foo", "bar"))
.appendToLog(lastEpoch, Arrays.asList("baz"))
.appendToLog(lastEpoch, singletonList("baz"))
.build();

context.assertElectedLeader(epoch, otherNodeId);
Expand Down Expand Up @@ -2827,7 +2827,7 @@ public void testClusterAuthorizationFailedInVote() throws Exception {
.build();

// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);

Expand Down Expand Up @@ -3186,7 +3186,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {

// Timeout the election and become candidate
int candidateEpoch = epoch + 2;
context.time.sleep(context.electionTimeoutMs() * 2);
context.time.sleep(context.electionTimeoutMs() * 2L);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, localId);

Expand Down Expand Up @@ -3232,7 +3232,7 @@ public void testHandleLeaderChangeFiresAfterUnattachedRegistration() throws Exce
LeaderAndEpoch expectedLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), epoch);
assertEquals(expectedLeaderAndEpoch, secondListener.currentLeaderAndEpoch());

// Transition to follower and the expect a leader changed notification
// Transition to follower and then expect a leader changed notification
context.deliverRequest(context.beginEpochRequest(epoch, otherNodeId));
context.pollUntilResponse();

Expand Down
4 changes: 2 additions & 2 deletions raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot()
appendBatch(numberOfRecords, 2);
appendBatch(numberOfRecords, 4);

// offset is not equal to oldest snapshot's offset
// offset is not equal to the oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3);
assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(20, 2)), resultOffsetAndEpoch);
}
Expand All @@ -845,7 +845,7 @@ public void testValidateEpochLessThanFirstEpochInLog() {

appendBatch(numberOfRecords, 3);

// offset is not equal to oldest snapshot's offset
// offset is not equal to the oldest snapshot's offset
ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 2);
assertEquals(ValidOffsetAndEpoch.diverging(olderEpochSnapshotId), resultOffsetAndEpoch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ static RaftClientTestContext initializeAsLeader(int localId, Set<Integer> voters

public void becomeLeader() throws Exception {
int currentEpoch = currentEpoch();
time.sleep(electionTimeoutMs * 2);
time.sleep(electionTimeoutMs * 2L);
expectAndGrantVotes(currentEpoch + 1);
expectBeginEpoch(currentEpoch + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception {
List<BatchAccumulator.CompletedBatch<String>> drained = acc.drain();
assertEquals(1, drained.size());
assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
drained.stream().forEach(completedBatch -> {
drained.forEach(completedBatch -> {
completedBatch.data.batches().forEach(recordBatch -> {
assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); });
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -69,7 +70,7 @@ void testBuildBatch(CompressionType compressionType) {

records.forEach(record -> builder.appendRecord(record, null));
MemoryRecords builtRecordSet = builder.build();
assertTrue(builder.bytesNeeded(Arrays.asList("a"), null).isPresent());
assertTrue(builder.bytesNeeded(Collections.singletonList("a"), null).isPresent());
assertThrows(IllegalStateException.class, () -> builder.appendRecord("a", null));

List<MutableRecordBatch> builtBatches = Utils.toList(builtRecordSet.batchIterator());
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testHasRoomForUncompressed(int batchSize) {

String record = "i am a record";

while (!builder.bytesNeeded(Arrays.asList(record), null).isPresent()) {
while (!builder.bytesNeeded(Collections.singletonList(record), null).isPresent()) {
builder.appendRecord(record, null);
}

Expand Down

0 comments on commit fc91e93

Please sign in to comment.