Skip to content

Commit

Permalink
[fix][sdk] Fix sdk txn scan batch count.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor authored and rock-git committed May 23, 2024
1 parent 3cd8b7d commit b2a4a9b
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 15 deletions.
6 changes: 6 additions & 0 deletions src/example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ add_executable(sdk_vector_example
target_link_libraries(sdk_vector_example
sdk
)

add_executable(txn_scan_test
txn_scan_test.cc)
target_link_libraries(txn_scan_test
sdk
)
217 changes: 217 additions & 0 deletions src/example/txn_scan_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <unistd.h>

#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "glog/logging.h"
#include "sdk/client.h"
#include "sdk/status.h"

using dingodb::sdk::Status;

// Command-line flags for the txn scan example.

// Coordinator address handed to Client::Build(); main() falls back to "file://./coor_list" when empty.
DEFINE_string(coordinator_url, "", "coordinator url");
// Number of key/value pairs generated by PrepareTxnData().
DEFINE_int64(scan_test_count, 5000, "scan test count");
// Start key of the created region and of the scan; also the prefix of every generated key.
DEFINE_string(range_start, "xa00000000", "range start key");
// End key of the created region and of the scan.
DEFINE_string(range_end, "xc00000000", "range end key");
// When true, main() generates data, creates the region and runs the put transaction.
DEFINE_bool(do_put, true, "do put operation");
// When true, main() runs the scan transaction.
DEFINE_bool(do_scan, true, "do scan operation");
// When true, main() drops the regions created by this run.
DEFINE_bool(do_clean, true, "do clean operation");
// When true, CreateRegion() requests EngineType::kBTree instead of EngineType::kLSM.
DEFINE_bool(use_btree, false, "use btree engine");

// SDK client shared by every helper in this file; set once in main().
static std::shared_ptr<dingodb::sdk::Client> g_client;

// Ids of the regions successfully created by CreateRegion(); PostClean() drops each of them.
static std::vector<int64_t> g_region_ids;

// Test data filled by PrepareTxnData(): keys[i] pairs with values[i].
static std::vector<std::string> keys;
static std::vector<std::string> values;
// The same data as a key -> value map; OptimisticTxnPut() writes these entries.
static std::unordered_map<std::string, std::string> key_values;
// Generates FLAGS_scan_test_count key/value pairs and appends them to the
// file-level `keys`, `values` and `key_values` containers.
//
// Pair i has key FLAGS_range_start + decimal(i) and value decimal(i).
// A non-positive count generates nothing.
static void PrepareTxnData() {
  // The flag is int64; keep the counter the same width so large counts cannot overflow it.
  const int64_t count = FLAGS_scan_test_count > 0 ? FLAGS_scan_test_count : 0;
  const auto extra = static_cast<std::size_t>(count);

  // Grow each container once instead of reallocating while filling.
  keys.reserve(keys.size() + extra);
  values.reserve(values.size() + extra);
  key_values.reserve(key_values.size() + extra);

  for (int64_t i = 0; i < count; ++i) {
    std::string value = std::to_string(i);
    std::string key = FLAGS_range_start + value;

    // emplace() keeps an already-present key untouched.
    key_values.emplace(key, value);
    keys.push_back(std::move(key));
    values.push_back(std::move(value));
  }
}

static void CreateRegion(std::string name, std::string start_key, std::string end_key, int replicas = 3) {
CHECK(!name.empty()) << "name should not empty";
CHECK(!start_key.empty()) << "start_key should not empty";
CHECK(!end_key.empty()) << "end_key should not empty";
CHECK(start_key < end_key) << "start_key must < end_key";
CHECK(replicas > 0) << "replicas must > 0";

dingodb::sdk::RegionCreator* tmp_creator;
Status built = g_client->NewRegionCreator(&tmp_creator);
CHECK(built.IsOK()) << "dingo creator build fail";
std::shared_ptr<dingodb::sdk::RegionCreator> creator(tmp_creator);
CHECK_NOTNULL(creator.get());

int64_t region_id = -1;
auto engine_type = dingodb::sdk::EngineType::kLSM;
if (FLAGS_use_btree) {
engine_type = dingodb::sdk::EngineType::kBTree;
}

Status tmp = creator->SetRegionName(name)
.SetEngineType(engine_type)
.SetRange(start_key, end_key)
.SetReplicaNum(replicas)
.Wait(true)
.Create(region_id);
DINGO_LOG(INFO) << "Create region status: " << tmp.ToString() << ", region_id:" << region_id;

if (tmp.ok()) {
CHECK(region_id > 0);
bool inprogress = true;
g_client->IsCreateRegionInProgress(region_id, inprogress);
CHECK(!inprogress);
g_region_ids.push_back(region_id);
}
}

static void PostClean() {
for (const auto region_id : g_region_ids) {
Status tmp = g_client->DropRegion(region_id);
DINGO_LOG(INFO) << "drop region status: " << tmp.ToString() << ", region_id:" << region_id;
bool inprogress = true;
tmp = g_client->IsCreateRegionInProgress(region_id, inprogress);
DINGO_LOG(INFO) << "query region status: " << tmp.ToString() << ", region_id:" << region_id;
}
}

// Builds a new optimistic transaction through g_client.
//
// isolation      isolation level stored in the transaction options
// keep_alive_ms  keep-alive value stored in the transaction options (default 0)
//
// Returns the transaction wrapped in a shared_ptr. Aborts through CHECK when
// the client reports a build failure or hands back a null transaction.
static std::shared_ptr<dingodb::sdk::Transaction> NewOptimisticTransaction(dingodb::sdk::TransactionIsolation isolation,
                                                                           uint32_t keep_alive_ms = 0) {
  dingodb::sdk::TransactionOptions options;
  options.isolation = isolation;
  options.kind = dingodb::sdk::kOptimistic;
  options.keep_alive_ms = keep_alive_ms;

  // Start from nullptr so CHECK_NOTNULL below catches a factory that never set the pointer.
  dingodb::sdk::Transaction* tmp = nullptr;
  Status built = g_client->NewTransaction(options, &tmp);
  CHECK(built.ok()) << "dingo txn build fail";

  std::shared_ptr<dingodb::sdk::Transaction> txn(tmp);
  CHECK_NOTNULL(txn.get());
  return txn;
}

// Writes every entry of key_values in one optimistic snapshot-isolation
// transaction, then pre-commits and commits it.
//
// Does nothing when FLAGS_do_put is false. The three "time cost" log lines are
// cumulative: each reports milliseconds elapsed since the puts started.
void OptimisticTxnPut() {
  // Bail out before opening a transaction that would never be used.
  if (!FLAGS_do_put) {
    return;
  }

  auto txn = NewOptimisticTransaction(dingodb::sdk::kSnapshotIsolation);

  // steady_clock is monotonic, so wall-clock adjustments cannot distort the measured durations.
  const auto start = std::chrono::steady_clock::now();
  const auto elapsed_ms = [&start]() {
    return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();
  };

  // write data into store
  for (const auto& [key, value] : key_values) {
    txn->Put(key, value);
  }
  DINGO_LOG(INFO) << "prepare data time cost ms: " << elapsed_ms();

  Status precommit = txn->PreCommit();
  DINGO_LOG(INFO) << "precommit:" << precommit.ToString();
  DINGO_LOG(INFO) << "prewrite data time cost ms: " << elapsed_ms();

  // NOTE(review): Commit() is attempted even when PreCommit() failed - confirm this is intended.
  Status commit = txn->Commit();
  DINGO_LOG(INFO) << "txn commit:" << commit.ToString();
  DINGO_LOG(INFO) << "commit time cost ms: " << elapsed_ms();
}

void OptimisticTxnScan() {
auto read_commit_txn = NewOptimisticTransaction(dingodb::sdk::kReadCommitted);
auto start_time_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch())
.count();

// readCommiited should read txn commit data
std::vector<dingodb::sdk::KVPair> kvs;
Status scan = read_commit_txn->Scan(FLAGS_range_start, FLAGS_range_end, 0, kvs);
DINGO_LOG(INFO) << "read_commit_txn scan:" << scan.ToString();

Status precommit = read_commit_txn->PreCommit();
DINGO_LOG(INFO) << "read_commit_txn precommit:" << precommit.ToString();
Status commit = read_commit_txn->Commit();
DINGO_LOG(INFO) << "read_commit_txn commit:" << commit.ToString();

DINGO_LOG(INFO) << " time cost ms: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() -
start_time_ms;
}

