From 012d2f942ab67c92bb2815f969a46d9ca255e221 Mon Sep 17 00:00:00 2001 From: Ketor Date: Tue, 5 Mar 2024 01:54:17 +0800 Subject: [PATCH] [feat][store] Add CheckNotExists in prewrite. Signed-off-by: Ketor --- proto/store.proto | 13 +++++++------ src/client/store_client_function_txn.cc | 19 ++++++++++++++++++- src/engine/txn_engine_helper.cc | 18 ++++++++++++++++++ src/server/index_service.cc | 2 +- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/proto/store.proto b/proto/store.proto index 130c3f13c..3c1c6d5cc 100644 --- a/proto/store.proto +++ b/proto/store.proto @@ -431,12 +431,13 @@ message WriteInfo { } enum Op { - None = 0; // just a placeholder - Put = 1; // prewrite: do data upsert, like KvPut or VectorAdd - Delete = 2; // prewrite: do data delete, like KvDelete or VectorDelete - PutIfAbsent = 3; // prewrite: do insert, if id is exists, return AlreadyExist error - Rollback = 4; // rollback: only do rollback - Lock = 5; // pessimistic lock: do lock + None = 0; // just a placeholder + Put = 1; // prewrite: do data upsert, like KvPut or VectorAdd + Delete = 2; // prewrite: do data delete, like KvDelete or VectorDelete + PutIfAbsent = 3; // prewrite: do insert, if id is exists, return AlreadyExist error + Rollback = 4; // rollback: only do rollback + Lock = 5; // pessimistic lock: do lock + CheckNotExists = 6; // prewrite: check if the key is not exists } message Mutation { diff --git a/src/client/store_client_function_txn.cc b/src/client/store_client_function_txn.cc index 5254515b6..56acaa9a9 100644 --- a/src/client/store_client_function_txn.cc +++ b/src/client/store_client_function_txn.cc @@ -58,7 +58,7 @@ DEFINE_int64(max_commit_ts, 0, "max_commit_ts"); DEFINE_bool(key_only, false, "key_only"); DEFINE_bool(with_start, true, "with_start"); DEFINE_bool(with_end, false, "with_end"); -DEFINE_string(mutation_op, "", "mutation_op, [put, delete, putifabsent, lock]"); +DEFINE_string(mutation_op, "", "mutation_op, [put, delete, putifabsent, lock, check_not_exists]"); DEFINE_string(key2, "", "key2"); DEFINE_string(value2, "value2", "value2"); DEFINE_bool(rc, false, "read commited"); @@ -281,6 +281,18 @@ void StoreSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region& mutation->set_key(key2); DINGO_LOG(INFO) << "key2: " << FLAGS_key2; } + } else if (FLAGS_mutation_op == "check_not_exists") { + auto* mutation = request.add_mutations(); + mutation->set_op(::dingodb::pb::store::Op::CheckNotExists); + mutation->set_key(key); + DINGO_LOG(INFO) << "key: " << FLAGS_key; + + if (!FLAGS_key2.empty()) { + auto* mutation = request.add_mutations(); + mutation->set_op(::dingodb::pb::store::Op::CheckNotExists); + mutation->set_key(key2); + DINGO_LOG(INFO) << "key2: " << FLAGS_key2; + } } else if (FLAGS_mutation_op == "insert") { if (FLAGS_value.empty()) { DINGO_LOG(ERROR) << "value is empty"; @@ -434,6 +446,11 @@ void IndexSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region& mutation->set_op(::dingodb::pb::store::Op::Delete); mutation->set_key( dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id)); + } else if (FLAGS_mutation_op == "check_not_exists") { + auto* mutation = request.add_mutations(); + mutation->set_op(::dingodb::pb::store::Op::CheckNotExists); + mutation->set_key( + dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id)); } else if (FLAGS_mutation_op == "insert") { auto* mutation = request.add_mutations(); mutation->set_op(::dingodb::pb::store::Op::PutIfAbsent); diff --git a/src/engine/txn_engine_helper.cc b/src/engine/txn_engine_helper.cc index 9582cf060..9bf81f604 100644 --- a/src/engine/txn_engine_helper.cc +++ b/src/engine/txn_engine_helper.cc @@ -2066,6 +2066,24 @@ butil::Status TxnEngineHelper::Prewrite(RawEnginePtr raw_engine, std::shared_ptr kv_puts_lock.push_back(kv); } } + + } else if (mutation.op() == pb::store::Op::CheckNotExists) { + // For CheckNotExists, this op is equal to PutIfAbsent, but we do not need to write anything, just check if key is + // exist. So it is more likely to be a get op in prewrite. + + DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) + << fmt::format("[txn][region({})] Prewrite", region->Id()) << ", key: " << Helper::StringToHex(mutation.key()) + << " is CheckNotExists, start_ts: " << start_ts << ", region_epoch: " << ctx->RegionEpoch().ShortDebugString() + << ", mutations_size: " << mutations.size() << ", prev_write_info is: " << write_info.ShortDebugString(); + + // check if key is exist + if (write_info.op() == pb::store::Op::Put) { + response->add_keys_already_exist()->set_key(mutation.key()); + } + + // CheckNotExists only do check and return keys_already_exists, nothing to write. + continue; + } else if (mutation.op() == pb::store::Op::Delete) { if (BAIDU_UNLIKELY(is_repeated_prewrite)) { DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) diff --git a/src/server/index_service.cc b/src/server/index_service.cc index cb71eab1d..5a110da4a 100644 --- a/src/server/index_service.cc +++ b/src/server/index_service.cc @@ -1557,7 +1557,7 @@ static butil::Status ValidateIndexTxnPrewriteRequest(StoragePtr storage, const p "Param vector binary dimension is error, correct dimension is " + std::to_string(dimension)); } } - } else if (mutation.op() == pb::store::Op::Delete) { + } else if (mutation.op() == pb::store::Op::Delete || mutation.op() == pb::store::Op::CheckNotExists) { if (BAIDU_UNLIKELY(!VectorCodec::IsLegalVectorId(vector_id))) { return butil::Status(pb::error::EILLEGAL_PARAMTETERS, "Param vector id is not allowed to be zero, INT64_MAX or negative, please check the "