Skip to content

Commit

Permalink
[fix][coordinator] Fix GetTables cannot get all indexes in some
Browse files Browse the repository at this point in the history
situation.

Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor authored and rock-git committed Mar 12, 2024
1 parent 9403afc commit 0265380
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 113 deletions.
13 changes: 6 additions & 7 deletions proto/coordinator_internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,16 @@ message MetaIncrementTable {
MetaIncrementOpType op_type = 3;
}

// @Deprecated
// message MetaIncrementIndex {
// int64 id = 1;
// IndexInternal index = 2;
// MetaIncrementOpType op_type = 3;
// }

// MetaIncrementTableIndex
// CAUTION: if op_type is UDPATE, there will only whole update using the table_indexes=3 or partial update using
// table_ids_to_add=4 or table_ids_to_del=5
// the UPDATE cannot do both whole update and partial update at the same time
message MetaIncrementTableIndex {
int64 id = 1;
TableIndexInternal table_indexes = 2;
MetaIncrementOpType op_type = 3;
repeated dingodb.pb.meta.DingoCommonId table_ids_to_add = 4;
repeated dingodb.pb.meta.DingoCommonId table_ids_to_del = 5;
}

message MetaIncrementCommonDisk {
Expand Down
4 changes: 2 additions & 2 deletions src/client/coordinator_client_function_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1511,12 +1511,12 @@ void SendAddIndexOnTable(std::shared_ptr<dingodb::CoordinatorInteraction> coordi
}

if (FLAGS_index_id == 0) {
DINGO_LOG(WARNING) << "index_id is empty";
DINGO_LOG(WARNING) << "index_id is empty, please use CreateTableIds to get one id";
return;
}

if (FLAGS_def_version == 0) {
DINGO_LOG(WARNING) << "version is empty";
DINGO_LOG(WARNING) << "def_version is empty, must > 0";
return;
}

Expand Down
181 changes: 108 additions & 73 deletions src/coordinator/coordinator_control_fsm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,114 @@ void CoordinatorControl::ApplyMetaIncrement(pb::coordinator_internal::MetaIncrem
}
}

// 50.table_index map
// table_index_map_ must updated before table_map_, schema_map_ and index_map_
// GetTableIndexes/DropTableIndexes will first get from table_map_, and then get from table_index_map
// and the schema_map_ is updated in the table_map_ update, so it's available after table_map_, and table_map_ is
// available after table_index_map_
{
if (meta_increment.table_indexes_size() > 0) {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_indexes size=" << meta_increment.table_indexes_size();
}

for (int i = 0; i < meta_increment.table_indexes_size(); i++) {
auto* table_index = meta_increment.mutable_table_indexes(i);
table_index->mutable_table_indexes()->set_revision(meta_revision);

if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::CREATE) {
auto ret = table_index_meta_->Put(table_index->id(), table_index->table_indexes());
if (!ret.ok()) {
DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id()
<< "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str();
} else {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id() << "] success";
}

// add event_list
pb::meta::MetaEvent event;
event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE);
event.mutable_table_index()->set_id(table_index->id());
for (const auto& common_id : table_index->table_indexes().table_ids()) {
*event.mutable_table_index()->add_table_ids() = common_id;
}
event.mutable_table_index()->set_revision(meta_revision);
event_list->push_back(event);
watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE);

} else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::UPDATE) {
pb::coordinator_internal::TableIndexInternal table_index_internal;
int ret1 = table_index_map_.Get(table_index->id(), table_index_internal);
if (ret1 < 0) {
DINGO_LOG(ERROR) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id()
<< "] failed, not found";
continue;
}

if (table_index->table_ids_to_add_size() > 0 || table_index->table_ids_to_del_size() > 0) {
std::map<int64_t, pb::meta::DingoCommonId> table_ids_old;
for (const auto& table_id : table_index_internal.table_ids()) {
table_ids_old.insert(std::make_pair(table_id.entity_id(), table_id));
}

if (table_index->table_ids_to_add_size() > 0) {
for (const auto& table_id : table_index->table_ids_to_add()) {
table_ids_old.insert(std::make_pair(table_id.entity_id(), table_id));
}
for (const auto& table_id : table_index->table_ids_to_del()) {
table_ids_old.erase(table_id.entity_id());
}
}

table_index_internal.clear_table_ids();
for (const auto& [id, table_id] : table_ids_old) {
*table_index_internal.add_table_ids() = table_id;
}
} else {
table_index_internal = table_index->table_indexes();
}

auto ret = table_index_meta_->Put(table_index->id(), table_index_internal);
if (!ret.ok()) {
DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id()
<< "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str();
} else {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() << "] success";
}

// add event_list
pb::meta::MetaEvent event;
event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE);
event.mutable_table_index()->set_id(table_index->id());
for (const auto& common_id : table_index_internal.table_ids()) {
*event.mutable_table_index()->add_table_ids() = common_id;
}
event.mutable_table_index()->set_revision(meta_revision);
event_list->push_back(event);
watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE);

} else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::DELETE) {
auto ret = table_index_meta_->Erase(table_index->id());
if (!ret.ok()) {
DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id()
<< "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str();
} else {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id() << "] success";
}

// add event_list
pb::meta::MetaEvent event;
event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE);
event.mutable_table_index()->set_id(table_index->id());
for (const auto& common_id : table_index->table_indexes().table_ids()) {
*event.mutable_table_index()->add_table_ids() = common_id;
}
event.mutable_table_index()->set_revision(meta_revision);
event_list->push_back(event);
watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE);
}
}
}

