From 4ddba443705e6e3002f9e13a1ee03e6a0a91fe24 Mon Sep 17 00:00:00 2001 From: yangjundong <1047934838@qq.com> Date: Thu, 23 May 2024 01:58:21 +0000 Subject: [PATCH] [fix][store] The mono_store_engine's region supports split and merge --- proto/error.proto | 1 + src/coordinator/coordinator_control_coor.cc | 31 +++++++++--- src/handler/raft_apply_handler.cc | 55 ++++++++++++--------- src/server/server.cc | 11 +++++ src/server/server.h | 1 + src/store/region_controller.cc | 23 +++++++-- 6 files changed, 87 insertions(+), 35 deletions(-) diff --git a/proto/error.proto b/proto/error.proto index f8df8fcf4..5aaf076b0 100644 --- a/proto/error.proto +++ b/proto/error.proto @@ -151,6 +151,7 @@ enum Errno { EWATCH_NOT_EXIST = 40033; EMERGE_STORE_ENGINE_NOT_MATCH = 40034; EMERGE_RAW_ENGINE_NOT_MATCH = 40035; + ECHANGE_PEER_STORE_ENGINE_NOT_MATCH = 40036; // raft [50000, 60000) ERAFT_INIT = 50000; diff --git a/src/coordinator/coordinator_control_coor.cc b/src/coordinator/coordinator_control_coor.cc index 169c32b87..d82686a07 100644 --- a/src/coordinator/coordinator_control_coor.cc +++ b/src/coordinator/coordinator_control_coor.cc @@ -2659,7 +2659,9 @@ butil::Status CoordinatorControl::SplitRegionWithTaskList(int64_t split_from_reg auto vector_index_version = region_metrics.vector_index_status().last_build_epoch_version(); auto region_version = split_from_region.definition().epoch().version(); - if (region_version != vector_index_version) { + // The mono store engine does not use snapshots, so the vector_index_version constantly remains zero. + if (region_version != vector_index_version && + split_from_region.definition().store_engine() == pb::common::STORE_ENG_RAFT_STORE) { DINGO_LOG(ERROR) << "SplitRegion split_from_region vector index epoch is not equal to region epoch, " "split_from_region_id = " << split_from_region_id << " from_state=" << split_from_region.state() @@ -2963,8 +2965,8 @@ butil::Status CoordinatorControl::MergeRegionWithTaskList(int64_t merge_from_reg merge_from_region.definition().index_parameter().has_vector_index_parameter()) { auto vector_index_version = merge_from_region_metrics.vector_index_status().last_build_epoch_version(); auto region_version = merge_from_region.definition().epoch().version(); - - if (region_version != vector_index_version) { + // The mono store engine does not use snapshots, so the vector_index_version constantly remains zero. + if (region_version != vector_index_version && merge_from_region.definition().store_engine() == pb::common::STORE_ENG_RAFT_STORE) { DINGO_LOG(ERROR) << "MergeRegion merge_from_region vector index epoch is not equal to region epoch, " "merge_from_region_id = " << merge_from_region_id << " from_state=" << merge_from_region.state() @@ -3049,7 +3051,11 @@ butil::Status CoordinatorControl::ChangePeerRegionWithTaskList( DINGO_LOG(ERROR) << "ChangePeerRegion region not exists, id = " << region_id; return butil::Status(pb::error::Errno::EREGION_NOT_FOUND, "ChangePeerRegion region not exists"); } - + if (region.definition().store_engine() != pb::common::STORE_ENG_RAFT_STORE) { + DINGO_LOG(ERROR) << "ChangePeerRegion, region_id" << region_id << "store engine not match"; + return butil::Status(pb::error::Errno::ECHANGE_PEER_STORE_ENGINE_NOT_MATCH, + "ChangePeerRegion region store engine not match"); + } // validate region has NORMAL status auto region_status = GetRegionStatus(region_id); if (region.state() != ::dingodb::pb::common::RegionState::REGION_NORMAL || @@ -3313,7 +3319,11 @@ butil::Status CoordinatorControl::TransferLeaderRegionWithTaskList( << region_id; return butil::Status(pb::error::Errno::EREGION_STATE, "TransferLeaderRegion region.state() != REGION_NORMAL"); } - + if (region.definition().store_engine() != pb::common::STORE_ENG_RAFT_STORE) { + DINGO_LOG(ERROR) << "TransferLeaderRegion, region_id" << region_id << "store engine not match"; + return butil::Status(pb::error::Errno::ECHANGE_PEER_STORE_ENGINE_NOT_MATCH, + "TransferLeaderRegion region store engine not match"); + } auto region_status = GetRegionStatus(region_id); if (region_status.heartbeat_status() != pb::common::RegionHeartbeatState::REGION_ONLINE) { DINGO_LOG(ERROR) << "TransferLeaderRegion region.heartbeat_state() " @@ -5352,7 +5362,12 @@ bool CoordinatorControl::DoTaskPreCheck(const pb::coordinator::TaskPreCheck& tas check_passed = false; } } - + pb::coordinator_internal::RegionInternal region_internal; + int ret = region_map_.Get(store_region_check.region_id(), region_internal); + if (ret < 0) { + DINGO_LOG(INFO) << "check vector_index faild, region_id= " << store_region_check.region_id() << "not exists"; + return false; + } if (store_region_check.vector_index_version() > 0) { if (!store_region_metrics.has_vector_index_status()) { DINGO_LOG(INFO) << "check vector_index faild, region.has_vector_index_status() is false, can't do check, wait " @@ -5360,7 +5375,9 @@ bool CoordinatorControl::DoTaskPreCheck(const pb::coordinator::TaskPreCheck& tas << store_region_check.store_id() << ", region_id=" << store_region_check.region_id(); check_passed = false; } else if (store_region_check.vector_index_version() != - store_region_metrics.vector_index_status().last_build_epoch_version()) { + store_region_metrics.vector_index_status().last_build_epoch_version() && + region_internal.definition().store_engine() == pb::common::STORE_ENG_RAFT_STORE) { + // The mono store engine does not use snapshots, so the vector_index_version constantly remains zero. DINGO_LOG(INFO) << "check vector_index failed, region_check.vector_index_version()=" << store_region_check.vector_index_version() << " region.vector_index_status().vector_index_version()=" diff --git a/src/handler/raft_apply_handler.cc b/src/handler/raft_apply_handler.cc index e00e76056..d67eb7e30 100644 --- a/src/handler/raft_apply_handler.cc +++ b/src/handler/raft_apply_handler.cc @@ -185,8 +185,14 @@ int DeleteBatchHandler::Handle(std::shared_ptr ctx, store::RegionPtr re static void LaunchAyncSaveSnapshot(store::RegionPtr region) { // NOLINT auto store_region_meta = GET_STORE_REGION_META; + if (region->GetStoreEngineType() == pb::common::STORE_ENG_MONO_STORE) { + if (region->Type() == pb::common::STORE_REGION) { + store_region_meta->UpdateTemporaryDisableChange(region, false); + } + return; + } store_region_meta->UpdateNeedBootstrapDoSnapshot(region, true); - auto engine = Server::GetInstance().GetEngine(); + auto engine = Server::GetInstance().GetEngine(region->GetStoreEngineType()); bool is_success = false; for (int i = 0; i < Constant::kSplitDoSnapshotRetryTimes; ++i) { @@ -218,7 +224,6 @@ void SplitHandler::SplitClosure::Run() { DINGO_LOG(INFO) << fmt::format("[split.spliting][region({})] not found region.", region_id_); return; } - if (!status().ok()) { DINGO_LOG(WARNING) << fmt::format("[split.spliting][region({})] finish snapshot failed, error: {}", region_id_, status().error_str()); @@ -700,9 +705,12 @@ static void LaunchCommitMergeCommand(const pb::raft::PrepareMergeRequest &reques assert(storage != nullptr); uint64_t start_time = Helper::TimestampMs(); - // Generate LogEntry. - auto log_entries = - GetRaftLogEntries(source_region_definition.id(), request.min_applied_log_id() + 1, prepare_merge_log_id); + std::vector log_entries; + if (target_region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE) { + // Generate LogEntry. + log_entries = + GetRaftLogEntries(source_region_definition.id(), request.min_applied_log_id() + 1, prepare_merge_log_id); + } // Timing commit CommitMerge command to target region // Just target region leader node will success @@ -716,7 +724,7 @@ static void LaunchCommitMergeCommand(const pb::raft::PrepareMergeRequest &reques auto ctx = std::make_shared(); ctx->SetRegionId(request.target_region_id()); ctx->SetRegionEpoch(request.target_region_epoch()); - + ctx->SetStoreEngineType(target_region->GetStoreEngineType()); // Try to commit local target region raft. auto status = storage->CommitMerge(ctx, request.job_id(), source_region_definition, prepare_merge_log_id, log_entries); @@ -865,24 +873,25 @@ int CommitMergeHandler::Handle(std::shared_ptr, store::RegionPtr target return 0; } ADD_REGION_CHANGE_RECORD_TIMEPOINT(request.job_id(), "Apply target region CommitMerge"); - - // Catch up apply source region raft log. - auto node = raft_store_engine->GetNode(source_region->Id()); - if (node == nullptr) { - DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source node.", - request.job_id(), request.source_region_id(), target_region->Id()); - return 0; - } - auto state_machine = std::dynamic_pointer_cast(node->GetStateMachine()); - if (state_machine == nullptr) { - DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source state machine.", - request.job_id(), request.source_region_id(), target_region->Id()); - return 0; - } - int32_t actual_apply_log_count = 0; - if (!request.entries().empty()) { - actual_apply_log_count = state_machine->CatchUpApplyLog(Helper::PbRepeatedToVector(request.entries())); + if (target_region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE) { + // Catch up apply source region raft log. + auto node = raft_store_engine->GetNode(source_region->Id()); + if (node == nullptr) { + DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source node.", + request.job_id(), request.source_region_id(), target_region->Id()); + return 0; + } + auto state_machine = std::dynamic_pointer_cast(node->GetStateMachine()); + if (state_machine == nullptr) { + DINGO_LOG(FATAL) << fmt::format("[merge.merging][job_id({}).region({}/{})] Not found source state machine.", + request.job_id(), request.source_region_id(), target_region->Id()); + return 0; + } + + if (!request.entries().empty()) { + actual_apply_log_count = state_machine->CatchUpApplyLog(Helper::PbRepeatedToVector(request.entries())); + } } FAIL_POINT("before_commit_merge_modify_epoch"); diff --git a/src/server/server.cc b/src/server/server.cc index be195cf85..0fbccb752 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -924,6 +924,17 @@ std::shared_ptr Server::GetRawEngine(pb::common::RawEngine type) { assert(raft_engine_ != nullptr); return raft_engine_->GetRawEngine(type); } +std::shared_ptr Server::GetEngine(pb::common::StorageEngine store_engine_type) { + if (store_engine_type == pb::common::StorageEngine::STORE_ENG_RAFT_STORE) { + assert(raft_engine_ != nullptr); + return raft_engine_; + } else if (store_engine_type == pb::common::StorageEngine::STORE_ENG_MONO_STORE) { + assert(rocks_engine_ != nullptr); + return rocks_engine_; + } + DINGO_LOG(FATAL) << fmt::format("GetEngine not support sotre engine:{}", pb::common::StorageEngine_Name(store_engine_type)); + return nullptr; +} std::shared_ptr Server::GetRaftStoreEngine() { auto engine = GetEngine(); diff --git a/src/server/server.h b/src/server/server.h index 29fa0cdf2..1937608de 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -140,6 +140,7 @@ class Server { std::shared_ptr GetEngine(); std::shared_ptr GetRawEngine(pb::common::RawEngine type); + std::shared_ptr GetEngine(pb::common::StorageEngine store_engine_type); std::shared_ptr GetRaftStoreEngine(); diff --git a/src/store/region_controller.cc b/src/store/region_controller.cc index 0c54d59e2..451ce0567 100644 --- a/src/store/region_controller.cc +++ b/src/store/region_controller.cc @@ -433,7 +433,9 @@ butil::Status SplitRegionTask::ValidateSplitRegion(std::shared_ptrState() != pb::common::NORMAL) { return butil::Status(pb::error::EREGION_STATE, "Parent region state is NORMAL, not allow split."); } - + if (parent_region->GetStoreEngineType() != pb::common::STORE_ENG_RAFT_STORE) { + return butil::Status(); + } auto raft_store_engine = Server::GetInstance().GetRaftStoreEngine(); if (raft_store_engine == nullptr) { return butil::Status(pb::error::EINTERNAL, "Not found raft store engine"); @@ -519,8 +521,10 @@ butil::Status SplitRegionTask::SplitRegion() { auto ctx = std::make_shared(); ctx->SetRegionId(region_cmd_->split_request().split_from_region_id()); ctx->SetRegionEpoch(parent_region->Epoch()); - status = Server::GetInstance().GetEngine()->Write( - ctx, WriteDataBuilder::BuildWrite(region_cmd_->job_id(), region_cmd_->split_request(), parent_region->Epoch())); + status = Server::GetInstance() + .GetEngine(parent_region->GetStoreEngineType()) + ->Write(ctx, WriteDataBuilder::BuildWrite(region_cmd_->job_id(), region_cmd_->split_request(), + parent_region->Epoch())); DINGO_LOG_IF(ERROR, !status.ok()) << fmt::format("[control.region][region()] commit split command failed, error: {}", status.error_str()); @@ -663,6 +667,10 @@ butil::Status MergeRegionTask::ValidateMergeRegion(std::shared_ptrGetStoreEngineType() != pb::common::STORE_ENG_RAFT_STORE) { + return butil::Status(); + } + // Check source region follower commit log progress. auto raft_store_engine = Server::GetInstance().GetRaftStoreEngine(); if (raft_store_engine == nullptr) { @@ -902,8 +910,10 @@ butil::Status ChangeRegionTask::ValidateChangeRegion(std::shared_ptrTemporaryDisableChange()) { return butil::Status(pb::error::EREGION_DISABLE_CHANGE, "Temporary disable region change."); } - - return CheckLeader(region_definition.id()); + if (region->GetStoreEngineType() == pb::common::STORE_ENG_RAFT_STORE) { + return CheckLeader(region_definition.id()); + } + return butil::Status(); } butil::Status ChangeRegionTask::ChangeRegion(std::shared_ptr ctx, RegionCmdPtr command) { @@ -1361,6 +1371,9 @@ butil::Status HoldVectorIndexTask::ValidateHoldVectorIndex(int64_t region_id) { if (region == nullptr) { return butil::Status(pb::error::EREGION_NOT_FOUND, fmt::format("Not found region {}", region_id)); } + if (region->GetStoreEngineType() != pb::common::STORE_ENG_RAFT_STORE) { + return butil::Status(); + } // Validate is follower auto raft_store_engine = Server::GetInstance().GetRaftStoreEngine(); if (raft_store_engine != nullptr) {