Skip to content

Commit

Permalink
[feat][coordinator] Implement document region_type.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor authored and rock-git committed May 20, 2024
1 parent 743957c commit f533e5d
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 141 deletions.
1 change: 1 addition & 0 deletions src/client/store_client_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ void SendTxnDump(int64_t region_id);

void StoreSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region& region);
void IndexSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region& region);
void DocumentSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region& region);

// region
void SendAddRegion(int64_t region_id, const std::string& raft_group, std::vector<std::string> raft_addrs);
Expand Down
159 changes: 159 additions & 0 deletions src/client/store_client_function_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ std::string GetServiceName(const dingodb::pb::common::Region& region) {
} else if (region.region_type() == dingodb::pb::common::INDEX_REGION &&
region.definition().index_parameter().has_vector_index_parameter()) {
service_name = "IndexService";
} else if (region.region_type() == dingodb::pb::common::DOCUMENT_REGION &&
region.definition().index_parameter().has_document_index_parameter()) {
service_name = "DocumentService";
} else {
DINGO_LOG(ERROR) << "region_type is invalid";
exit(-1);
Expand Down Expand Up @@ -502,6 +505,156 @@ void IndexSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region&
DINGO_LOG(INFO) << "Response: " << response.DebugString();
}

// document
void DocumentSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region& region) {
dingodb::pb::store::TxnPrewriteRequest request;
dingodb::pb::store::TxnPrewriteResponse response;

request.mutable_context()->set_region_id(region_id);
*request.mutable_context()->mutable_region_epoch() = region.definition().epoch();
if (FLAGS_rc) {
request.mutable_context()->set_isolation_level(dingodb::pb::store::IsolationLevel::ReadCommitted);
} else {
request.mutable_context()->set_isolation_level(dingodb::pb::store::IsolationLevel::SnapshotIsolation);
}

if (FLAGS_primary_lock.empty()) {
DINGO_LOG(ERROR) << "primary_lock is empty";
return;
}
request.set_primary_lock(FLAGS_primary_lock);

if (FLAGS_start_ts == 0) {
DINGO_LOG(ERROR) << "start_ts is empty";
return;
}
request.set_start_ts(FLAGS_start_ts);

if (FLAGS_lock_ttl == 0) {
DINGO_LOG(ERROR) << "lock_ttl is empty";
return;
}
request.set_lock_ttl(FLAGS_lock_ttl);

if (FLAGS_txn_size == 0) {
DINGO_LOG(ERROR) << "txn_size is empty";
return;
}
request.set_txn_size(FLAGS_txn_size);

request.set_try_one_pc(FLAGS_try_one_pc);
request.set_max_commit_ts(FLAGS_max_commit_ts);

if (FLAGS_mutation_op.empty()) {
DINGO_LOG(ERROR) << "mutation_op is empty, mutation MUST be one of [put, delete, insert]";
return;
}

if (FLAGS_vector_id == 0) {
DINGO_LOG(ERROR) << "vector_id is empty";
return;
}

int64_t part_id = region.definition().part_id();
int64_t vector_id = FLAGS_vector_id;
int64_t dimension = 0;

const auto& para = region.definition().index_parameter().vector_index_parameter();
if (para.vector_index_type() == dingodb::pb::common::VectorIndexType::VECTOR_INDEX_TYPE_FLAT) {
dimension = para.flat_parameter().dimension();
} else if (para.vector_index_type() == dingodb::pb::common::VectorIndexType::VECTOR_INDEX_TYPE_IVF_FLAT) {
dimension = para.ivf_flat_parameter().dimension();
} else if (para.vector_index_type() == dingodb::pb::common::VectorIndexType::VECTOR_INDEX_TYPE_IVF_PQ) {
dimension = para.ivf_pq_parameter().dimension();
} else if (para.vector_index_type() == dingodb::pb::common::VectorIndexType::VECTOR_INDEX_TYPE_HNSW) {
dimension = para.hnsw_parameter().dimension();
} else if (para.vector_index_type() == dingodb::pb::common::VectorIndexType::VECTOR_INDEX_TYPE_DISKANN) {
dimension = para.diskann_parameter().dimension();
} else {
DINGO_LOG(ERROR) << "vector_index_type is empty";
return;
}

if (FLAGS_mutation_op == "put") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::Put);

