Skip to content

Commit

Permalink
[fix][store] The mono_store_engine's region supports split and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
visualYJD authored and ketor committed May 28, 2024
1 parent 0802cc1 commit 4ddba44
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 35 deletions.
1 change: 1 addition & 0 deletions proto/error.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ enum Errno {
EWATCH_NOT_EXIST = 40033;
EMERGE_STORE_ENGINE_NOT_MATCH = 40034;
EMERGE_RAW_ENGINE_NOT_MATCH = 40035;
ECHANGE_PEER_STORE_ENGINE_NOT_MATCH = 40036;

// raft [50000, 60000)
ERAFT_INIT = 50000;
Expand Down
31 changes: 24 additions & 7 deletions src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2659,7 +2659,9 @@ butil::Status CoordinatorControl::SplitRegionWithTaskList(int64_t split_from_reg
auto vector_index_version = region_metrics.vector_index_status().last_build_epoch_version();
auto region_version = split_from_region.definition().epoch().version();

if (region_version != vector_index_version) {
// The mono store engine does not use snapshots, so the vector_index_version constantly remains zero.
if (region_version != vector_index_version &&
split_from_region.definition().store_engine() == pb::common::STORE_ENG_RAFT_STORE) {
DINGO_LOG(ERROR) << "SplitRegion split_from_region vector index epoch is not equal to region epoch, "
"split_from_region_id = "
<< split_from_region_id << " from_state=" << split_from_region.state()
Expand Down Expand Up @@ -2963,8 +2965,8 @@ butil::Status CoordinatorControl::MergeRegionWithTaskList(int64_t merge_from_reg
merge_from_region.definition().index_parameter().has_vector_index_parameter()) {
auto vector_index_version = merge_from_region_metrics.vector_index_status().last_build_epoch_version();
auto region_version = merge_from_region.definition().epoch().version();

if (region_version != vector_index_version) {
// The mono store engine does not use snapshots, so the vector_index_version constantly remains zero.
if (region_version != vector_index_version && merge_from_region.definition().store_engine() == pb::common::STORE_ENG_RAFT_STORE) {
DINGO_LOG(ERROR) << "MergeRegion merge_from_region vector index epoch is not equal to region epoch, "
"merge_from_region_id = "
<< merge_from_region_id << " from_state=" << merge_from_region.state()
Expand Down Expand Up @@ -3049,7 +3051,11 @@ butil::Status CoordinatorControl::ChangePeerRegionWithTaskList(
DINGO_LOG(ERROR) << "ChangePeerRegion region not exists, id = " << region_id;
return butil::Status(pb::error::Errno::EREGION_NOT_FOUND, "ChangePeerRegion region not exists");
}

if (region.definition().store_engine() != pb::common::STORE_ENG_RAFT_STORE) {
DINGO_LOG(ERROR) << "ChangePeerRegion, region_id" << region_id << "store engine not match";
return butil::Status(pb::error::Errno::ECHANGE_PEER_STORE_ENGINE_NOT_MATCH,
"ChangePeerRegion region store engine not match");
}
// validate region has NORMAL status
auto region_status = GetRegionStatus(region_id);
if (region.state() != ::dingodb::pb::common::RegionState::REGION_NORMAL ||
Expand Down Expand Up @@ -3313,7 +3319,11 @@ butil::Status CoordinatorControl::TransferLeaderRegionWithTaskList(
<< region_id;
return butil::Status(pb::error::Errno::EREGION_STATE, "TransferLeaderRegion region.state() != REGION_NORMAL");
}

if (region.definition().store_engine() != pb::common::STORE_ENG_RAFT_STORE) {
DINGO_LOG(ERROR) << "TransferLeaderRegion, region_id" << region_id << "store engine not match";
return butil::Status(pb::error::Errno::ECHANGE_PEER_STORE_ENGINE_NOT_MATCH,
"TransferLeaderRegion region store engine not match");
}
auto region_status = GetRegionStatus(region_id);
if (region_status.heartbeat_status() != pb::common::RegionHeartbeatState::REGION_ONLINE) {
DINGO_LOG(ERROR) << "TransferLeaderRegion region.heartbeat_state() "
Expand Down Expand Up @@ -5352,15 +5362,22 @@ bool CoordinatorControl::DoTaskPreCheck(const pb::coordinator::TaskPreCheck& tas
check_passed = false;
}
}

pb::coordinator_internal::RegionInternal region_internal;
int ret = region_map_.Get(store_region_check.region_id(), region_internal);
if (ret < 0) {
DINGO_LOG(INFO) << "check vector_index faild, region_id= " << store_region_check.region_id() << "not exists";
return false;
}
if (store_region_check.vector_index_version() > 0) {
if (!store_region_metrics.has_vector_index_status()) {
DINGO_LOG(INFO) << "check vector_index faild, region.has_vector_index_status() is false, can't do check, wait "
"for heartbeat. store_id="
<< store_region_check.store_id() << ", region_id=" << store_region_check.region_id();
check_passed = false;
} else if (store_region_check.vector_index_version() !=
store_region_metrics.vector_index_status().last_build_epoch_version()) {
store_region_metrics.vector_index_status().last_build_epoch_version() &&
region_internal.definition().store_engine() == pb::common::STORE_ENG_RAFT_STORE) {
// The mono store engine does not use snapshots, so the vector_index_version constantly remains zero.
DINGO_LOG(INFO) << "check vector_index failed, region_check.vector_index_version()="
<< store_region_check.vector_index_version()
<< " region.vector_index_status().vector_index_version()="
Expand Down
55 changes: 32 additions & 23 deletions src/handler/raft_apply_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,14 @@ int DeleteBatchHandler::Handle(std::shared_ptr<Context> ctx, store::RegionPtr re

static void LaunchAyncSaveSnapshot(store::RegionPtr region) { // NOLINT
auto store_region_meta = GET_STORE_REGION_META;
if (region->GetStoreEngineType() == pb::common::STORE_ENG_MONO_STORE) {
if (region->Type() == pb::common::STORE_REGION) {
store_region_meta->UpdateTemporaryDisableChange(region, false);
}
return;
}
store_region_meta->UpdateNeedBootstrapDoSnapshot(region, true);
auto engine = Server::GetInstance().GetEngine();
auto engine = Server::GetInstance().GetEngine(region->GetStoreEngineType());

bool is_success = false;
for (int i = 0; i < Constant::kSplitDoSnapshotRetryTimes; ++i) {
Expand Down Expand Up @@ -218,7 +224,6 @@ void SplitHandler::SplitClosure::Run() {
DINGO_LOG(INFO) << fmt::format("[split.spliting][region({})] not found region.", region_id_);
return;
}

if (!status().ok()) {
DINGO_LOG(WARNING) << fmt::format("[split.spliting][region({})] finish snapshot failed, error: {}", region_id_,
status().error_str());
Expand Down Expand Up @@ -700,9 +705,12 @@ static void LaunchCommitMergeCommand(const pb::raft::PrepareMergeRequest &reques
assert(storage != nullptr);

uint64_t start_time = Helper::TimestampMs();
// Generate LogEntry.
auto log_entries =
GetRaftLogEntries(source_region_definition.id(), request.min_applied_log_id() + 1, prepare_merge_log_id);
std::vector<pb::raft::LogEntry> log_entries;
if (target_region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE) {
// Generate LogEntry.
log_entries =
GetRaftLogEntries(source_region_definition.id(), request.min_applied_log_id() + 1, prepare_merge_log_id);
}

// Timing commit CommitMerge command to target region
// Just target region leader node will success
Expand All @@ -716,7 +724,7 @@ static void LaunchCommitMergeCommand(const pb::raft::PrepareMergeRequest &reques
auto ctx = std::make_shared<Context>();
ctx->SetRegionId(request.target_region_id());
ctx->SetRegionEpoch(request.target_region_epoch());

ctx->SetStoreEngineType(target_region->GetStoreEngineType());
// Try to commit local target region raft.
auto status =
storage->CommitMerge(ctx, request.job_id(), source_region_definition, prepare_merge_log_id, log_entries);
Expand Down Expand Up @@ -865,24 +873,25 @@ int CommitMergeHandler::Handle(std::shared_ptr<Context>, store::RegionPtr target
return 0;
}
ADD_REGION_CHANGE_RECORD_TIMEPOINT(request.job_id(), "Apply target region CommitMerge");

// Catch up apply source region raft log.
auto node = raft_store_engine->GetNode(source_region->Id());
if (node == nullptr) {
DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source node.",
request.job_id(), request.source_region_id(), target_region->Id());
return 0;
}
auto state_machine = std::dynamic_pointer_cast<StoreStateMachine>(node->GetStateMachine());
if (state_machine == nullptr) {
DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source state machine.",
request.job_id(), request.source_region_id(), target_region->Id());
return 0;
}

int32_t actual_apply_log_count = 0;
if (!request.entries().empty()) {
actual_apply_log_count = state_machine->CatchUpApplyLog(Helper::PbRepeatedToVector(request.entries()));
if (target_region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE) {
// Catch up apply source region raft log.
auto node = raft_store_engine->GetNode(source_region->Id());
if (node == nullptr) {
DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source node.",
request.job_id(), request.source_region_id(), target_region->Id());
return 0;
}
auto state_machine = std::dynamic_pointer_cast<StoreStateMachine>(node->GetStateMachine());
if (state_machine == nullptr) {
DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source state machine.",
request.job_id(), request.source_region_id(), target_region->Id());
return 0;
}

if (!request.entries().empty()) {
actual_apply_log_count = state_machine->CatchUpApplyLog(Helper::PbRepeatedToVector(request.entries()));
}
}

FAIL_POINT("before_commit_merge_modify_epoch");
Expand Down
11 changes: 11 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,17 @@ std::shared_ptr<RawEngine> Server::GetRawEngine(pb::common::RawEngine type) {
assert(raft_engine_ != nullptr);
return raft_engine_->GetRawEngine(type);
}
std::shared_ptr<Engine> Server::GetEngine(pb::common::StorageEngine store_engine_type) {
if (store_engine_type == pb::common::StorageEngine::STORE_ENG_RAFT_STORE) {
assert(raft_engine_ != nullptr);
return raft_engine_;
} else if (store_engine_type == pb::common::StorageEngine::STORE_ENG_MONO_STORE) {
assert(rocks_engine_ != nullptr);
return rocks_engine_;
}
DINGO_LOG(FATAL) << fmt::format("GetEngine not support sotre engine:{}", pb::common::StorageEngine_Name(store_engine_type));
return nullptr;
}

std::shared_ptr<RaftStoreEngine> Server::GetRaftStoreEngine() {
auto engine = GetEngine();
Expand Down
1 change: 1 addition & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class Server {

std::shared_ptr<Engine> GetEngine();
std::shared_ptr<RawEngine> GetRawEngine(pb::common::RawEngine type);
std::shared_ptr<Engine> GetEngine(pb::common::StorageEngine store_engine_type);

std::shared_ptr<RaftStoreEngine> GetRaftStoreEngine();

Expand Down
23 changes: 18 additions & 5 deletions src/store/region_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,9 @@ butil::Status SplitRegionTask::ValidateSplitRegion(std::shared_ptr<StoreRegionMe
if (parent_region->State() != pb::common::NORMAL) {
return butil::Status(pb::error::EREGION_STATE, "Parent region state is NORMAL, not allow split.");
}

if (parent_region->GetStoreEngineType() != pb::common::STORE_ENG_RAFT_STORE) {
return butil::Status();
}
auto raft_store_engine = Server::GetInstance().GetRaftStoreEngine();
if (raft_store_engine == nullptr) {
return butil::Status(pb::error::EINTERNAL, "Not found raft store engine");
Expand Down Expand Up @@ -519,8 +521,10 @@ butil::Status SplitRegionTask::SplitRegion() {
auto ctx = std::make_shared<Context>();
ctx->SetRegionId(region_cmd_->split_request().split_from_region_id());
ctx->SetRegionEpoch(parent_region->Epoch());
status = Server::GetInstance().GetEngine()->Write(
ctx, WriteDataBuilder::BuildWrite(region_cmd_->job_id(), region_cmd_->split_request(), parent_region->Epoch()));
status = Server::GetInstance()
.GetEngine(parent_region->GetStoreEngineType())
->Write(ctx, WriteDataBuilder::BuildWrite(region_cmd_->job_id(), region_cmd_->split_request(),
parent_region->Epoch()));
DINGO_LOG_IF(ERROR, !status.ok()) << fmt::format("[control.region][region()] commit split command failed, error: {}",
status.error_str());

Expand Down Expand Up @@ -663,6 +667,10 @@ butil::Status MergeRegionTask::ValidateMergeRegion(std::shared_ptr<StoreRegionMe
return butil::Status(pb::error::EMERGE_PEER_NOT_MATCH, "Peers is differencce.");
}

if (source_region->GetStoreEngineType() != pb::common::STORE_ENG_RAFT_STORE) {
return butil::Status();
}

// Check source region follower commit log progress.
auto raft_store_engine = Server::GetInstance().GetRaftStoreEngine();
if (raft_store_engine == nullptr) {
Expand Down Expand Up @@ -902,8 +910,10 @@ butil::Status ChangeRegionTask::ValidateChangeRegion(std::shared_ptr<StoreMetaMa
if (region->TemporaryDisableChange()) {
return butil::Status(pb::error::EREGION_DISABLE_CHANGE, "Temporary disable region change.");
}

return CheckLeader(region_definition.id());
if (region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE) {
return CheckLeader(region_definition.id());
}
return butil::Status();
}

butil::Status ChangeRegionTask::ChangeRegion(std::shared_ptr<Context> ctx, RegionCmdPtr command) {
Expand Down Expand Up @@ -1361,6 +1371,9 @@ butil::Status HoldVectorIndexTask::ValidateHoldVectorIndex(int64_t region_id) {
if (region == nullptr) {
return butil::Status(pb::error::EREGION_NOT_FOUND, fmt::format("Not found region {}", region_id));
}
if (region->GetStoreEngineType() != pb::common::STORE_ENG_RAFT_STORE) {
return butil::Status();
}
// Validate is follower
auto raft_store_engine = Server::GetInstance().GetRaftStoreEngine();
if (raft_store_engine != nullptr) {
Expand Down

0 comments on commit 4ddba44

Please sign in to comment.