int main(int argc, char* argv[]) {
FLAGS_minloglevel = google::GLOG_INFO;
FLAGS_logtostdout = true;
FLAGS_colorlogtostdout = true;
FLAGS_logbufsecs = 0;
// FLAGS_v = dingodb::kGlobalValueOfDebug;

google::InitGoogleLogging(argv[0]);
google::ParseCommandLineFlags(&argc, &argv, true);

if (FLAGS_coordinator_url.empty()) {
DINGO_LOG(ERROR) << "coordinator url is empty, try to use file://./coor_list";
FLAGS_coordinator_url = "file://./coor_list";
}

dingodb::sdk::Client* tmp;
Status built = dingodb::sdk::Client::Build(FLAGS_coordinator_url, &tmp);
if (!built.ok()) {
DINGO_LOG(ERROR) << "Fail to build client, please check parameter --url=" << FLAGS_coordinator_url;
return -1;
}
CHECK_NOTNULL(tmp);
g_client.reset(tmp);

if (FLAGS_do_put) {
PrepareTxnData();

CreateRegion("skd_example01", FLAGS_range_start, FLAGS_range_end, 3);

OptimisticTxnPut();
}

if (FLAGS_do_scan) {
OptimisticTxnScan();
}

if (FLAGS_do_clean) {
PostClean();
}
}
22 changes: 19 additions & 3 deletions src/sdk/common/helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,25 @@ namespace sdk {
// TODO: log in rpc when we support async
// Sends `rpc` to `region` through a StoreRpcController and returns the Status
// of the call.
//
// When FLAGS_log_rpc_time is set, also logs the rpc method, the region id and
// the time spent in the call, in milliseconds.
template <class StoreClientRpc>
static Status LogAndSendRpc(const ClientStub& stub, StoreClientRpc& rpc, std::shared_ptr<Region> region) {
  StoreRpcController controller(stub, rpc, region);

  // Fast path: no timing requested.
  if (!FLAGS_log_rpc_time) {
    return controller.Call();
  }

  // steady_clock is monotonic, so wall-clock adjustments cannot distort the measured cost.
  const auto start = std::chrono::steady_clock::now();
  Status s = controller.Call();
  const auto cost_ms =
      std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();

  DINGO_LOG(INFO) << "rpc: " << rpc.Method() << " region: " << region->RegionId() << " cost: " << cost_ms << "ms";
  return s;
}

} // namespace sdk
Expand Down
8 changes: 6 additions & 2 deletions src/sdk/common/param_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "sdk/common/param_config.h"
#include "gflags/gflags.h"

