Skip to content

Commit

Permalink
[feat][store] Add CheckNotExists in prewrite.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor authored and rock-git committed Mar 5, 2024
1 parent 64b82d9 commit 012d2f9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
13 changes: 7 additions & 6 deletions proto/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -431,12 +431,13 @@ message WriteInfo {
}

enum Op {
None = 0; // just a placeholder
Put = 1; // prewrite: do data upsert, like KvPut or VectorAdd
Delete = 2; // prewrite: do data delete, like KvDelete or VectorDelete
PutIfAbsent = 3; // prewrite: do insert, if id is exists, return AlreadyExist error
Rollback = 4; // rollback: only do rollback
Lock = 5; // pessimistic lock: do lock
None = 0; // just a placeholder
Put = 1; // prewrite: do data upsert, like KvPut or VectorAdd
Delete = 2; // prewrite: do data delete, like KvDelete or VectorDelete
PutIfAbsent = 3; // prewrite: do insert, if id is exists, return AlreadyExist error
Rollback = 4; // rollback: only do rollback
Lock = 5; // pessimistic lock: do lock
CheckNotExists = 6; // prewrite: check if the key is not exists
}

message Mutation {
Expand Down
19 changes: 18 additions & 1 deletion src/client/store_client_function_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ DEFINE_int64(max_commit_ts, 0, "max_commit_ts");
DEFINE_bool(key_only, false, "key_only");
DEFINE_bool(with_start, true, "with_start");
DEFINE_bool(with_end, false, "with_end");
DEFINE_string(mutation_op, "", "mutation_op, [put, delete, putifabsent, lock]");
DEFINE_string(mutation_op, "", "mutation_op, [put, delete, putifabsent, lock, check_not_exists]");
DEFINE_string(key2, "", "key2");
DEFINE_string(value2, "value2", "value2");
DEFINE_bool(rc, false, "read commited");
Expand Down Expand Up @@ -281,6 +281,18 @@ void StoreSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region&
mutation->set_key(key2);
DINGO_LOG(INFO) << "key2: " << FLAGS_key2;
}
} else if (FLAGS_mutation_op == "check_not_exists") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::CheckNotExists);
mutation->set_key(key);
DINGO_LOG(INFO) << "key: " << FLAGS_key;

if (!FLAGS_key2.empty()) {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::CheckNotExists);
mutation->set_key(key2);
DINGO_LOG(INFO) << "key2: " << FLAGS_key2;
}
} else if (FLAGS_mutation_op == "insert") {
if (FLAGS_value.empty()) {
DINGO_LOG(ERROR) << "value is empty";
Expand Down Expand Up @@ -434,6 +446,11 @@ void IndexSendTxnPrewrite(int64_t region_id, const dingodb::pb::common::Region&
mutation->set_op(::dingodb::pb::store::Op::Delete);
mutation->set_key(
dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id));
} else if (FLAGS_mutation_op == "check_not_exists") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::CheckNotExists);
mutation->set_key(
dingodb::Helper::EncodeVectorIndexRegionHeader(region.definition().range().start_key()[0], part_id, vector_id));
} else if (FLAGS_mutation_op == "insert") {
auto* mutation = request.add_mutations();
mutation->set_op(::dingodb::pb::store::Op::PutIfAbsent);
Expand Down
18 changes: 18 additions & 0 deletions src/engine/txn_engine_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,24 @@ butil::Status TxnEngineHelper::Prewrite(RawEnginePtr raw_engine, std::shared_ptr
kv_puts_lock.push_back(kv);
}
}

} else if (mutation.op() == pb::store::Op::CheckNotExists) {
// For CheckNotExists, this op is equal to PutIfAbsent, but we do not need to write anything, just check if key is
// exist. So it is more likely to be a get op in prewrite.

DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
<< fmt::format("[txn][region({})] Prewrite", region->Id()) << ", key: " << Helper::StringToHex(mutation.key())
<< " is CheckNotExists, start_ts: " << start_ts << ", region_epoch: " << ctx->RegionEpoch().ShortDebugString()
<< ", mutations_size: " << mutations.size() << ", prev_write_info is: " << write_info.ShortDebugString();

// check if key is exist
if (write_info.op() == pb::store::Op::Put) {
response->add_keys_already_exist()->set_key(mutation.key());
}

// CheckNotExists only do check and return keys_already_exists, nothing to write.
continue;

} else if (mutation.op() == pb::store::Op::Delete) {
if (BAIDU_UNLIKELY(is_repeated_prewrite)) {
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail)
Expand Down
2 changes: 1 addition & 1 deletion src/server/index_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,7 @@ static butil::Status ValidateIndexTxnPrewriteRequest(StoragePtr storage, const p
"Param vector binary dimension is error, correct dimension is " + std::to_string(dimension));
}
}
} else if (mutation.op() == pb::store::Op::Delete) {
} else if (mutation.op() == pb::store::Op::Delete || mutation.op() == pb::store::Op::CheckNotExists) {
if (BAIDU_UNLIKELY(!VectorCodec::IsLegalVectorId(vector_id))) {
return butil::Status(pb::error::EILLEGAL_PARAMTETERS,
"Param vector id is not allowed to be zero, INT64_MAX or negative, please check the "
Expand Down

0 comments on commit 012d2f9

Please sign in to comment.