mutation->set_key(
dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id));

dingodb::pb::common::VectorWithId vector_with_id;

std::mt19937 rng;
std::uniform_real_distribution<> distrib(0.0, 1.0);

vector_with_id.set_id(vector_id);
vector_with_id.mutable_vector()->set_dimension(dimension);
vector_with_id.mutable_vector()->set_value_type(::dingodb::pb::common::ValueType::FLOAT);
for (int j = 0; j < dimension; j++) {
vector_with_id.mutable_vector()->add_float_values(distrib(rng));
}
*mutation->mutable_vector() = vector_with_id;
} else if (FLAGS_mutation_op == "delete") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::Delete);
mutation->set_key(
dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id));
} else if (FLAGS_mutation_op == "check_not_exists") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::CheckNotExists);
mutation->set_key(
dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id));
} else if (FLAGS_mutation_op == "insert") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::PutIfAbsent);
mutation->set_key(
dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id));

dingodb::pb::common::VectorWithId vector_with_id;

std::mt19937 rng;
std::uniform_real_distribution<> distrib(0.0, 1.0);

vector_with_id.set_id(vector_id);
vector_with_id.mutable_vector()->set_dimension(dimension);
vector_with_id.mutable_vector()->set_value_type(::dingodb::pb::common::ValueType::FLOAT);
for (int j = 0; j < dimension; j++) {
vector_with_id.mutable_vector()->add_float_values(distrib(rng));
}
*mutation->mutable_vector() = vector_with_id;
} else {
DINGO_LOG(ERROR) << "mutation_op MUST be one of [put, delete, insert]";
return;
}

if (!FLAGS_extra_data.empty()) {
DINGO_LOG(INFO) << "extra_data is: " << FLAGS_extra_data;
}

if (FLAGS_for_update_ts > 0) {
DINGO_LOG(INFO) << "for_update_ts > 0, do pessimistic check : " << FLAGS_for_update_ts;
for (int i = 0; i < request.mutations_size(); i++) {
request.add_pessimistic_checks(::dingodb::pb::store::TxnPrewriteRequest_PessimisticCheck::
TxnPrewriteRequest_PessimisticCheck_DO_PESSIMISTIC_CHECK);
auto* for_update_ts_check = request.add_for_update_ts_checks();
for_update_ts_check->set_expected_for_update_ts(FLAGS_for_update_ts);
for_update_ts_check->set_index(i);

if (!FLAGS_extra_data.empty()) {
auto* extra_data = request.add_lock_extra_datas();
extra_data->set_index(i);
extra_data->set_extra_data(FLAGS_extra_data);
}
}
}

DINGO_LOG(INFO) << "Request: " << request.DebugString();

InteractionManager::GetInstance().SendRequestWithContext("IndexService", "TxnPrewrite", request, response);

DINGO_LOG(INFO) << "Response: " << response.DebugString();
}

// unified