// sdk config
DEFINE_int64(actuator_thread_num, 8, "actuator thread num");
Expand All @@ -33,7 +33,7 @@ DEFINE_int64(rpc_time_out_ms, 500000, "rpc call timeout ms");
DEFINE_int64(store_rpc_max_retry, 5, "store rpc max retry times, use case: wrong leader or request range invalid");
DEFINE_int64(store_rpc_retry_delay_ms, 1000, "store rpc retry delay ms");

DEFINE_int64(scan_batch_size, 10, "scan batch size, use for region scanner");
DEFINE_int64(scan_batch_size, 1000, "scan batch size, use for region scanner");

DEFINE_int64(txn_op_delay_ms, 200, "txn op delay ms");
DEFINE_int64(txn_op_max_retry, 2, "txn op max retry times");
Expand All @@ -43,3 +43,7 @@ DEFINE_int64(raw_kv_max_retry, 5, "raw kv max retry times");

DEFINE_int64(vector_op_delay_ms, 500, "raw kv backoff delay ms");
DEFINE_int64(vector_op_max_retry, 10, "raw kv max retry times");

DEFINE_int64(txn_max_batch_count, 1000, "txn max batch count");

DEFINE_bool(log_rpc_time, false, "log rpc time");
3 changes: 3 additions & 0 deletions src/sdk/common/param_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ DECLARE_int64(txn_op_max_retry);
DECLARE_int64(vector_op_delay_ms);
DECLARE_int64(vector_op_max_retry);

