diff --git a/proto/coordinator_internal.proto b/proto/coordinator_internal.proto index 27a8b7f47..e7301763e 100644 --- a/proto/coordinator_internal.proto +++ b/proto/coordinator_internal.proto @@ -257,17 +257,16 @@ message MetaIncrementTable { MetaIncrementOpType op_type = 3; } -// @Deprecated -// message MetaIncrementIndex { -// int64 id = 1; -// IndexInternal index = 2; -// MetaIncrementOpType op_type = 3; -// } - +// MetaIncrementTableIndex +// CAUTION: if op_type is UDPATE, there will only whole update using the table_indexes=3 or partial update using +// table_ids_to_add=4 or table_ids_to_del=5 +// the UPDATE cannot do both whole update and partial update at the same time message MetaIncrementTableIndex { int64 id = 1; TableIndexInternal table_indexes = 2; MetaIncrementOpType op_type = 3; + repeated dingodb.pb.meta.DingoCommonId table_ids_to_add = 4; + repeated dingodb.pb.meta.DingoCommonId table_ids_to_del = 5; } message MetaIncrementCommonDisk { diff --git a/src/client/coordinator_client_function_meta.cc b/src/client/coordinator_client_function_meta.cc index 81cad5913..ba89ffd31 100644 --- a/src/client/coordinator_client_function_meta.cc +++ b/src/client/coordinator_client_function_meta.cc @@ -1511,12 +1511,12 @@ void SendAddIndexOnTable(std::shared_ptr coordi } if (FLAGS_index_id == 0) { - DINGO_LOG(WARNING) << "index_id is empty"; + DINGO_LOG(WARNING) << "index_id is empty, please use CreateTableIds to get one id"; return; } if (FLAGS_def_version == 0) { - DINGO_LOG(WARNING) << "version is empty"; + DINGO_LOG(WARNING) << "def_version is empty, must > 0"; return; } diff --git a/src/coordinator/coordinator_control_fsm.cc b/src/coordinator/coordinator_control_fsm.cc index 01b213ae4..07c667ace 100644 --- a/src/coordinator/coordinator_control_fsm.cc +++ b/src/coordinator/coordinator_control_fsm.cc @@ -1693,6 +1693,114 @@ void CoordinatorControl::ApplyMetaIncrement(pb::coordinator_internal::MetaIncrem } } + // 50.table_index map + // table_index_map_ must updated before table_map_, schema_map_ and index_map_ + // GetTableIndexes/DropTableIndexes will first get from table_map_, and then get from table_index_map + // and the schema_map_ is updated in the table_map_ update, so it's available after table_map_, and table_map_ is + // available after table_index_map_ + { + if (meta_increment.table_indexes_size() > 0) { + DINGO_LOG(INFO) << "ApplyMetaIncrement table_indexes size=" << meta_increment.table_indexes_size(); + } + + for (int i = 0; i < meta_increment.table_indexes_size(); i++) { + auto* table_index = meta_increment.mutable_table_indexes(i); + table_index->mutable_table_indexes()->set_revision(meta_revision); + + if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::CREATE) { + auto ret = table_index_meta_->Put(table_index->id(), table_index->table_indexes()); + if (!ret.ok()) { + DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id() + << "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str(); + } else { + DINGO_LOG(INFO) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id() << "] success"; + } + + // add event_list + pb::meta::MetaEvent event; + event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE); + event.mutable_table_index()->set_id(table_index->id()); + for (const auto& common_id : table_index->table_indexes().table_ids()) { + *event.mutable_table_index()->add_table_ids() = common_id; + } + event.mutable_table_index()->set_revision(meta_revision); + event_list->push_back(event); + watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE); + + } else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::UPDATE) { + pb::coordinator_internal::TableIndexInternal table_index_internal; + int ret1 = table_index_map_.Get(table_index->id(), table_index_internal); + if (ret1 < 0) { + DINGO_LOG(ERROR) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() + << "] failed, not found"; + continue; + } + + if (table_index->table_ids_to_add_size() > 0 || table_index->table_ids_to_del_size() > 0) { + std::map table_ids_old; + for (const auto& table_id : table_index_internal.table_ids()) { + table_ids_old.insert(std::make_pair(table_id.entity_id(), table_id)); + } + + if (table_index->table_ids_to_add_size() > 0) { + for (const auto& table_id : table_index->table_ids_to_add()) { + table_ids_old.insert(std::make_pair(table_id.entity_id(), table_id)); + } + for (const auto& table_id : table_index->table_ids_to_del()) { + table_ids_old.erase(table_id.entity_id()); + } + } + + table_index_internal.clear_table_ids(); + for (const auto& [id, table_id] : table_ids_old) { + *table_index_internal.add_table_ids() = table_id; + } + } else { + table_index_internal = table_index->table_indexes(); + } + + auto ret = table_index_meta_->Put(table_index->id(), table_index_internal); + if (!ret.ok()) { + DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() + << "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str(); + } else { + DINGO_LOG(INFO) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() << "] success"; + } + + // add event_list + pb::meta::MetaEvent event; + event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE); + event.mutable_table_index()->set_id(table_index->id()); + for (const auto& common_id : table_index_internal.table_ids()) { + *event.mutable_table_index()->add_table_ids() = common_id; + } + event.mutable_table_index()->set_revision(meta_revision); + event_list->push_back(event); + watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE); + + } else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::DELETE) { + auto ret = table_index_meta_->Erase(table_index->id()); + if (!ret.ok()) { + DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id() + << "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str(); + } else { + DINGO_LOG(INFO) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id() << "] success"; + } + + // add event_list + pb::meta::MetaEvent event; + event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE); + event.mutable_table_index()->set_id(table_index->id()); + for (const auto& common_id : table_index->table_indexes().table_ids()) { + *event.mutable_table_index()->add_table_ids() = common_id; + } + event.mutable_table_index()->set_revision(meta_revision); + event_list->push_back(event); + watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE); + } + } + } + // 6.table map { if (meta_increment.tables_size() > 0) { @@ -2373,79 +2481,6 @@ void CoordinatorControl::ApplyMetaIncrement(pb::coordinator_internal::MetaIncrem // 13.index_metrics map // index_metrics_map is a temp map, only used in memory, so we don't need to update it in raft apply - // 50.table_index map - { - if (meta_increment.table_indexes_size() > 0) { - DINGO_LOG(INFO) << "ApplyMetaIncrement table_indexes size=" << meta_increment.table_indexes_size(); - } - - for (int i = 0; i < meta_increment.table_indexes_size(); i++) { - auto* table_index = meta_increment.mutable_table_indexes(i); - table_index->mutable_table_indexes()->set_revision(meta_revision); - - if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::CREATE) { - auto ret = table_index_meta_->Put(table_index->id(), table_index->table_indexes()); - if (!ret.ok()) { - DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id() - << "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str(); - } else { - DINGO_LOG(INFO) << "ApplyMetaIncrement table_index CREATE, [id=" << table_index->id() << "] success"; - } - - // add event_list - pb::meta::MetaEvent event; - event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE); - event.mutable_table_index()->set_id(table_index->id()); - for (const auto& common_id : table_index->table_indexes().table_ids()) { - *event.mutable_table_index()->add_table_ids() = common_id; - } - event.mutable_table_index()->set_revision(meta_revision); - event_list->push_back(event); - watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_CREATE); - - } else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::UPDATE) { - auto ret = table_index_meta_->Put(table_index->id(), table_index->table_indexes()); - if (!ret.ok()) { - DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() - << "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str(); - } else { - DINGO_LOG(INFO) << "ApplyMetaIncrement table_index UPDATE, [id=" << table_index->id() << "] success"; - } - - // add event_list - pb::meta::MetaEvent event; - event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE); - event.mutable_table_index()->set_id(table_index->id()); - for (const auto& common_id : table_index->table_indexes().table_ids()) { - *event.mutable_table_index()->add_table_ids() = common_id; - } - event.mutable_table_index()->set_revision(meta_revision); - event_list->push_back(event); - watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_UPDATE); - - } else if (table_index->op_type() == pb::coordinator_internal::MetaIncrementOpType::DELETE) { - auto ret = table_index_meta_->Erase(table_index->id()); - if (!ret.ok()) { - DINGO_LOG(FATAL) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id() - << "] failed, errcode: " << ret.error_code() << ", errmsg: " << ret.error_str(); - } else { - DINGO_LOG(INFO) << "ApplyMetaIncrement table_index DELETE, [id=" << table_index->id() << "] success"; - } - - // add event_list - pb::meta::MetaEvent event; - event.set_event_type(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE); - event.mutable_table_index()->set_id(table_index->id()); - for (const auto& common_id : table_index->table_indexes().table_ids()) { - *event.mutable_table_index()->add_table_ids() = common_id; - } - event.mutable_table_index()->set_revision(meta_revision); - event_list->push_back(event); - watch_bitset.set(pb::meta::MetaEventType::META_EVENT_TABLE_INDEX_DELETE); - } - } - } - // 51.common_disk_map { if (meta_increment.common_disk_s_size() > 0) { diff --git a/src/coordinator/coordinator_control_meta.cc b/src/coordinator/coordinator_control_meta.cc index 8b6965f05..92e61896b 100644 --- a/src/coordinator/coordinator_control_meta.cc +++ b/src/coordinator/coordinator_control_meta.cc @@ -1512,7 +1512,6 @@ butil::Status CoordinatorControl::GetTables(int64_t schema_id, } { - // BAIDU_SCOPED_LOCK(schema_map_mutex_); pb::coordinator_internal::SchemaInternal schema_internal; int ret = schema_map_.Get(schema_id, schema_internal); if (ret < 0) { @@ -2578,31 +2577,44 @@ void CoordinatorControl::CreateTableIndexesMap(pb::coordinator_internal::TableIn butil::Status CoordinatorControl::GetTableIndexes(int64_t schema_id, int64_t table_id, pb::meta::GetTablesResponse* response) { - pb::meta::TableDefinitionWithId definition_with_id; - butil::Status ret = GetTable(schema_id, table_id, definition_with_id); + pb::meta::TableDefinitionWithId main_definition_with_id; + butil::Status ret = GetTable(schema_id, table_id, main_definition_with_id); if (!ret.ok()) { + DINGO_LOG(ERROR) << "error while get table_indexes GetTable failed, schema_id: " << schema_id + << ", table_id: " << table_id << ", error_code: " << ret.error_code() + << ", error_msg: " << ret.error_str(); return ret; } - *(response->add_table_definition_with_ids()) = definition_with_id; - pb::coordinator_internal::TableIndexInternal table_index_internal; int result = table_index_map_.Get(table_id, table_index_internal); if (result >= 0) { - // found - for (const auto& temp_id : table_index_internal.table_ids()) { - pb::meta::TableDefinitionWithId temp_definition_with_id; - ret = GetIndex(schema_id, temp_id.entity_id(), false, temp_definition_with_id); + // found table_index, first add main table definition to response + *(response->add_table_definition_with_ids()) = main_definition_with_id; + + // get all index's definition and add to response + for (const auto& index_id : table_index_internal.table_ids()) { + pb::meta::TableDefinitionWithId index_definition_with_id; + ret = GetIndex(schema_id, index_id.entity_id(), false, index_definition_with_id); if (!ret.ok()) { + DINGO_LOG(ERROR) << "error while get table_indexes GetIndex failed, schema_id: " << schema_id + << ", index_id: " << index_id.entity_id() << ", error_code: " << ret.error_code() + << ", error_msg: " << ret.error_str(); return ret; } - *(response->add_table_definition_with_ids()) = temp_definition_with_id; + *(response->add_table_definition_with_ids()) = index_definition_with_id; } + + // set revision response->set_revision(table_index_internal.revision()); + } else { - // not found - DINGO_LOG(INFO) << "cannot find indexes, schema_id: " << schema_id << ", table_id: " << table_id; + // not found, return error + std::string s = + "cannot find indexes, schema_id: " + std::to_string(schema_id) + ", table_id: " + std::to_string(table_id); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, s); } return butil::Status::OK(); @@ -2610,14 +2622,17 @@ butil::Status CoordinatorControl::GetTableIndexes(int64_t schema_id, int64_t tab butil::Status CoordinatorControl::DropTableIndexes(int64_t schema_id, int64_t table_id, pb::coordinator_internal::MetaIncrement& meta_increment) { - if (!ValidateSchema(schema_id)) { - DINGO_LOG(ERROR) << "ERRROR: schema_id not valid" << schema_id; - return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, "schema_id not valid"); + pb::meta::TableDefinitionWithId definition_with_id; + butil::Status ret = GetTable(schema_id, table_id, definition_with_id); + if (!ret.ok()) { + DINGO_LOG(ERROR) << "error while dropping index GetTable failed, schema_id: " << schema_id + << ", table_id: " << table_id << ", error_code: " << ret.error_code() + << ", error_msg: " << ret.error_str(); + return ret; } // drop indexes of the table pb::coordinator_internal::TableIndexInternal table_index_internal; - butil::Status ret; int result = table_index_map_.Get(table_id, table_index_internal); if (result >= 0) { // find in map @@ -2633,9 +2648,13 @@ butil::Status CoordinatorControl::DropTableIndexes(int64_t schema_id, int64_t ta auto* table_index_increment = meta_increment.add_table_indexes(); table_index_increment->set_id(table_id); table_index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::DELETE); + *table_index_increment->mutable_table_indexes() = table_index_internal; } else { // not find in map - DINGO_LOG(INFO) << "cannot find indexes, schema_id: " << schema_id << ", table_id: " << table_id; + std::string s = "cannot find in table_index_map_, schema_id: " + std::to_string(schema_id) + + ", table_id: " + std::to_string(table_id); + DINGO_LOG(ERROR) << s; + return butil::Status(pb::error::Errno::EILLEGAL_PARAMTETERS, s); } // drop table finally @@ -2650,7 +2669,7 @@ butil::Status CoordinatorControl::DropTableIndexes(int64_t schema_id, int64_t ta butil::Status CoordinatorControl::RemoveTableIndex(int64_t table_id, int64_t index_id, pb::coordinator_internal::MetaIncrement& meta_increment) { - butil::Status ret = DropIndex(table_id, index_id, false, meta_increment); + auto ret = DropIndex(table_id, index_id, false, meta_increment); if (!ret.ok()) { DINGO_LOG(ERROR) << "error while dropping index, table_id: " << table_id << ", index_id: " << index_id; return ret; @@ -2668,19 +2687,18 @@ butil::Status CoordinatorControl::RemoveTableIndex(int64_t table_id, int64_t ind for (size_t i = 0; i < table_index_internal.table_ids_size(); i++) { if (table_index_internal.table_ids(i).entity_id() == index_id) { found_index = true; - table_index_internal.mutable_table_ids()->DeleteSubrange(i, 1); break; } } if (found_index) { DINGO_LOG(INFO) << "remove success, table_id: " << table_id << ", index_id: " << index_id - << ", size: " << source_size << " --> " << table_index_internal.table_ids_size(); + << ", origin_ids_size: " << source_size; auto* table_index_increment = meta_increment.add_table_indexes(); table_index_increment->set_id(table_id); - *(table_index_increment->mutable_table_indexes()) = table_index_internal; table_index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); + table_index_increment->add_table_ids_to_del()->set_entity_id(index_id); } else { DINGO_LOG(WARNING) << "cannot find index, table_id: " << table_id << ", index_id: " << index_id; } @@ -3056,14 +3074,15 @@ butil::Status CoordinatorControl::AddIndexOnTable(int64_t table_id, int64_t inde } // add index to table_index_map_ - auto* new_index_id = table_index_internal.add_table_ids(); - new_index_id->set_entity_id(index_id); - new_index_id->set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX); - new_index_id->set_parent_entity_id(table_internal.schema_id()); + pb::meta::DingoCommonId new_index_id; + new_index_id.set_entity_id(index_id); + new_index_id.set_entity_type(::dingodb::pb::meta::EntityType::ENTITY_TYPE_INDEX); + new_index_id.set_parent_entity_id(table_internal.schema_id()); + auto* table_index_increment = meta_increment.add_table_indexes(); table_index_increment->set_id(table_id); table_index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); - *(table_index_increment->mutable_table_indexes()) = table_index_internal; + *table_index_increment->add_table_ids_to_add() = new_index_id; return butil::Status::OK(); } @@ -3112,19 +3131,18 @@ butil::Status CoordinatorControl::DropIndexOnTable(int64_t table_id, int64_t ind } // del index from table_index_map_ - pb::coordinator_internal::TableIndexInternal table_index_internal_new; - table_index_internal_new.set_id(table_index_internal.id()); + pb::meta::DingoCommonId del_index_id; for (const auto& id : table_index_internal.table_ids()) { if (id.entity_id() != index_id) { - auto* new_index_id = table_index_internal_new.add_table_ids(); - *new_index_id = id; + del_index_id = id; + break; } } auto* table_index_increment = meta_increment.add_table_indexes(); table_index_increment->set_id(table_id); table_index_increment->set_op_type(pb::coordinator_internal::MetaIncrementOpType::UPDATE); - *(table_index_increment->mutable_table_indexes()) = table_index_internal_new; + *table_index_increment->add_table_ids_to_del() = del_index_id; return butil::Status::OK(); }