void SendTxnGet(int64_t region_id) {
Expand Down Expand Up @@ -725,6 +878,9 @@ void SendTxnPessimisticRollback(int64_t region_id) {
} else if (region.region_type() == dingodb::pb::common::INDEX_REGION &&
region.definition().index_parameter().has_vector_index_parameter()) {
service_name = "IndexService";
} else if (region.region_type() == dingodb::pb::common::DOCUMENT_REGION &&
region.definition().index_parameter().has_document_index_parameter()) {
service_name = "DocumentService";
} else {
DINGO_LOG(ERROR) << "region_type is invalid";
return;
Expand Down Expand Up @@ -782,6 +938,9 @@ void SendTxnPrewrite(int64_t region_id) {
} else if (region.region_type() == dingodb::pb::common::INDEX_REGION &&
region.definition().index_parameter().has_vector_index_parameter()) {
IndexSendTxnPrewrite(region_id, region);
} else if (region.region_type() == dingodb::pb::common::DOCUMENT_REGION &&
region.definition().index_parameter().has_document_index_parameter()) {
DocumentSendTxnPrewrite(region_id, region);
} else {
DINGO_LOG(ERROR) << "region_type is invalid";
}
Expand Down
80 changes: 70 additions & 10 deletions src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,33 @@ butil::Status CoordinatorControl::CreateShadowRegion(
}
}

if (region_type == pb::common::RegionType::DOCUMENT_REGION && new_index_parameter.has_document_index_parameter()) {
store_type = pb::common::StoreType::NODE_TYPE_DOCUMENT;

// validate document index region range
// range's start_key and end_key must be less than 16 bytes
if (region_range.start_key().size() != Constant::kDocumentKeyMinLenWithPrefix &&
region_range.start_key().size() != Constant::kDocumentKeyMaxLenWithPrefix) {
DINGO_LOG(ERROR) << "CreateRegion document index region range start_key size is not 8 or 16, start_key="
<< Helper::StringToHex(region_range.start_key());
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE,
"document index region range start_key size is not 8 or 16 bytes");
}

if (region_range.end_key().size() != Constant::kDocumentKeyMinLenWithPrefix &&
region_range.end_key().size() != Constant::kDocumentKeyMaxLenWithPrefix) {
DINGO_LOG(ERROR) << "CreateRegion document index region range end_key size is not 8 or 16, end_key="
<< Helper::StringToHex(region_range.end_key());
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE,
"document index region range end_key size is not 8 or 16 bytes");
}

if (part_id <= 0) {
DINGO_LOG(ERROR) << "CreateRegion document index region part_id is not legal, part_id=" << part_id;
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE, "document index region part_id is not legal");
}
}

// select store for region
butil::FlatMap<int64_t, pb::common::Store> store_map_copy;
store_map_copy.init(100);
Expand Down Expand Up @@ -2046,13 +2073,42 @@ butil::Status CoordinatorControl::CreateRegionFinal(
return ret1;
}
}
} else if (new_index_parameter.has_document_index_parameter()) {
} else {
DINGO_LOG(ERROR) << "CreateRegionFinal index_parameter is not legal, not vector, index_parameter="
<< new_index_parameter.ShortDebugString();
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE, "index_parameter is not legal, not vector");
}
} else if (region_type == pb::common::RegionType::DOCUMENT_REGION) {
if (new_index_parameter.has_document_index_parameter()) {
store_type = pb::common::StoreType::NODE_TYPE_DOCUMENT;

// validate document index region range
// range's start_key and end_key must be less than 16 bytes
if (region_range.start_key().size() != Constant::kDocumentKeyMinLenWithPrefix &&
region_range.start_key().size() != Constant::kDocumentKeyMaxLenWithPrefix) {
DINGO_LOG(ERROR) << "CreateRegion document index region range start_key size is not 8 or 16, start_key="
<< Helper::StringToHex(region_range.start_key());
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE,
"document index region range start_key size is not 8 or 16 bytes");
}

if (region_range.end_key().size() != Constant::kDocumentKeyMinLenWithPrefix &&
region_range.end_key().size() != Constant::kDocumentKeyMaxLenWithPrefix) {
DINGO_LOG(ERROR) << "CreateRegion document index region range end_key size is not 8 or 16, end_key="
<< Helper::StringToHex(region_range.end_key());
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE,
"document index region range end_key size is not 8 or 16 bytes");
}

if (part_id <= 0) {
DINGO_LOG(ERROR) << "CreateRegion document index region part_id is not legal, part_id=" << part_id;
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE, "document index region part_id is not legal");
}

} else {
DINGO_LOG(ERROR) << "CreateRegionFinal index_parameter is not legal, not vector or document, index_parameter="
DINGO_LOG(ERROR) << "CreateRegionFinal index_parameter is not legal, not document, index_parameter="
<< new_index_parameter.ShortDebugString();
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE,
"index_parameter is not legal, not vector or document");
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE, "index_parameter is not legal, not document");
}
}

