Skip to content

Commit

Permalink
[fix][store] Fixup change peer tasks earily end issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
rock-git authored and ketor committed Jan 18, 2024
1 parent 0983df7 commit 7bcabe8
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
2 changes: 2 additions & 0 deletions src/coordinator/coordinator_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ class CoordinatorControl : public MetaControl {
void AddLoadVectorIndexTask(pb::coordinator::TaskList *task_list, int64_t store_id, int64_t region_id,
pb::coordinator_internal::MetaIncrement &meta_increment);
static void AddCheckStoreRegionTask(pb::coordinator::TaskList *task_list, int64_t store_id, int64_t region_id);
static void AddCheckChangePeerResultTask(pb::coordinator::TaskList *task_list, int64_t region_id,
const pb::common::RegionDefinition &region_definition);
static void AddCheckMergeResultTask(pb::coordinator::TaskList *task_list, int64_t merge_to_region_id,
const pb::common::Range &range);
static void AddCheckTombstoneRegionTask(pb::coordinator::TaskList *task_list, int64_t store_id, int64_t region_id);
Expand Down
12 changes: 12 additions & 0 deletions src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2927,6 +2927,8 @@ butil::Status CoordinatorControl::ChangePeerRegionWithTaskList(
AddDeleteTaskWithCheck(increment_task_list, new_store_ids_diff_less.at(0), region_id, new_region_definition.peers(),
meta_increment);

AddCheckChangePeerResultTask(increment_task_list, region_id, new_region_definition);

// this is purge_region task
// AddPurgeTask(increment_task_list, new_store_ids_diff_less.at(0),
// region_id, meta_increment);
Expand Down Expand Up @@ -3000,6 +3002,8 @@ butil::Status CoordinatorControl::ChangePeerRegionWithTaskList(
// this is change peer task
AddChangePeerTask(increment_task_list, leader_store_id, region_id, new_region_definition, meta_increment);

AddCheckChangePeerResultTask(increment_task_list, region_id, new_region_definition);

} else {
DINGO_LOG(ERROR) << "ChangePeerRegion new_store_ids not match, region_id = " << region_id;
return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "ChangePeerRegion new_store_ids not match");
Expand Down Expand Up @@ -4771,6 +4775,14 @@ void CoordinatorControl::AddCheckStoreRegionTask(pb::coordinator::TaskList* task
region_check->mutable_store_region_check()->set_region_id(region_id);
}

void CoordinatorControl::AddCheckChangePeerResultTask(pb::coordinator::TaskList* task_list, int64_t region_id,
const pb::common::RegionDefinition& region_definition) {
auto* pre_check = task_list->add_tasks()->mutable_pre_check();
pre_check->set_type(pb::coordinator::TaskPreCheckType::REGION_CHECK);
pre_check->mutable_region_check()->set_region_id(region_id);
*(pre_check->mutable_region_check()->mutable_peers()) = region_definition.peers();
}

bool CoordinatorControl::DoTaskPreCheck(const pb::coordinator::TaskPreCheck& task_pre_check) {
if (task_pre_check.type() == pb::coordinator::TaskPreCheckType::REGION_CHECK) {
pb::coordinator_internal::RegionInternal region;
Expand Down
20 changes: 4 additions & 16 deletions src/store/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,15 @@ void HeartbeatTask::SendStoreHeartbeat(std::shared_ptr<CoordinatorInteraction> c
auto store_meta_manager = Server::GetInstance().GetStoreMetaManager();

request.set_self_storemap_epoch(store_meta_manager->GetStoreServerMeta()->GetEpoch());
// request.set_self_regionmap_epoch(store_meta_manager->GetStoreRegionMeta()->GetEpoch());

// store
// CAUTION: may coredump here, so we cannot delete self store meta.
*(request.mutable_store()) = (*store_meta_manager->GetStoreServerMeta()->GetStore(Server::GetInstance().Id()));

// only partial heartbeat or heartbeat_counter % FLAGS_store_heartbeat_report_region_multiple == 0 will report
// region_metrics, this is for reduce heartbeat size and cpu usage.
bool need_report_region_metrics = false;
if (region_ids.empty()) {
heartbeat_counter++;
if (heartbeat_counter % FLAGS_store_heartbeat_report_region_multiple == 0) {
need_report_region_metrics = true;
}
} else {
need_report_region_metrics = true;
}

bool need_report_region_metrics =
!region_ids.empty() || (++heartbeat_counter % FLAGS_store_heartbeat_report_region_multiple == 0);
if (need_report_region_metrics) {
DINGO_LOG(INFO) << fmt::format("[heartbeat.store] heartbeat_counter: {}", heartbeat_counter);

Expand Down Expand Up @@ -246,21 +237,18 @@ void HeartbeatTask::HandleStoreHeartbeatResponse(std::shared_ptr<dingodb::StoreM
auto remote_stores = response.storemap().stores();

auto new_stores = GetNewStore(local_stores, remote_stores);
DINGO_LOG(INFO) << fmt::format("[heartbeat.store] new store size: {} / {}", new_stores.size(), local_stores.size());
for (const auto& store : new_stores) {
store_server_meta->AddStore(store);
}

auto changed_stores = GetChangedStore(local_stores, remote_stores);
DINGO_LOG(INFO) << fmt::format("[heartbeat.store] changed store size: {} / {}", changed_stores.size(),
local_stores.size());
for (const auto& store : changed_stores) {
store_server_meta->UpdateStore(store);
}

auto deleted_stores = GetDeletedStore(local_stores, remote_stores);
DINGO_LOG(INFO) << fmt::format("[heartbeat.store] deleted store size: {} / {}", deleted_stores.size(),
local_stores.size());
DINGO_LOG(INFO) << fmt::format("[heartbeat.store] store stats new({}) change({}) delete({}) local({})",
new_stores.size(), changed_stores.size(), deleted_stores.size(), local_stores.size());
for (const auto& store : deleted_stores) {
// if deleted store is self, skip, else will coredump in next heartbeat.
if (store->id() == Server::GetInstance().Id()) {
Expand Down

0 comments on commit 7bcabe8

Please sign in to comment.