// 6.table map
{
if (meta_increment.tables_size() > 0) {
Expand Down Expand Up @@ -2373,79 +2481,6 @@ void CoordinatorControl::ApplyMetaIncrement(pb::coordinator_internal::MetaIncrem
// 13.index_metrics map
// index_metrics_map is a temp map, only used in memory, so we don't need to update it in raft apply

// 50.table_index map
{
if (meta_increment.table_indexes_size() > 0) {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_indexes size=" << meta_increment.table_indexes_size();
}

for (int i = 0; i < meta_increment.table_indexes_size(); i++) {
auto* table_index = meta_increment.mutable_table_indexes(i);
table_index->mutable_table_indexes()->set_revision(meta_revision);

if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::CREATE) {
auto ret = table_index_meta_->Put(table_index->id(), table_index->table_indexes());
if (!ret.ok()) {
DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id()
<< "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str();
} else {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id() << "] success";
}

// add event_list
pb::meta::MetaEvent event;
event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE);
event.mutable_table_index()->set_id(table_index->id());
for (const auto& common_id : table_index->table_indexes().table_ids()) {
*event.mutable_table_index()->add_table_ids() = common_id;
}
event.mutable_table_index()->set_revision(meta_revision);
event_list->push_back(event);
watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE);

} else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::UPDATE) {
auto ret = table_index_meta_->Put(table_index->id(), table_index->table_indexes());
if (!ret.ok()) {
DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id()
<< "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str();
} else {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() << "] success";
}

// add event_list
pb::meta::MetaEvent event;
event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE);
event.mutable_table_index()->set_id(table_index->id());
for (const auto& common_id : table_index->table_indexes().table_ids()) {
*event.mutable_table_index()->add_table_ids() = common_id;
}
event.mutable_table_index()->set_revision(meta_revision);
event_list->push_back(event);
watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE);

} else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::DELETE) {
auto ret = table_index_meta_->Erase(table_index->id());
if (!ret.ok()) {
DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id()
<< "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str();
} else {
DINGO_LOG(INFO) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id() << "] success";
}

// add event_list
pb::meta::MetaEvent event;
event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE);
event.mutable_table_index()->set_id(table_index->id());
for (const auto& common_id : table_index->table_indexes().table_ids()) {
*event.mutable_table_index()->add_table_ids() = common_id;
}
event.mutable_table_index()->set_revision(meta_revision);
event_list->push_back(event);
watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE);
}
}
}

// 51.common_disk_map
{
if (meta_increment.common_disk_s_size() > 0) {
Expand Down
Loading

0 comments on commit 0265380

Please sign in to comment.