Expand Down Expand Up @@ -2187,14 +2243,18 @@ butil::Status CoordinatorControl::GetCreateRegionStoreIds(pb::common::RegionType
if (region_type == pb::common::RegionType::INDEX_REGION) {
if (index_parameter.has_vector_index_parameter()) {
store_type = pb::common::StoreType::NODE_TYPE_INDEX;
} else if (index_parameter.has_document_index_parameter()) {
} else {
DINGO_LOG(ERROR) << "GetCreateRegionStoreIds index_parameter is not legal, not vector, index_parameter="
<< index_parameter.ShortDebugString();
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE, "index_parameter is not legal, not vector");
}
} else if (region_type == pb::common::RegionType::DOCUMENT_REGION) {
if (index_parameter.has_document_index_parameter()) {
store_type = pb::common::StoreType::NODE_TYPE_DOCUMENT;
} else {
DINGO_LOG(ERROR)
<< "GetCreateRegionStoreIds index_parameter is not legal, not vector or document, index_parameter="
<< index_parameter.ShortDebugString();
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE,
"index_parameter is not legal, not vector or document");
DINGO_LOG(ERROR) << "GetCreateRegionStoreIds index_parameter is not legal, not document, index_parameter="
<< index_parameter.ShortDebugString();
return butil::Status(pb::error::Errno::EREGION_UNAVAILABLE, "index_parameter is not legal, not document");
}
}

Expand Down
17 changes: 12 additions & 5 deletions src/coordinator/coordinator_control_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,15 @@ butil::Status CoordinatorControl::CreateIndex(int64_t schema_id, const pb::meta:
pb::common::StorageEngine region_store_engine_type = table_definition.store_engine();
GetStoreEngine(region_store_engine_type);

pb::common::RegionType region_type = pb::common::RegionType::STORE_REGION;
if (table_definition.index_parameter().has_vector_index_parameter()) {
region_type = pb::common::RegionType::INDEX_REGION;
} else if (table_definition.index_parameter().has_document_index_parameter()) {
region_type = pb::common::RegionType::DOCUMENT_REGION;
}

std::vector<int64_t> store_ids;
auto ret4 = GetCreateRegionStoreIds(pb::common::RegionType::INDEX_REGION, region_raw_engine_type, "", replica,
auto ret4 = GetCreateRegionStoreIds(region_type, region_raw_engine_type, "", replica,
table_definition.index_parameter(), store_ids);
if (!ret4.ok()) {
DINGO_LOG(ERROR) << "GetCreateRegionStoreIds error:" << ret4.error_str()
Expand All @@ -1321,10 +1328,10 @@ butil::Status CoordinatorControl::CreateIndex(int64_t schema_id, const pb::meta:
table_definition.name() + std::string("_part_") + std::to_string(new_part_id);

std::vector<pb::coordinator::StoreOperation> store_operations;
auto ret = CreateRegionFinal(region_name, pb::common::RegionType::INDEX_REGION, region_raw_engine_type,
region_store_engine_type, "", replica, new_part_range, schema_id, 0, new_index_id,
new_part_id, tenant_id, table_definition.index_parameter(), store_ids, 0,
new_region_id, store_operations, meta_increment);
auto ret = CreateRegionFinal(region_name, region_type, region_raw_engine_type, region_store_engine_type, "",
replica, new_part_range, schema_id, 0, new_index_id, new_part_id, tenant_id,
table_definition.index_parameter(), store_ids, 0, new_region_id, store_operations,
meta_increment);
if (!ret.ok()) {
DINGO_LOG(ERROR) << "CreateRegion failed in CreateIndex index_name=" << table_definition.name();
return ret;
Expand Down
Loading

0 comments on commit f533e5d

Please sign in to comment.