Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Update tests to use a barrier where required
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Miller <[email protected]>
  • Loading branch information
cole-miller committed Sep 2, 2022
1 parent d0f2bc9 commit b994983
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 66 deletions.
2 changes: 2 additions & 0 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status)
*/
if (args->leader_commit > r->commit_index) {
r->commit_index = min(args->leader_commit, r->last_stored);
tracef("new commit index %llu", r->commit_index);
rv = replicationApply(r);
if (rv != 0) {
goto out;
Expand Down Expand Up @@ -1087,6 +1088,7 @@ int replicationAppend(struct raft *r,
if ((args->leader_commit > r->commit_index)
&& !replicationInstallSnapshotBusy(r)) {
r->commit_index = min(args->leader_commit, r->last_stored);
tracef("new commit index %llu", r->commit_index);
rv = replicationApply(r);
if (rv != 0) {
return rv;
Expand Down
51 changes: 1 addition & 50 deletions test/integration/test_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,55 +29,6 @@ static void tearDown(void *data)
free(f);
}

/******************************************************************************
*
* Helper macros
*
*****************************************************************************/

struct result
{
int status;
bool done;
};

static void barrierCbAssertResult(struct raft_barrier *req, int status)
{
struct result *result = req->data;
munit_assert_int(status, ==, result->status);
result->done = true;
}

static bool barrierCbHasFired(struct raft_fixture *f, void *arg)
{
struct result *result = arg;
(void)f;
return result->done;
}

/* Submit a barrier request. */
#define BARRIER_SUBMIT(I) \
struct raft_barrier _req; \
struct result _result = {0, false}; \
int _rv; \
_req.data = &_result; \
_rv = raft_barrier(CLUSTER_RAFT(I), &_req, barrierCbAssertResult); \
munit_assert_int(_rv, ==, 0);

/* Expect the barrier callback to fire with the given status. */
#define BARRIER_EXPECT(STATUS) _result.status = STATUS

/* Wait until the barrier request completes. */
#define BARRIER_WAIT CLUSTER_STEP_UNTIL(barrierCbHasFired, &_result, 2000)

/* Submit to the I'th server a barrier request and wait for the operation to
* succeed. */
#define BARRIER(I) \
do { \
BARRIER_SUBMIT(I); \
BARRIER_WAIT; \
} while (0)

/******************************************************************************
*
* Success scenarios
Expand All @@ -89,6 +40,6 @@ SUITE(raft_barrier)
TEST(raft_barrier, cb, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
BARRIER(0);
CLUSTER_BARRIER(0);
return MUNIT_OK;
}
33 changes: 19 additions & 14 deletions test/integration/test_membership.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ struct result
};

/* Submit an apply request. */
#define APPLY_SUBMIT(I) \
struct raft_buffer _buf; \
struct raft_apply _req; \
struct result _result = {0, false}; \
int _rv; \
FsmEncodeSetX(123, &_buf); \
_req.data = &_result; \
_rv = raft_apply(CLUSTER_RAFT(I), &_req, &_buf, 1, NULL); \
#define APPLY_SUBMIT(I) \
struct raft_buffer _buf; \
struct raft_apply _req; \
struct result _result = {0, false}; \
int _rv; \
FsmEncodeSetX(123, &_buf); \
_req.data = &_result; \
_rv = raft_apply(CLUSTER_RAFT(I), &_req, &_buf, 1, NULL); \
munit_assert_int(_rv, ==, 0);

/******************************************************************************
Expand Down Expand Up @@ -250,19 +250,14 @@ TEST(raft_remove, selfThreeNodeClusterReplicate, setup, tear_down, 0, NULL)
* log entries but does not count itself in majorities.`
*
* */
APPLY_SUBMIT(0)
APPLY_SUBMIT(0);

/* The removed leader eventually steps down */
CLUSTER_STEP_UNTIL_HAS_NO_LEADER(5000);
raft_leader(CLUSTER_RAFT(0), &leader_id, &leader_address);
munit_assert_ulong(leader_id, ==, 0);
munit_assert_ptr_null(leader_address);

/* Every node should have all entries */
CLUSTER_STEP_UNTIL_APPLIED(0, 4, 10000);
CLUSTER_STEP_UNTIL_APPLIED(1, 4, 10000);
CLUSTER_STEP_UNTIL_APPLIED(2, 4, 10000);

/* The removed leader eventually steps down */
CLUSTER_STEP_UNTIL_HAS_LEADER(5000);

Expand All @@ -276,6 +271,16 @@ TEST(raft_remove, selfThreeNodeClusterReplicate, setup, tear_down, 0, NULL)
munit_assert_ulong(leader_id, !=, 0);
munit_assert_ulong(leader_id, !=, 1);
munit_assert_ptr_not_null(leader_address);

/* The new leader applies a barrier to commit entries from previous terms */
CLUSTER_BARRIER(CLUSTER_LEADER);

/* Every node should have at least 4 entries, and the non-removed nodes
* should have 5 (because of the barrier) */
CLUSTER_STEP_UNTIL_APPLIED(0, 4, 10000);
CLUSTER_STEP_UNTIL_APPLIED(1, 5, 10000);
CLUSTER_STEP_UNTIL_APPLIED(2, 5, 10000);

return MUNIT_OK;
}

Expand Down
7 changes: 5 additions & 2 deletions test/integration/test_replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,10 @@ TEST(replication, recvMissingEntries, setUp, tearDown, 0, NULL)
CLUSTER_STEP_UNTIL_HAS_LEADER(5000);
munit_assert_int(CLUSTER_LEADER, ==, 0);

CLUSTER_BARRIER(CLUSTER_LEADER);

/* The first server replicates missing entries to the second. */
CLUSTER_STEP_UNTIL_APPLIED(1, 2, 3000);
CLUSTER_STEP_UNTIL_APPLIED(1, 3, 3000);

return MUNIT_OK;
}
Expand Down Expand Up @@ -806,11 +808,12 @@ TEST(replication, resultRetry, setUp, tearDown, 0, NULL)

CLUSTER_START;
CLUSTER_ELECT(0);
CLUSTER_BARRIER(0);

/* The first server receives an AppendEntries result from the second server
* indicating that its log does not have the entry at index 2, so it will
* resend it. */
CLUSTER_STEP_UNTIL_APPLIED(1, 2, 2000);
CLUSTER_STEP_UNTIL_APPLIED(1, 3, 2000);

return MUNIT_OK;
}
14 changes: 14 additions & 0 deletions test/lib/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,17 @@ void cluster_randomize(struct raft_fixture *f, struct raft_fixture_event *event)
{
randomize(f, event->server_index, event->type);
}

void barrierCbAssertResult(struct raft_barrier *req, int status)
{
struct barrierCbResult *result = req->data;
munit_assert_int(status, ==, result->status);
result->done = true;
}

bool barrierCbHasFired(struct raft_fixture *f, void *arg)
{
struct barrierCbResult *result = arg;
(void)f;
return result->done;
}
32 changes: 32 additions & 0 deletions test/lib/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,4 +436,36 @@ void cluster_randomize_init(struct raft_fixture *f);
void cluster_randomize(struct raft_fixture *f,
struct raft_fixture_event *event);

/* Submit a barrier request. */
#define CLUSTER_BARRIER_SUBMIT(I) \
struct raft_barrier _barrier_req; \
struct barrierCbResult _barrier_result = {0, false}; \
int _barrier_rv; \
_barrier_req.data = &_barrier_result; \
_barrier_rv = raft_barrier(CLUSTER_RAFT(I), &_barrier_req, barrierCbAssertResult); \
munit_assert_int(_barrier_rv, ==, 0);

/* Expect the barrier callback to fire with the given status. */
#define CLUSTER_BARRIER_EXPECT(STATUS) _barrier_result.status = STATUS

/* Wait until the barrier request completes. */
#define CLUSTER_BARRIER_WAIT CLUSTER_STEP_UNTIL(barrierCbHasFired, &_barrier_result, 2000)

/* Submit to the I'th server a barrier request and wait for the operation to
* succeed. */
#define CLUSTER_BARRIER(I) \
do { \
CLUSTER_BARRIER_SUBMIT(I); \
CLUSTER_BARRIER_WAIT; \
} while (0)

struct barrierCbResult
{
int status;
bool done;
};

void barrierCbAssertResult(struct raft_barrier *req, int status);
bool barrierCbHasFired(struct raft_fixture *f, void *arg);

#endif /* TEST_CLUSTER_H */

0 comments on commit b994983

Please sign in to comment.