Skip to content

Commit

Permalink
fix(cluster): Wait before all access to slot migrations (#3180)
Browse files Browse the repository at this point in the history
* fix(cluster): Wait before all access to slot migrations

* refactor

* reduce lock scope
  • Loading branch information
chakaz authored Jun 16, 2024
1 parent 32844e0 commit 664cba9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 22 deletions.
1 change: 0 additions & 1 deletion src/server/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ enum class MigrationState : uint8_t {
C_SYNC,
C_ERROR,
C_FINISHED,
C_MAX_INVALID = std::numeric_limits<uint8_t>::max()
};

SlotId KeySlot(std::string_view key);
Expand Down
2 changes: 0 additions & 2 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,6 @@ static string_view StateToStr(MigrationState state) {
return "ERROR"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_MAX_INVALID:
break;
}
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
Expand Down
56 changes: 37 additions & 19 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,37 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) {
}

void OutgoingMigration::Finish(bool is_error) {
const auto new_state = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
if (!ChangeState(new_state)) {
return;
bool should_cancel_flows = false;

{
std::lock_guard lk(state_mu_);
switch (state_) {
case MigrationState::C_FINISHED:
return; // Already finished, nothing else to do

case MigrationState::C_NO_STATE:
case MigrationState::C_CONNECTING:
should_cancel_flows = false;
break;

case MigrationState::C_SYNC:
case MigrationState::C_ERROR:
should_cancel_flows = true;
break;
}

state_ = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
}

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
auto& flow = slot_migrations_[shard->shard_id()];
CHECK(flow != nullptr);
flow->Cancel();
}
});
if (should_cancel_flows) {
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
auto& flow = slot_migrations_[shard->shard_id()];
CHECK(flow != nullptr);
flow->Cancel();
}
});
}
}

MigrationState OutgoingMigration::GetState() const {
Expand Down Expand Up @@ -174,10 +193,6 @@ void OutgoingMigration::SyncFb() {
continue;
}

if (!ChangeState(MigrationState::C_SYNC)) {
break;
}

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
server_family_->journal()->StartInThread();
Expand All @@ -186,13 +201,16 @@ void OutgoingMigration::SyncFb() {
}
});

// Start migrations in a separate hop to make sure that Finish() is called only after all
// migrations are created (see #3139)
if (!ChangeState(MigrationState::C_SYNC)) {
break;
}

shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
auto& migration = *slot_migrations_[shard->shard_id()];
migration.Sync(cf_->MyID(), shard->shard_id());
if (migration.GetError()) {
auto& migration = slot_migrations_[shard->shard_id()];
CHECK(migration != nullptr);
migration->Sync(cf_->MyID(), shard->shard_id());
if (migration->GetError()) {
Finish(true);
}
}
Expand Down

0 comments on commit 664cba9

Please sign in to comment.