Skip to content

Commit

Permalink
KAFKA-16526; Quorum state data version 1 (apache#15859)
Browse files Browse the repository at this point in the history
Allow KRaft replicas to read and write version 0 and 1 of the quorum-state file. Which version is written is controlled by the kraft.version. With kraft.version 0, version 0 of the quorum-state file is written. With kraft.version 1, version 1 of the quorum-state file is written. Version 1 of the quorum-state file adds the VotedDirectoryId field and removes the CurrentVoters. The other fields removed in version 1 are not important as they were not overwritten or used by KRaft.

In kraft.version 1 the set of voters will be stored in the kraft partition log segments and snapshots.

To implement this feature the following changes were made to KRaft.

FileBasedStateStore was renamed to FileQuorumStateStore to better match the name of the implemented interface QuorumStateStore.

The QuorumStateStore::writeElectionState was extended to include the kraft.version. This version is used to determine which version of QuorumStateData to store. When writing version 0 the VotedDirectoryId is not persisted but the latest value is kept in memory. This allows the replica to vote consistently while it stays online. If a replica restarts in the middle of an election it will forget the VotedDirectoryId if the kraft.version is 0. This should be rare in practice and should only happen if there is an election and failure while the system is upgrading to kraft.version 1.

The type ElectionState, the interface EpochState and all of the implementations of EpochState (VotedState, UnattachedState, FollowerState, ResignedState, CandidateState and LeaderState) are extended to support the new voted directory id.

The type QuorumState is changed so that the local directory id is used. The type is also changed so that the latest value for the set of voters and the kraft version is queried from the KRaftControlRecordStateMachine.

The replica directory id is read from the meta.properties and passed to the KafkaRaftClient. The replica directory id is guaranteed to be set in the local replica.

Adds a new metric for current-vote-directory-id which exposes the latest in-memory value of the voted directory id.

Renames VoterSet.VoterKey to ReplicaKey.

It is important to note that after this change, version 1 of the quorum-state file will not be written by kraft controllers and brokers. This change adds support for reading and writing version 1 of the file in preparation for future changes.

Reviewers: Jun Rao <[email protected]>
  • Loading branch information
jsancio authored May 16, 2024
1 parent 2c51594 commit 056d232
Show file tree
Hide file tree
Showing 45 changed files with 2,090 additions and 1,075 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, QuorumConfig, ReplicatedLog}
import org.apache.kafka.raft.{FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, QuorumConfig, ReplicatedLog}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.KafkaScheduler
Expand Down Expand Up @@ -138,6 +138,7 @@ trait RaftManager[T] {
class KafkaRaftManager[T](
clusterId: String,
config: KafkaConfig,
metadataLogDirUuid: Uuid,
recordSerde: RecordSerde[T],
topicPartition: TopicPartition,
topicId: Uuid,
Expand Down Expand Up @@ -184,7 +185,7 @@ class KafkaRaftManager[T](
client.initialize(
controllerQuorumVotersFuture.get(),
config.controllerListenerNames.head,
new FileBasedStateStore(new File(dataDir, FileBasedStateStore.DEFAULT_FILE_NAME)),
new FileQuorumStateStore(new File(dataDir, FileQuorumStateStore.DEFAULT_FILE_NAME)),
metrics
)
netChannel.start()
Expand Down Expand Up @@ -218,6 +219,7 @@ class KafkaRaftManager[T](
private def buildRaftClient(): KafkaRaftClient[T] = {
val client = new KafkaRaftClient(
OptionalInt.of(config.nodeId),
metadataLogDirUuid,
recordSerde,
netChannel,
replicatedLog,
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ class KafkaServer(
val initialMetaPropsEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
config.logDirs.foreach(loader.addLogDir)
if (config.migrationEnabled) {
loader.addMetadataLogDir(config.metadataLogDir)
}
loader.load()
}

Expand Down Expand Up @@ -432,6 +435,8 @@ class KafkaServer(
raftManager = new KafkaRaftManager[ApiMessageAndVersion](
metaPropsEnsemble.clusterId().get(),
config,
// metadata log dir and directory.id must exist because migration is enabled
metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class SharedServer(
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
clusterId,
sharedServerConfig,
metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
new MetadataRecordSerde,
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import scala.jdk.CollectionConverters._
*/
class TestRaftServer(
val config: KafkaConfig,
val nodeDirectoryId: Uuid,
val throughput: Int,
val recordSize: Int
) extends Logging {
Expand Down Expand Up @@ -86,6 +87,7 @@ class TestRaftServer(
raftManager = new KafkaRaftManager[Array[Byte]](
Uuid.ZERO_UUID.toString,
config,
nodeDirectoryId,
new ByteArraySerde,
partition,
topicId,
Expand Down Expand Up @@ -431,6 +433,11 @@ object TestRaftServer extends Logging {
.ofType(classOf[Int])
.defaultsTo(256)

val directoryId: OptionSpec[String] = parser.accepts("replica-directory-id", "The directory id of the replica")
.withRequiredArg
.describedAs("directory id")
.ofType(classOf[String])

options = parser.parse(args : _*)
}

Expand All @@ -444,6 +451,11 @@ object TestRaftServer extends Logging {
if (configFile == null) {
throw new InvalidConfigurationException("Missing configuration file. Should specify with '--config'")
}

val directoryIdAsString = opts.options.valueOf(opts.directoryId)
if (directoryIdAsString == null) {
throw new InvalidConfigurationException("Missing replica directory id. Should specify with --replica-directory-id")
}
val serverProps = Utils.loadProps(configFile)

// KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are
Expand All @@ -453,7 +465,7 @@ object TestRaftServer extends Logging {
val config = KafkaConfig.fromProps(serverProps, doLog = false)
val throughput = opts.options.valueOf(opts.throughputOpt)
val recordSize = opts.options.valueOf(opts.recordSizeOpt)
val server = new TestRaftServer(config, throughput, recordSize)
val server = new TestRaftServer(config, Uuid.fromString(directoryIdAsString), throughput, recordSize)

Exit.addShutdownHook("raft-shutdown-hook", server.shutdown())

Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class RaftManagerTest {
new KafkaRaftManager[Array[Byte]](
Uuid.randomUuid.toString,
config,
Uuid.randomUuid,
new ByteArraySerde,
topicPartition,
topicId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class DumpLogSegmentsTest {
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)).get)
.setKraftVersion(1)
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(Arrays.asList(1, 2, 3)))))
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(Arrays.asList(1, 2, 3), true))))
.build(MetadataRecordSerde.INSTANCE)
) { snapshotWriter =>
snapshotWriter.append(metadataRecords.asJava)
Expand Down
69 changes: 49 additions & 20 deletions raft/src/main/java/org/apache/kafka/raft/CandidateState.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
*/
package org.apache.kafka.raft;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.VoterSet;
import org.slf4j.Logger;

public class CandidateState implements EpochState {
private final int localId;
private final Uuid localDirectoryId;
private final int epoch;
private final int retries;
private final Map<Integer, State> voteStates = new HashMap<>();
Expand All @@ -39,7 +42,7 @@ public class CandidateState implements EpochState {
private final Logger log;

/**
* The lifetime of a candidate state is the following:
* The lifetime of a candidate state is the following.
*
* 1. Once started, it would keep record of the received votes.
* 2. If majority votes granted, it can then end its life and will be replaced by a leader state;
Expand All @@ -51,14 +54,27 @@ public class CandidateState implements EpochState {
protected CandidateState(
Time time,
int localId,
Uuid localDirectoryId,
int epoch,
Set<Integer> voters,
VoterSet voters,
Optional<LogOffsetMetadata> highWatermark,
int retries,
int electionTimeoutMs,
LogContext logContext
) {
if (!voters.isVoter(ReplicaKey.of(localId, Optional.of(localDirectoryId)))) {
throw new IllegalArgumentException(
String.format(
"Local replica (%d, %s) must be in the set of voters %s",
localId,
localDirectoryId,
voters
)
);
}

this.localId = localId;
this.localDirectoryId = localDirectoryId;
this.epoch = epoch;
this.highWatermark = highWatermark;
this.retries = retries;
Expand All @@ -68,7 +84,7 @@ protected CandidateState(
this.backoffTimer = time.timer(0);
this.log = logContext.logger(CandidateState.class);

for (Integer voterId : voters) {
for (Integer voterId : voters.voterIds()) {
voteStates.put(voterId, State.UNRECORDED);
}
voteStates.put(localId, State.GRANTED);
Expand Down Expand Up @@ -227,7 +243,11 @@ public long remainingElectionTimeMs(long currentTimeMs) {

@Override
public ElectionState election() {
return ElectionState.withVotedCandidate(epoch, localId, voteStates.keySet());
return ElectionState.withVotedCandidate(
epoch,
ReplicaKey.of(localId, Optional.of(localDirectoryId)),
voteStates.keySet()
);
}

@Override
Expand All @@ -241,24 +261,33 @@ public Optional<LogOffsetMetadata> highWatermark() {
}

@Override
public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
public boolean canGrantVote(
ReplicaKey candidateKey,
boolean isLogUpToDate
) {
// Still reject the vote request even if candidateId = localId. Although the candidate votes for
// itself, this vote is implicit and not "granted".
log.debug("Rejecting vote request from candidate {} since we are already candidate in epoch {}",
candidateId, epoch);
log.debug(
"Rejecting vote request from candidate ({}) since we are already candidate in epoch {}",
candidateKey,
epoch
);
return false;
}

@Override
public String toString() {
return "CandidateState(" +
"localId=" + localId +
", epoch=" + epoch +
", retries=" + retries +
", voteStates=" + voteStates +
", highWatermark=" + highWatermark +
", electionTimeoutMs=" + electionTimeoutMs +
')';
return String.format(
"CandidateState(localId=%d, localDirectoryId=%s,epoch=%d, retries=%d, voteStates=%s, " +
"highWatermark=%s, electionTimeoutMs=%d)",
localId,
localDirectoryId,
epoch,
retries,
voteStates,
highWatermark,
electionTimeoutMs
);
}

@Override
Expand Down
Loading

0 comments on commit 056d232

Please sign in to comment.