Skip to content

Commit

Permalink
[feat][coordinator] Add log switch for coordinator kv apis.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor committed Feb 29, 2024
1 parent 9c925ab commit 379941c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 76 deletions.
3 changes: 3 additions & 0 deletions conf/coordinator-gflags.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@
-free_memory_to_system_interval=60
-enable_dir_service=false
-enable_threads_service=false
-dingo_log_switch_coor_kv=true
-dingo_log_switch_coor_watch=true
-dingo_log_switch_coor_lease=true
125 changes: 74 additions & 51 deletions src/coordinator/kv_control_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ DEFINE_int64(compaction_retention_rev_count, 1000, "max revision count retention
DEFINE_bool(auto_compaction, false, "auto compaction on/off");
DEFINE_int64(version_kv_max_count, 100000, "max kv count for version kv");

DEFINE_bool(dingo_log_switch_coor_kv, false, "log switch for kv control");

std::string KvControl::RevisionToString(const pb::coordinator_internal::RevisionInternal &revision) {
Buf buf(17);
buf.WriteLong(revision.main());
Expand Down Expand Up @@ -275,10 +277,10 @@ butil::Status KvControl::KvRange(const std::string &key, const std::string &rang

return_count = kv.size();

DINGO_LOG(INFO) << "KvRange finish, key: " << key << "(" << Helper::StringToHex(key) << "), range_end: " << range_end
<< "(" << Helper::StringToHex(range_end) << "), limit: " << limit << ", keys_only: " << keys_only
<< ", count_only: " << count_only << ", kv size: " << kv.size()
<< ", total_count_in_range: " << return_count;
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvRange finish, key: " << key << "(" << Helper::StringToHex(key) << "), range_end: " << range_end << "("
<< Helper::StringToHex(range_end) << "), limit: " << limit << ", keys_only: " << keys_only
<< ", count_only: " << count_only << ", kv size: " << kv.size() << ", total_count_in_range: " << return_count;

return butil::Status::OK();
}
Expand Down Expand Up @@ -446,9 +448,10 @@ butil::Status KvControl::KvPut(const pb::common::KeyValue &key_value_in, int64_t
}

// update kv_index
DINGO_LOG(INFO) << "KvPut will submit meta_increment, key: " << key_value_in.key() << "("
<< Helper::StringToHex(key_value_in.key()) << "), value: " << key_value_in.value() << "("
<< Helper::StringToHex(key_value_in.value()) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPut will submit meta_increment, key: " << key_value_in.key() << "("
<< Helper::StringToHex(key_value_in.key()) << "), value: " << Helper::StringToHex(key_value_in.value()) << "("
<< Helper::StringToHex(key_value_in.value()) << ")";

// add meta_increment
auto *kv_index_meta_increment = meta_increment.add_kv_indexes();
Expand Down Expand Up @@ -502,8 +505,9 @@ butil::Status KvControl::KvDeleteRange(const std::string &key, const std::string
// do kv_delete
for (const auto &kv_to_delete : kvs_to_delete) {
// update kv_index
DINGO_LOG(INFO) << "KvDelete will submit meta_increment, key: " << kv_to_delete.kv().key() << "("
<< Helper::StringToHex(kv_to_delete.kv().key()) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDelete will submit meta_increment, key: " << kv_to_delete.kv().key() << "("
<< Helper::StringToHex(kv_to_delete.kv().key()) << ")";

// add meta_increment
auto *kv_index_meta_increment = meta_increment.add_kv_indexes();
Expand Down Expand Up @@ -568,8 +572,9 @@ butil::Status KvControl::KvPutApply(const std::string &key,

auto ret = GetRawKvIndex(key, kv_index);
if (!ret.ok()) {
DINGO_LOG(INFO) << "KvPutApply GetRawKvIndex not found, will create key: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply GetRawKvIndex not found, will create key: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();
kv_index.set_id(key);
kv_index.mutable_mod_revision()->set_main(op_revision.main());
kv_index.mutable_mod_revision()->set_sub(op_revision.sub());
Expand All @@ -579,11 +584,13 @@ butil::Status KvControl::KvPutApply(const std::string &key,
generation->set_verison(1);
*(generation->add_revisions()) = op_revision;

DINGO_LOG(INFO) << "KvPutApply GetRawKvIndex not found, will create new kv_index: key: " << key << "("
<< Helper::StringToHex(key) << "), kv_index generation: " << generation->ShortDebugString();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply GetRawKvIndex not found, will create new kv_index: key: " << key << "("
<< Helper::StringToHex(key) << "), kv_index generation: " << generation->ShortDebugString();
} else {
DINGO_LOG(INFO) << "KvPutApply GetRawKvIndex found, will update key: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply GetRawKvIndex found, will update key: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();

last_mod_revision = kv_index.mod_revision();

Expand All @@ -594,17 +601,19 @@ butil::Status KvControl::KvPutApply(const std::string &key,
generation->set_verison(1);
*(generation->add_revisions()) = op_revision;

DINGO_LOG(INFO) << "KvPutApply kv_index add generation: " << generation->ShortDebugString() << ", key: " << key
<< "(" << Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply kv_index add generation: " << generation->ShortDebugString() << ", key: " << key << "("
<< Helper::StringToHex(key) << ")";
} else {
// auto &latest_generation = *kv_index.mutable_generations()->rbegin();
auto *latest_generation = kv_index.mutable_generations(kv_index.generations_size() - 1);
if (latest_generation->has_create_revision()) {
*(latest_generation->add_revisions()) = op_revision;
latest_generation->set_verison(latest_generation->verison() + 1);

DINGO_LOG(INFO) << "KvPutApply latest_generation add revsion: " << latest_generation->ShortDebugString()
<< ", key: " << key << "(" << Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply latest_generation add revsion: " << latest_generation->ShortDebugString() << ", key: " << key
<< "(" << Helper::StringToHex(key) << ")";

// only in this situation, the prev_kv is meaningful
prev_kv.set_create_revision(latest_generation->create_revision().main());
Expand All @@ -616,8 +625,9 @@ butil::Status KvControl::KvPutApply(const std::string &key,
latest_generation->set_verison(1);
*(latest_generation->add_revisions()) = op_revision;

DINGO_LOG(INFO) << "KvPutApply latest_generation create revsion: " << latest_generation->ShortDebugString()
<< ", key: " << key << "(" << Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply latest_generation create revsion: " << latest_generation->ShortDebugString()
<< ", key: " << key << "(" << Helper::StringToHex(key) << ")";
}

// setup new_create_revision to last create_revision
Expand Down Expand Up @@ -681,16 +691,18 @@ butil::Status KvControl::KvPutApply(const std::string &key,
<< ", kv_rev: " << kv_rev.ShortDebugString() << ", error: " << ret.error_str();
return ret;
}
DINGO_LOG(INFO) << "KvPutApply PutRawKvRev success, revision: " << op_revision.ShortDebugString()
<< ", kv_rev: " << kv_rev.ShortDebugString();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply PutRawKvRev success, revision: " << op_revision.ShortDebugString()
<< ", kv_rev: " << kv_rev.ShortDebugString();

ret = PutRawKvIndex(key, kv_index);
if (!ret.ok()) {
DINGO_LOG(ERROR) << "KvPutApply PutRawKvIndex failed, key: " << key << "(" << Helper::StringToHex(key)
<< "), kv_index: " << kv_index.ShortDebugString() << ", error: " << ret.error_str();
}
DINGO_LOG(INFO) << "KvPutApply PutRawKvIndex success, key: " << key << "(" << Helper::StringToHex(key)
<< "), kv_index: " << kv_index.ShortDebugString();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply PutRawKvIndex success, key: " << key << "(" << Helper::StringToHex(key)
<< "), kv_index: " << kv_index.ShortDebugString();

// trigger watch
if (!one_time_watch_map_.empty()) {
Expand All @@ -712,9 +724,10 @@ butil::Status KvControl::KvPutApply(const std::string &key,
TriggerOneWatch(key, pb::version::Event::EventType::Event_EventType_PUT, new_kv, prev_kv);
}

DINGO_LOG(INFO) << "KvPutApply success after trigger watch, key: " << key << "(" << Helper::StringToHex(key)
<< "), op_revision: " << op_revision.ShortDebugString() << ", ignore_lease: " << ignore_lease
<< ", lease_id: " << lease_id << ", ignore_value: " << ignore_value << ", value: " << value;
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvPutApply success after trigger watch, key: " << key << "(" << Helper::StringToHex(key)
<< "), op_revision: " << op_revision.ShortDebugString() << ", ignore_lease: " << ignore_lease
<< ", lease_id: " << lease_id << ", ignore_value: " << ignore_value << ", value: " << value;

return butil::Status::OK();
}
Expand All @@ -737,20 +750,23 @@ butil::Status KvControl::KvDeleteApply(const std::string &key,

auto ret = GetRawKvIndex(key, kv_index);
if (!ret.ok()) {
DINGO_LOG(INFO) << "KvDeleteApply GetRawKvIndex not found, no need to delete: " << key << "("
<< Helper::StringToHex(key) << "), error: " << ret.error_str();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply GetRawKvIndex not found, no need to delete: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();
return butil::Status::OK();
} else {
DINGO_LOG(INFO) << "KvDeleteApply GetRawKvIndex found, will delete key: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply GetRawKvIndex found, will delete key: " << key << "(" << Helper::StringToHex(key)
<< "), error: " << ret.error_str();

last_mod_revision = kv_index.mod_revision();

if (kv_index.generations_size() == 0) {
// create a null generator means delete
auto *generation = kv_index.add_generations();
DINGO_LOG(INFO) << "KvDeleteApply kv_index add null generation[0]: " << generation->ShortDebugString()
<< ", key: " << key << "(" << Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply kv_index add null generation[0]: " << generation->ShortDebugString() << ", key: " << key
<< "(" << Helper::StringToHex(key) << ")";
} else {
// auto &latest_generation = *kv_index.mutable_generations()->rbegin();
auto *latest_generation = kv_index.mutable_generations(kv_index.generations_size() - 1);
Expand All @@ -761,8 +777,9 @@ butil::Status KvControl::KvDeleteApply(const std::string &key,

// create a null generator means delete
auto *generation = kv_index.add_generations();
DINGO_LOG(INFO) << "KvDeleteApply kv_index add null generation[1]: " << generation->ShortDebugString()
<< ", key: " << key << "(" << Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply kv_index add null generation[1]: " << generation->ShortDebugString() << ", key: " << key
<< "(" << Helper::StringToHex(key) << ")";

// only in this situation, the prev_kv is meaningful
prev_kv.set_create_revision(latest_generation->create_revision().main());
Expand All @@ -771,9 +788,9 @@ butil::Status KvControl::KvDeleteApply(const std::string &key,
} else {
// a null generation means delete
// so we do not need to add a new generation
DINGO_LOG(INFO) << "KvDeleteApply kv_index exist null generation[1], nothing to do: "
<< latest_generation->ShortDebugString() << ", key: " << key << "(" << Helper::StringToHex(key)
<< ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply kv_index exist null generation[1], nothing to do: "
<< latest_generation->ShortDebugString() << ", key: " << key << "(" << Helper::StringToHex(key) << ")";
}

// setup new_create_revision to last create_revision
Expand Down Expand Up @@ -825,8 +842,9 @@ butil::Status KvControl::KvDeleteApply(const std::string &key,
return ret;
}

DINGO_LOG(INFO) << "KvDeleteApply success, key: " << key << "(" << Helper::StringToHex(key)
<< "), revision: " << op_revision.ShortDebugString();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply success, key: " << key << "(" << Helper::StringToHex(key)
<< "), revision: " << op_revision.ShortDebugString();

// trigger watch
if (!one_time_watch_map_.empty()) {
Expand All @@ -848,14 +866,15 @@ butil::Status KvControl::KvDeleteApply(const std::string &key,
TriggerOneWatch(key, pb::version::Event::EventType::Event_EventType_DELETE, new_kv, prev_kv);
}

DINGO_LOG(INFO) << "KvDeleteApply success after trigger watch, key: " << key << "(" << Helper::StringToHex(key)
<< "), revision: " << op_revision.ShortDebugString();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvDeleteApply success after trigger watch, key: " << key << "(" << Helper::StringToHex(key)
<< "), revision: " << op_revision.ShortDebugString();

return butil::Status::OK();
}

void KvControl::CompactionTask() {
DINGO_LOG(INFO) << "compaction task start";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv) << "compaction task start";

if (!FLAGS_auto_compaction) {
DINGO_LOG(INFO) << "compaction task skip, auto_compaction is false";
Expand Down Expand Up @@ -924,7 +943,8 @@ butil::Status KvControl::KvCompact(const std::vector<std::string> &keys,
}

for (const auto &key : keys) {
DINGO_LOG(INFO) << "KvCompact, will compact key: " << key << "(" << Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvCompact, will compact key: " << key << "(" << Helper::StringToHex(key) << ")";
}

pb::coordinator_internal::MetaIncrement meta_increment;
Expand Down Expand Up @@ -965,8 +985,9 @@ butil::Status KvControl::KvCompactApply(const std::string &key,

// iterate kv_index generations, find revisions less than compact_revision
if (kv_index.generations_size() == 0) {
DINGO_LOG(INFO) << "KvCompactApply generations_size == 0, no need to compact, key: " << key << "("
<< Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvCompactApply generations_size == 0, no need to compact, key: " << key << "(" << Helper::StringToHex(key)
<< ")";
return butil::Status::OK();
}

Expand Down Expand Up @@ -1047,12 +1068,14 @@ butil::Status KvControl::KvCompactApply(const std::string &key,
// if new_kv_index has no generations, delete it
// else put new_kv_index
if (new_kv_index.generations_size() == 0) {
DINGO_LOG(INFO) << "KvCompactApply new_kv_index has no generations, delete it, key: " << key << "("
<< Helper::StringToHex(key) << ")";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvCompactApply new_kv_index has no generations, delete it, key: " << key << "(" << Helper::StringToHex(key)
<< ")";
DeleteRawKvIndex(key);
} else {
DINGO_LOG(INFO) << "KvCompactApply new_kv_index has generations, put it, key: " << key << "("
<< Helper::StringToHex(key) << "), new_kv_index: " << new_kv_index.ShortDebugString();
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_kv)
<< "KvCompactApply new_kv_index has generations, put it, key: " << key << "(" << Helper::StringToHex(key)
<< "), new_kv_index: " << new_kv_index.ShortDebugString();
PutRawKvIndex(key, new_kv_index);
}

Expand Down
6 changes: 4 additions & 2 deletions src/coordinator/kv_control_lease.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ DEFINE_int64(version_lease_min_ttl_seconds, 3, "min ttl seconds for version leas
DEFINE_int64(version_lease_max_count, 50000, "max lease count");
DEFINE_int64(version_lease_print_ttl_remaining_seconds, 10, "print ttl remaining seconds if value is less than this");

DEFINE_bool(dingo_log_switch_coor_lease, false, "switch for dingo log of kv control lease");

butil::Status KvControl::LeaseGrant(int64_t lease_id, int64_t ttl_seconds, int64_t &granted_id,
int64_t &granted_ttl_seconds,
pb::coordinator_internal::MetaIncrement &meta_increment) {
Expand Down Expand Up @@ -209,7 +211,7 @@ butil::Status KvControl::LeaseQuery(int64_t lease_id, bool get_keys, int64_t &gr

auto it = lease_to_key_map_temp_.find(lease_id);
if (it == lease_to_key_map_temp_.end()) {
DINGO_LOG(INFO) << "lease id " << lease_id << " not found";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_lease) << "lease id " << lease_id << " not found";
return butil::Status(pb::error::Errno::ELEASE_NOT_EXISTS_OR_EXPIRED, "lease id %lu not found", lease_id);
}

Expand All @@ -229,7 +231,7 @@ butil::Status KvControl::LeaseQuery(int64_t lease_id, bool get_keys, int64_t &gr
}

void KvControl::LeaseTask() {
DINGO_LOG(INFO) << "lease task start";
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_coor_lease) << "lease task start";

std::vector<int64_t> lease_ids_to_revoke;
pb::coordinator_internal::MetaIncrement meta_increment;
Expand Down
Loading

0 comments on commit 379941c

Please sign in to comment.