DECLARE_int64(txn_max_batch_count);
DECLARE_bool(log_rpc_time);

#endif // DINGODB_SDK_PARAM_CONFIG_H_
51 changes: 41 additions & 10 deletions src/sdk/transaction/txn_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/logging.h"
#include "fmt/core.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "proto/meta.pb.h"
#include "proto/store.pb.h"
Expand Down Expand Up @@ -654,17 +655,35 @@ Status Transaction::TxnImpl::PreCommit() {
CHECK(iter != region_id_to_region.end());
auto region = iter->second;

dingodb::pb::store::TxnPrewriteRequest txn_prewrite_request;
for (const auto& mutation : mutation_entry.second) {
TxnMutation2MutationPB(mutation, txn_prewrite_request.add_mutations());
}

auto rpc = PrepareTxnPrewriteRpc(region);

for (const auto& mutation : mutation_entry.second) {
TxnMutation2MutationPB(mutation, rpc->MutableRequest()->add_mutations());
uint32_t tmp_count = 0;
for (int i = 0; i < txn_prewrite_request.mutations_size(); i++) {
*rpc->MutableRequest()->add_mutations() = txn_prewrite_request.mutations(i);
tmp_count++;

if (tmp_count == FLAGS_txn_max_batch_count) {
sub_tasks.emplace_back(rpc.get(), region);
rpcs.push_back(std::move(rpc));
tmp_count = 0;
rpc = PrepareTxnPrewriteRpc(region);
}
}

sub_tasks.emplace_back(rpc.get(), region);
rpcs.push_back(std::move(rpc));
DCHECK_NOTNULL(rpc);

if (tmp_count > 0) {
sub_tasks.emplace_back(rpc.get(), region);
rpcs.push_back(std::move(rpc));
}
}

DCHECK_EQ(rpcs.size(), region_mutations.size());
// DCHECK_EQ(rpcs.size(), region_mutations.size());
DCHECK_EQ(rpcs.size(), sub_tasks.size());

std::vector<std::thread> thread_pool;
Expand Down Expand Up @@ -841,15 +860,27 @@ Status Transaction::TxnImpl::Commit() {
auto region = iter->second;

std::unique_ptr<TxnCommitRpc> rpc = PrepareTxnCommitRpc(region);

uint32_t tmp_count = 0;
for (const auto& key : entry.second) {
auto* fill = rpc->MutableRequest()->add_keys();
*fill = key;
rpc->MutableRequest()->add_keys(key);
tmp_count++;

if (tmp_count == FLAGS_txn_max_batch_count) {
sub_tasks.emplace_back(rpc.get(), region);
rpcs.push_back(std::move(rpc));
tmp_count = 0;
rpc = PrepareTxnCommitRpc(region);
}
}

if (tmp_count > 0) {
sub_tasks.emplace_back(rpc.get(), region);
rpcs.push_back(std::move(rpc));
}
sub_tasks.emplace_back(rpc.get(), region);
rpcs.push_back(std::move(rpc));
}

DCHECK_EQ(rpcs.size(), region_commit_keys.size());
// DCHECK_EQ(rpcs.size(), region_commit_keys.size());
DCHECK_EQ(rpcs.size(), sub_tasks.size());

std::vector<std::thread> thread_pool;
Expand Down

0 comments on commit b2a4a9b

Please sign in to comment.