Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

replication: Don't use majority rule for old entries #302

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/raft/fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,12 @@ RAFT_API bool raft_fixture_step_until_delivered(struct raft_fixture *f,
unsigned j,
unsigned max_msecs);

/**
 * Step the cluster until the log of the @i'th server contains an entry at
 * @index whose term is @term. If @i equals the number of servers in the
 * fixture, the condition must hold for the log of every server.
 *
 * @max_msecs is forwarded to raft_fixture_step_until(), whose return value is
 * returned unchanged.
 */
RAFT_API bool raft_fixture_step_until_replicated(struct raft_fixture *f,
unsigned i,
raft_index index,
raft_term term,
unsigned max_msecs);

/**
* Set a function to be called after every time a fixture event occurs as
* consequence of a step.
Expand Down
2 changes: 1 addition & 1 deletion src/convert.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ int convertToLeader(struct raft *r)
* we are the only voter around. */
size_t n_voters = configurationVoterCount(&r->configuration);
if (n_voters == 1 && (r->last_stored > r->commit_index)) {
tracef("Apply log entries after self election %llu %llu", r->last_stored, r->commit_index);
tracef("apply log entries after self election %llu %llu", r->last_stored, r->commit_index);
r->commit_index = r->last_stored;
rv = replicationApply(r);
}
Expand Down
36 changes: 36 additions & 0 deletions src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,42 @@ bool raft_fixture_step_until_has_no_leader(struct raft_fixture *f,
return raft_fixture_step_until(f, hasNoLeader, NULL, max_msecs);
}

/* Argument handed to the entryIsReplicated() step predicate. */
struct entryIsReplicatedArg
{
/* Index of the server whose log is inspected; when equal to the number of
 * servers in the fixture, all servers are inspected. */
unsigned i;
/* Log index at which the entry must be present. */
raft_index index;
/* Term that the entry at that index must have. */
raft_term term;
};

/* Step predicate: report whether the entry described by the
 * entryIsReplicatedArg pointed to by @arg (log index and term) is present in
 * the log of the selected server.
 *
 * The selector x->i is either the index of a single server or, when it equals
 * f->n, a request to check every server in the fixture. */
static bool entryIsReplicated(struct raft_fixture *f, void *arg)
{
    struct entryIsReplicatedArg *x = arg;
    unsigned first;
    unsigned end;
    unsigned k;

    assert(x->i <= f->n);

    /* Reduce both cases to the half-open server range [first, end). */
    if (x->i == f->n) {
        first = 0;
        end = f->n;
    } else {
        first = x->i;
        end = x->i + 1;
    }

    for (k = first; k < end; k++) {
        struct raft_log *log = &f->servers[k].raft.log;

        /* The log must reach the index before its term can be compared. */
        if (logLastIndex(log) < x->index) {
            return false;
        }
        if (logTermOf(log, x->index) != x->term) {
            return false;
        }
    }

    return true;
}

/* Step the cluster until entryIsReplicated() holds for the given server
 * selector @i, log @index and @term, forwarding @max_msecs to
 * raft_fixture_step_until() and returning its result. */
bool raft_fixture_step_until_replicated(struct raft_fixture *f,
                                        unsigned i,
                                        raft_index index,
                                        raft_term term,
                                        unsigned max_msecs)
{
    struct entryIsReplicatedArg target;

    target.i = i;
    target.index = index;
    target.term = term;

    return raft_fixture_step_until(f, entryIsReplicated, &target, max_msecs);
}

