diff --git a/src/membership.c b/src/membership.c index 5e1ad6936..0b1b6cdf7 100644 --- a/src/membership.c +++ b/src/membership.c @@ -155,23 +155,26 @@ int membershipRollback(struct raft *r) /* Fetch the last committed configuration entry. */ assert(r->configuration_index != 0); - entry = logGet(r->log, r->configuration_index); - - assert(entry != NULL); - /* Replace the current configuration with the last committed one. */ - raft_configuration_close(&r->configuration); - raft_configuration_init(&r->configuration); - - rv = configurationDecode(&entry->buf, &r->configuration); - if (rv != 0) { - return rv; + entry = logGet(r->log, r->configuration_index); + if (entry != NULL) { + raft_configuration_close(&r->configuration); + raft_configuration_init(&r->configuration); + + rv = configurationDecode(&entry->buf, &r->configuration); + if (rv != 0) { + return rv; + } + } else { + /* Configuration was truncated from log. */ + rv = configurationRestorePrevious(r); + if (rv != 0) { + return rv; + } } configurationTrace(r, &r->configuration, "roll back config"); - r->configuration_uncommitted_index = 0; - return 0; } diff --git a/src/replication.c b/src/replication.c index b247d35b6..c6b84ea32 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1240,10 +1240,17 @@ static void installSnapshotCb(struct raft_io_snapshot_put *req, int status) goto discard; } - tracef("restored snapshot with last index %llu", snapshot->index); + /* Enable configuration rollback if the next configuration after installing + * this snapshot needs to be rolled back. */ + rv = configurationBackup(r, &r->configuration); + if (rv != 0) { + /* Don't make this a hard fault, configuration rollback is a low + * probability event. */ + tracef("failed to backup current configuration."); + } + tracef("restored snapshot with last index %llu", snapshot->index); result.rejected = 0; - goto respond; discard: @@ -1468,6 +1475,7 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) { struct raft *r = req->data; struct raft_snapshot *snapshot; + int rv; r->snapshot.put.data = NULL; snapshot = &r->snapshot.pending; @@ -1478,6 +1486,20 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) goto out; } + /* While the snapshot was written, configuration changes could have + * occurred, these changes will not be purged from the log by this snapshot + * write. Therefore, we only need to backup a configuration in case + * configuration_index == snapshot->configuration_index, i.e. the last + * committed configuration is the configuration in the snapshot. (for + * simplicity this doesn't take into account the snapshot trailing parameter)*/ + if (r->configuration_index == snapshot->configuration_index) { + rv = configurationBackup(r, &snapshot->configuration); + if (rv != 0) { + /* Don't make this a hard fault, configuration rollback is a low + * probability event. */ + tracef("failed to backup last committed configuration."); + } + } logSnapshot(r->log, snapshot->index, r->snapshot.trailing); out: takeSnapshotClose(r, snapshot); @@ -1531,6 +1553,24 @@ static int takeSnapshotAsync(struct raft_io_async_work *take) return r->fsm->snapshot_async(r->fsm, &snapshot->bufs, &snapshot->n_bufs); } +static int copyLastCommittedConfiguration(const struct raft *r, struct raft_configuration *dst) +{ + const struct raft_entry *entry; + int rv; + + entry = logGet(r->log, r->configuration_index); + if (entry != NULL) { + tracef("entry != NULL index:%llu", r->configuration_index); + rv = configurationDecode(&entry->buf, dst); + } else { + tracef("entry == NULL index:%llu", r->configuration_index); + rv = configurationCopy(&r->configuration_previous, dst); + } + + assert(dst->n != 0); + return rv; +} + static int takeSnapshot(struct raft *r) { struct raft_snapshot *snapshot; @@ -1544,7 +1584,8 @@ static int takeSnapshot(struct raft *r) snapshot->bufs = NULL; snapshot->n_bufs = 0; - rv = configurationCopy(&r->configuration, &snapshot->configuration); + configurationInit(&snapshot->configuration); + rv = copyLastCommittedConfiguration(r, &snapshot->configuration); if (rv != 0) { goto abort; } diff --git a/src/snapshot.c b/src/snapshot.c index 2860fe594..026d7ec01 100644 --- a/src/snapshot.c +++ b/src/snapshot.c @@ -43,6 +43,7 @@ int snapshotRestore(struct raft *r, struct raft_snapshot *snapshot) configurationClose(&r->configuration); r->configuration = snapshot->configuration; r->configuration_index = snapshot->configuration_index; + r->configuration_uncommitted_index = 0; configurationTrace(r, &r->configuration, "configuration restore from snapshot"); r->commit_index = snapshot->index; diff --git a/src/start.c b/src/start.c index 9c04d3c35..5dd723c88 100644 --- a/src/start.c +++ b/src/start.c @@ -12,51 +12,59 @@ #define tracef(...) Tracef(r->tracer, __VA_ARGS__) -/* Restore the most recent configuration. */ -static int restoreMostRecentConfiguration(struct raft *r, - struct raft_entry *entry, - raft_index index) +/* Restore the most recent configurations. */ +static int restoreConfigurations(struct raft *r, raft_index prev_index, + raft_index last_index, struct raft_entry *last) { - struct raft_configuration configuration; + struct raft_configuration last_conf; int rv; - raft_configuration_init(&configuration); - rv = configurationDecode(&entry->buf, &configuration); - if (rv != 0) { - raft_configuration_close(&configuration); - return rv; + + /* No configuration entry loaded, nothing to do */ + if (last == NULL) { + assert(prev_index == 0); + return 0; + } else { + /* There is a latest configuration, we can't know if it's + * committed or not. Backup the configuration restored from the snapshot + * or noop in case there was no snapshot. */ + configurationBackup(r, &r->configuration); + raft_configuration_init(&last_conf); + rv = configurationDecode(&last->buf, &last_conf); + if (rv != 0) { + raft_configuration_close(&last_conf); + return rv; + } + configurationClose(&r->configuration); + r->configuration = last_conf; + r->configuration_uncommitted_index = last_index; + + /* If the last configuration is the first entry in the log, we know it's + * the bootstrap configuration and it's committed by default. */ + if (last_index == 1) { + assert(prev_index == 0); + r->configuration_index = 1; + r->configuration_uncommitted_index = 0; + } + + /* If there is a previous configuration it must have been committed as + * we don't allow multiple uncommitted configurations. */ + if (prev_index != 0) { + r->configuration_index = prev_index; + } } - configurationTrace(r, &configuration, "restore most recent configuration"); - raft_configuration_close(&r->configuration); - r->configuration = configuration; - r->configuration_index = index; + + configurationTrace(r, &r->configuration, "restore most recent configuration"); return 0; } /* Restore the entries that were loaded from persistent storage. The most recent * configuration entry will be restored as well, if any. * - * Note that we don't care whether the most recent configuration entry was - * actually committed or not. We don't allow more than one pending uncommitted - * configuration change at a time, plus - * - * when adding or removing just a single server, it is safe to switch directly - * to the new configuration. - * - * and - * - * The new configuration takes effect on each server as soon as it is added to - * that server's log: the C_new entry is replicated to the C_new servers, and - * a majority of the new configuration is used to determine the C_new entry's - * commitment. This means that servers do notwait for configuration entries to - * be committed, and each server always uses the latest configuration found in - * its log. - * - * as explained in section 4.1. - * - * TODO: we should probably set configuration_uncommitted_index as well, since we - * can't be sure a configuration change has been committed and we need to be - * ready to roll back to the last committed configuration. - */ + * Note that we cannot know if the last configuration in the log was committed + * or not, therefore we also need to track the second-to-last configuration + * entry. This second-to-last entry is committed by default as raft doesn't + * allow multipled uncommitted configuration entries and is used in case of + * configuration rollback scenarios. */ static int restoreEntries(struct raft *r, raft_index snapshot_index, raft_term snapshot_term, @@ -64,8 +72,9 @@ static int restoreEntries(struct raft *r, struct raft_entry *entries, size_t n) { - struct raft_entry *conf = NULL; - raft_index conf_index = 0; + struct raft_entry *last_conf = NULL; + raft_index last_conf_index = 0; + raft_index prev_conf_index = 0; size_t i; int rv; logStart(r->log, snapshot_index, snapshot_term, start_index); @@ -78,17 +87,20 @@ static int restoreEntries(struct raft *r, goto err; } r->last_stored++; - if (entry->type == RAFT_CHANGE) { - conf = entry; - conf_index = r->last_stored; + /* Only take configurations into account that are newer than the + * configuration restored from the snapshot. */ + if (entry->type == RAFT_CHANGE && r->last_stored > r->configuration_index) { + prev_conf_index = last_conf_index; + last_conf = entry; + last_conf_index = r->last_stored; } } - if (conf != NULL) { - rv = restoreMostRecentConfiguration(r, conf, conf_index); - if (rv != 0) { - goto err; - } + + rv = restoreConfigurations(r, prev_conf_index, last_conf_index, last_conf); + if (rv != 0) { + goto err; } + raft_free(entries); return 0; @@ -164,6 +176,14 @@ int raft_start(struct raft *r) snapshot_index = snapshot->index; snapshot_term = snapshot->term; raft_free(snapshot); + + /* Enable configuration rollback if the next configuration after installing + * this snapshot needs to be rolled back. */ + rv = configurationBackup(r, &r->configuration); + if (rv != 0) { + tracef("failed to backup current configuration."); + return rv; + } } else if (n_entries > 0) { /* If we don't have a snapshot and the on-disk log is not empty, then * the first entry must be a configuration entry. */ diff --git a/test/integration/test_replication.c b/test/integration/test_replication.c index 258f64a37..124fc8095 100644 --- a/test/integration/test_replication.c +++ b/test/integration/test_replication.c @@ -1,3 +1,4 @@ +#include "../../src/configuration.h" #include "../../src/progress.h" #include "../lib/cluster.h" #include "../lib/runner.h" @@ -525,6 +526,176 @@ TEST(replication, recvPrevLogTermMismatch, setUp, tearDown, 0, NULL) return MUNIT_OK; } +static void assert_config(struct raft* raft, struct raft_configuration *expected) +{ + struct raft_configuration *actual; + actual = &raft->configuration; + + munit_assert_uint(actual->n, ==, expected->n); + for (unsigned i = 0; i < actual->n; i++) { + munit_assert_ulong(actual->servers[i].id, ==, expected->servers[i].id); + munit_assert_int(actual->servers[i].role, + ==, + expected->servers[i].role); + munit_assert_string_equal(actual->servers[i].address, + expected->servers[i].address); + } +} + +/* If the term of the last log entry on the server is different from the one + * prevLogTerm, and value of prevLogIndex is greater than server's commit + * index (i.e. this is a normal inconsistency), we reject the request. This time + * it's a configuration entry. */ +TEST(replication, recvPrevLogTermMismatchConfiguration, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + struct raft_entry entry1; + struct raft_entry entry2; + struct raft_configuration base; /* For assertion purposes. */ + struct raft_configuration conf; + CLUSTER_BOOTSTRAP; + CLUSTER_CONFIGURATION(&base); + + /* The servers have an entry with a conflicting term. */ + entry1.type = RAFT_COMMAND; + entry1.term = 2; + FsmEncodeSetX(1, &entry1.buf); + CLUSTER_ADD_ENTRY(0, &entry1); + + entry2.type = RAFT_CHANGE; + entry2.term = 1; + + /* Add a different config to the log that will be rolled back. */ + CLUSTER_CONFIGURATION(&conf); + raft_configuration_add(&conf, 3, "3", 2); + raft_configuration_encode(&conf, &entry2.buf); + raft_configuration_close(&conf); + CLUSTER_ADD_ENTRY(1, &entry2); + + CLUSTER_START; + CLUSTER_ELECT(0); + + /* The follower eventually replicates the entry */ + CLUSTER_STEP_UNTIL_APPLIED(1, 2, 3000); + + /* Ensure config is rolled back. */ + assert_config(CLUSTER_RAFT(0), &base); + assert_config(CLUSTER_RAFT(1), &base); + raft_configuration_close(&base); + return MUNIT_OK; +} + +/* If the term of the last log entry on the server is different from the one + * prevLogTerm, and value of prevLogIndex is greater than server's commit + * index (i.e. this is a normal inconsistency), we reject the request. This time + * it's a configuration entry, there's also an older configuration entry + * present. */ +TEST(replication, recvPrevLogTermMismatchConfigurationWithPrevious, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + struct raft_entry entry1; + struct raft_entry entry2; + struct raft_entry entry3; + struct raft_entry entry4; + struct raft_configuration base; /* For assertion purposes. */ + struct raft_configuration conf; + CLUSTER_BOOTSTRAP; + CLUSTER_CONFIGURATION(&base); + + /* The servers have a matching configuration entry. */ + entry1.type = RAFT_CHANGE; + entry1.term = 1; + CLUSTER_CONFIGURATION(&conf); + raft_configuration_encode(&conf, &entry1.buf); + CLUSTER_ADD_ENTRY(0, &entry1); + + entry2.type = RAFT_CHANGE; + entry2.term = 1; + raft_configuration_encode(&conf, &entry2.buf); + CLUSTER_ADD_ENTRY(1, &entry2); + + /* The servers have an entry with a conflicting term. */ + entry3.type = RAFT_COMMAND; + entry3.term = 2; + FsmEncodeSetX(1, &entry3.buf); + CLUSTER_ADD_ENTRY(0, &entry3); + + entry4.type = RAFT_CHANGE; + entry4.term = 1; + /* Add a different config to the log that will be rolled back. */ + raft_configuration_add(&conf, 3, "3", 2); + raft_configuration_encode(&conf, &entry4.buf); + raft_configuration_close(&conf); + CLUSTER_ADD_ENTRY(1, &entry4); + + CLUSTER_START; + CLUSTER_ELECT(0); + + /* The follower eventually replicates the entry */ + CLUSTER_STEP_UNTIL_APPLIED(1, 3, 3000); + + /* Ensure config is rolled back. */ + assert_config(CLUSTER_RAFT(0), &base); + assert_config(CLUSTER_RAFT(1), &base); + raft_configuration_close(&base); + return MUNIT_OK; +} + +/* If the term of the last log entry on the server is different from the one + * prevLogTerm, and value of prevLogIndex is greater than server's commit + * index (i.e. this is a normal inconsistency), we reject the request. This time + * it's a configuration entry and there is a snapshot, there's no previous + * configuration entry in the log. */ +TEST(replication, recvPrevLogTermMismatchConfigurationSnapshotNoPrev, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + struct raft_entry entry1; + struct raft_entry entry2; + struct raft_configuration conf; + struct raft_configuration base; /* For assertion purposes. */ + int rv; + + CLUSTER_CONFIGURATION(&conf); + CLUSTER_CONFIGURATION(&base); + rv = raft_bootstrap(CLUSTER_RAFT(0), &conf); + munit_assert_int(rv, ==, 0); + + /* The second server has a snapshot up to entry 1 */ + CLUSTER_SET_SNAPSHOT(1 /* */, + 1 /* last index */, + 1 /* last term */, + 1 /* conf index */, + 5 /* x */, + 0 /* y */); + CLUSTER_SET_TERM(1,1); + + /* The servers have an entry with a conflicting term. */ + entry1.type = RAFT_COMMAND; + entry1.term = 3; + FsmEncodeSetX(1, &entry1.buf); + CLUSTER_ADD_ENTRY(0, &entry1); + + /* Add a different config to the log that will be rolled back. */ + entry2.type = RAFT_CHANGE; + entry2.term = 2; + raft_configuration_add(&conf, 3, "3", 2); + raft_configuration_encode(&conf, &entry2.buf); + raft_configuration_close(&conf); + CLUSTER_ADD_ENTRY(1, &entry2); + + CLUSTER_START; + CLUSTER_ELECT(0); + + /* The follower eventually replicates the entry */ + CLUSTER_STEP_UNTIL_APPLIED(1, 3, 3000); + + /* Ensure config is rolled back. */ + assert_config(CLUSTER_RAFT(0), &base); + assert_config(CLUSTER_RAFT(1), &base); + raft_configuration_close(&base); + return MUNIT_OK; +} + /* If any of the new entry has the same index of an existing entry in our log, * but different term, and that entry index is already committed, we bail out * with an error. */