/* Enable/disable dropping outgoing messages of a certain type from all servers
* except one. */
static void dropAllExcept(struct raft_fixture *f,
Expand Down
51 changes: 35 additions & 16 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status)
*/
if (args->leader_commit > r->commit_index) {
r->commit_index = min(args->leader_commit, r->last_stored);
tracef("new commit index %llu", r->commit_index);
rv = replicationApply(r);
if (rv != 0) {
goto out;
Expand Down Expand Up @@ -1087,6 +1088,7 @@ int replicationAppend(struct raft *r,
if ((args->leader_commit > r->commit_index)
&& !replicationInstallSnapshotBusy(r)) {
r->commit_index = min(args->leader_commit, r->last_stored);
tracef("new commit index %llu", r->commit_index);
rv = replicationApply(r);
if (rv != 0) {
return rv;
Expand Down Expand Up @@ -1620,16 +1622,31 @@ int replicationApply(struct raft *r)
return rv;
}

void replicationQuorum(struct raft *r, const raft_index index)
/* Return true if a strict majority of the voting servers in the current
 * configuration have a match index greater than or equal to @index.
 *
 * Servers whose role is not RAFT_VOTER are ignored. The function reads
 * r->leader_state.progress, so @r must be in the RAFT_LEADER state; this is
 * asserted before the progress array is touched. */
static bool matchIndicesAtLeast(struct raft *r, const raft_index index)
{
    size_t votes = 0;
    size_t i;

    assert(r->state == RAFT_LEADER);

    for (i = 0; i < r->configuration.n; i++) {
        struct raft_server *server = &r->configuration.servers[i];
        tracef("server->id %llu server->role %d index %llu", server->id, server->role, index);
        if (server->role != RAFT_VOTER) {
            continue;
        }
        tracef("match_index %llu", r->leader_state.progress[i].match_index);
        if (r->leader_state.progress[i].match_index >= index) {
            votes++;
        }
    }

    /* votes is a size_t, so it must be printed with %zu (not %ld). */
    tracef("votes %zu", votes);
    return votes > (configurationVoterCount(&r->configuration) / 2);
}

if (index <= r->commit_index) {
return;
}
void replicationQuorum(struct raft *r, const raft_index index)
{
raft_index n;

assert(r->state == RAFT_LEADER);

/* TODO: fuzzy-test --seed 0x8db5fccc replication/entries/partitioned
* fails the assertion below. */
Expand All @@ -1639,19 +1656,21 @@ void replicationQuorum(struct raft *r, const raft_index index)
// assert(logTermOf(&r->log, index) > 0);
assert(logTermOf(&r->log, index) <= r->current_term);

for (i = 0; i < r->configuration.n; i++) {
struct raft_server *server = &r->configuration.servers[i];
if (server->role != RAFT_VOTER) {
continue;
}
if (r->leader_state.progress[i].match_index >= index) {
votes++;
/* Raft Paper fig. 2 - Rules for Servers
* If there exists an N such that N > commitIndex, a majority
* of matchIndex[i] >= N, and log[N].term == currentTerm:
* set commitIndex = N (5.3, 5.4).
* */
for (n = index; n > r->commit_index; n -= 1) {
if (logTermOf(&r->log, n) < r->current_term) {
break;
}
}

if (votes > configurationVoterCount(&r->configuration) / 2) {
r->commit_index = index;
tracef("new commit index %llu", r->commit_index);
if (matchIndicesAtLeast(r, n)) {
r->commit_index = n;
tracef("new commit index %llu", n);
break;
}
}

return;
Expand Down
51 changes: 1 addition & 50 deletions test/integration/test_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,55 +29,6 @@ static void tearDown(void *data)
free(f);
}

/******************************************************************************
*
* Helper macros
*
*****************************************************************************/

/* Expected outcome and completion flag of a barrier request; a pointer to it
 * is stored in the request's data field by BARRIER_SUBMIT. */
struct result
{
/* Status the barrier callback is expected to be invoked with. */
int status;
/* Set to true by barrierCbAssertResult() when the callback runs. */
bool done;
};

/* Barrier completion callback. Checks that @status equals the expected status
 * stored in the struct result attached to the request through req->data, then
 * flags that result as done. */
static void barrierCbAssertResult(struct raft_barrier *req, int status)
{
struct result *result = req->data;
munit_assert_int(status, ==, result->status);
result->done = true;
}

/* Step predicate: true once the barrier callback has marked the struct result
 * passed as @arg as done. The fixture itself is not consulted. */
static bool barrierCbHasFired(struct raft_fixture *f, void *arg)
{
    (void)f;
    return ((struct result *)arg)->done;
}

/* Submit a barrier request to the I'th server and assert that the submission
 * itself returns 0. The macro declares _req, _result and _rv in the enclosing
 * scope (so it can be expanded at most once per scope); _result starts with an
 * expected status of 0 and done set to false. */
#define BARRIER_SUBMIT(I) \
struct raft_barrier _req; \
struct result _result = {0, false}; \
int _rv; \
_req.data = &_result; \
_rv = raft_barrier(CLUSTER_RAFT(I), &_req, barrierCbAssertResult); \
munit_assert_int(_rv, ==, 0);

/* Expect the barrier callback to fire with the given status. Must be used in
 * the same scope as BARRIER_SUBMIT, since it writes to its _result variable. */
#define BARRIER_EXPECT(STATUS) _result.status = STATUS

/* Wait until the barrier request completes, i.e. until barrierCbHasFired
 * reports that _result has been marked as done; 2000 is the limit handed to
 * CLUSTER_STEP_UNTIL. */
#define BARRIER_WAIT CLUSTER_STEP_UNTIL(barrierCbHasFired, &_result, 2000)

/* Submit to the I'th server a barrier request and wait for the operation to
 * succeed. The do/while wrapper gives the variables declared by BARRIER_SUBMIT
 * their own scope, so this macro can be used several times in one function. */
#define BARRIER(I) \
do { \
BARRIER_SUBMIT(I); \
BARRIER_WAIT; \
} while (0)

/******************************************************************************
*
* Success scenarios
Expand All @@ -89,6 +40,6 @@ SUITE(raft_barrier)
TEST(raft_barrier, cb, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
BARRIER(0);
CLUSTER_BARRIER(0);
return MUNIT_OK;
}
33 changes: 19 additions & 14 deletions test/integration/test_membership.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ struct result
};

/* Submit an apply request. */
#define APPLY_SUBMIT(I) \
struct raft_buffer _buf; \
struct raft_apply _req; \
struct result _result = {0, false}; \
int _rv; \
FsmEncodeSetX(123, &_buf); \
_req.data = &_result; \
_rv = raft_apply(CLUSTER_RAFT(I), &_req, &_buf, 1, NULL); \
#define APPLY_SUBMIT(I) \
struct raft_buffer _buf; \
struct raft_apply _req; \
struct result _result = {0, false}; \
int _rv; \
FsmEncodeSetX(123, &_buf); \
_req.data = &_result; \
_rv = raft_apply(CLUSTER_RAFT(I), &_req, &_buf, 1, NULL); \
munit_assert_int(_rv, ==, 0);

/******************************************************************************
Expand Down Expand Up @@ -250,19 +250,14 @@ TEST(raft_remove, selfThreeNodeClusterReplicate, setup, tear_down, 0, NULL)
* log entries but does not count itself in majorities.`
*
* */
APPLY_SUBMIT(0)
APPLY_SUBMIT(0);

/* The removed leader eventually steps down */
CLUSTER_STEP_UNTIL_HAS_NO_LEADER(5000);
raft_leader(CLUSTER_RAFT(0), &leader_id, &leader_address);
munit_assert_ulong(leader_id, ==, 0);
munit_assert_ptr_null(leader_address);

/* Every node should have all entries */
CLUSTER_STEP_UNTIL_APPLIED(0, 4, 10000);
CLUSTER_STEP_UNTIL_APPLIED(1, 4, 10000);
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 10000);

/* A new leader is eventually elected */
CLUSTER_STEP_UNTIL_HAS_LEADER(5000);

Expand All @@ -276,6 +271,16 @@ TEST(raft_remove, selfThreeNodeClusterReplicate, setup, tear_down, 0, NULL)
munit_assert_ulong(leader_id, !=, 0);
munit_assert_ulong(leader_id, !=, 1);
munit_assert_ptr_not_null(leader_address);

/* The new leader applies a barrier to commit entries from previous terms */
CLUSTER_BARRIER(CLUSTER_LEADER);

/* Every node should have at least 4 entries, and the non-removed nodes
* should have 5 (because of the barrier) */
CLUSTER_STEP_UNTIL_APPLIED(0, 4, 10000);
CLUSTER_STEP_UNTIL_APPLIED(1, 5, 10000);
CLUSTER_STEP_UNTIL_APPLIED(2, 5, 10000);

return MUNIT_OK;
}

Expand Down
36 changes: 34 additions & 2 deletions test/integration/test_replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,10 @@ TEST(replication, recvMissingEntries, setUp, tearDown, 0, NULL)
CLUSTER_STEP_UNTIL_HAS_LEADER(5000);
munit_assert_int(CLUSTER_LEADER, ==, 0);

CLUSTER_BARRIER(CLUSTER_LEADER);

/* The first server replicates missing entries to the second. */
CLUSTER_STEP_UNTIL_APPLIED(1, 2, 3000);
CLUSTER_STEP_UNTIL_APPLIED(1, 3, 3000);

return MUNIT_OK;
}
Expand Down Expand Up @@ -806,11 +808,41 @@ TEST(replication, resultRetry, setUp, tearDown, 0, NULL)

CLUSTER_START;
CLUSTER_ELECT(0);
CLUSTER_BARRIER(0);

/* The first server receives an AppendEntries result from the second server
* indicating that its log does not have the entry at index 2, so it will
* resend it. */
CLUSTER_STEP_UNTIL_APPLIED(1, 2, 2000);
CLUSTER_STEP_UNTIL_APPLIED(1, 3, 2000);

return MUNIT_OK;
}

/* A leader doesn't commit any log entries from previous terms before committing
 * an entry from its own term. */
TEST(replication, commitPreviousTerm, setUp, tearDown, 0, NULL)
{
    struct fixture *f = data;
    struct raft_entry entry;
    bool result;

    CLUSTER_GROW;
    CLUSTER_BOOTSTRAP;

    /* Add a term-1 command to the log of server 0 before the cluster starts,
     * so that it belongs to a term preceding the one server 0 is elected in. */
    entry.type = RAFT_COMMAND;
    entry.term = 1;
    FsmEncodeSetX(5, &entry.buf);
    CLUSTER_ADD_ENTRY(0, &entry);

    CLUSTER_START;
    CLUSTER_ELECT(0);

    /* Wait for the entry at index 2 with term 1 to show up in the logs. The
     * server selector 3 is presumably the cluster size, which makes the
     * helper check every server -- TODO confirm against setUp. Fail right
     * here if replication times out, rather than after the extra steps. */
    result = raft_fixture_step_until_replicated(&f->cluster, 3, 2, 1, 2000);
    munit_assert_true(result);

    /* Give the leader enough time to notice that the entry is replicated. */
    CLUSTER_STEP_N(10);

    /* Even though the entry is replicated everywhere, the leader doesn't
     * commit it because it was appended in a previous term. */
    munit_assert_uint64(raft_last_applied(CLUSTER_RAFT(0)), ==, 1);

    return MUNIT_OK;
}
14 changes: 14 additions & 0 deletions test/lib/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,17 @@ void cluster_randomize(struct raft_fixture *f, struct raft_fixture_event *event)
{
randomize(f, event->server_index, event->type);
}

/* Barrier completion callback. Checks that @status equals the expected status
 * stored in the struct barrierCbResult attached to the request through
 * req->data, then flags that result as done. */
void barrierCbAssertResult(struct raft_barrier *req, int status)
{
struct barrierCbResult *result = req->data;
munit_assert_int(status, ==, result->status);
result->done = true;
}

/* Step predicate: true once the barrier callback has marked the
 * struct barrierCbResult passed as @arg as done. The fixture itself is not
 * consulted. */
bool barrierCbHasFired(struct raft_fixture *f, void *arg)
{
    (void)f;
    return ((struct barrierCbResult *)arg)->done;
}
32 changes: 32 additions & 0 deletions test/lib/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,4 +436,36 @@ void cluster_randomize_init(struct raft_fixture *f);
void cluster_randomize(struct raft_fixture *f,
struct raft_fixture_event *event);

/* Submit a barrier request to the I'th server and assert that the submission
 * itself returns 0. The macro declares _barrier_req, _barrier_result and
 * _barrier_rv in the enclosing scope (so it can be expanded at most once per
 * scope); _barrier_result starts with an expected status of 0 and done set to
 * false. */
#define CLUSTER_BARRIER_SUBMIT(I) \
struct raft_barrier _barrier_req; \
struct barrierCbResult _barrier_result = {0, false}; \
int _barrier_rv; \
_barrier_req.data = &_barrier_result; \
_barrier_rv = raft_barrier(CLUSTER_RAFT(I), &_barrier_req, barrierCbAssertResult); \
munit_assert_int(_barrier_rv, ==, 0);

/* Expect the barrier callback to fire with the given status. Must be used in
 * the same scope as CLUSTER_BARRIER_SUBMIT, since it writes to its
 * _barrier_result variable. */
#define CLUSTER_BARRIER_EXPECT(STATUS) _barrier_result.status = STATUS

/* Wait until the barrier request completes, i.e. until barrierCbHasFired
 * reports that _barrier_result has been marked as done; 2000 is the limit
 * handed to CLUSTER_STEP_UNTIL. */
#define CLUSTER_BARRIER_WAIT CLUSTER_STEP_UNTIL(barrierCbHasFired, &_barrier_result, 2000)

/* Submit to the I'th server a barrier request and wait for the operation to
 * succeed. The do/while wrapper gives the variables declared by
 * CLUSTER_BARRIER_SUBMIT their own scope, so this macro can be used several
 * times in one function. */
#define CLUSTER_BARRIER(I) \
do { \
CLUSTER_BARRIER_SUBMIT(I); \
CLUSTER_BARRIER_WAIT; \
} while (0)

/* Expected outcome and completion flag of a barrier request submitted with
 * CLUSTER_BARRIER_SUBMIT, which stores a pointer to it in the request's data
 * field. */
struct barrierCbResult
{
/* Status the barrier callback is expected to be invoked with. */
int status;
/* Set to true by barrierCbAssertResult() when the callback runs. */
bool done;
};

/* Barrier callback: asserts that @status matches the expected status of the
 * struct barrierCbResult attached to @req and marks it as done. */
void barrierCbAssertResult(struct raft_barrier *req, int status);
/* Step predicate: returns the done flag of the struct barrierCbResult passed
 * as @arg; @f is unused. */
bool barrierCbHasFired(struct raft_fixture *f, void *arg);

#endif /* TEST_CLUSTER_H */