From 39c7420fcb33db453d5bebaa601c3dfa76136ead Mon Sep 17 00:00:00 2001 From: Ketor Date: Fri, 15 Mar 2024 22:37:58 +0800 Subject: [PATCH] [fix][common] Implement pthread PriorWorkerSet. Signed-off-by: Ketor --- conf/coordinator-gflags.conf | 1 + conf/coordinator.template.yaml | 3 +- conf/index-gflags.conf | 1 + conf/index.template.yaml | 10 +- conf/store-gflags.conf | 1 + conf/store.template.yaml | 12 +- src/common/runnable.cc | 58 ++--- src/common/runnable.h | 14 +- src/common/threadpool.cc | 5 +- src/engine/bdb_raw_engine.cc | 6 +- src/engine/bdb_raw_engine.h | 1 - src/raft/store_state_machine.cc | 8 +- src/server/main.cc | 392 ++++++++++++++++++------------ test/unit_test/test_worker_set.cc | 6 +- 14 files changed, 309 insertions(+), 209 deletions(-) diff --git a/conf/coordinator-gflags.conf b/conf/coordinator-gflags.conf index 24a701f5b..f32375f1c 100644 --- a/conf/coordinator-gflags.conf +++ b/conf/coordinator-gflags.conf @@ -15,3 +15,4 @@ -dingo_log_switch_coor_watch=true -dingo_log_switch_coor_lease=true -default_replica_num=3 +-use_pthread_prior_worker_set=true diff --git a/conf/coordinator.template.yaml b/conf/coordinator.template.yaml index 9c075ac22..506814c72 100644 --- a/conf/coordinator.template.yaml +++ b/conf/coordinator.template.yaml @@ -5,8 +5,9 @@ cluster: server: host: $SERVER_HOST$ port: $SERVER_PORT$ - worker_thread_num: 128 # must >4, worker_thread_num priority worker_thread_ratio + # worker_thread_num: 128 # must >4, worker_thread_num priority worker_thread_ratio # worker_thread_ratio: 1.0 # cpu core * ratio + brpc_common_worker_num: 32 # must > 4, the total bthread_concurrency of brpc is sum of all other worker_num + brpc_common_worker_num coordinator_service_worker_num: 32 # must < server.worker_thread_num coordinator_service_worker_max_pending_num: 1024 # 0 is unlimited meta_service_worker_num: 32 # must < server.worker_thread_num diff --git a/conf/index-gflags.conf b/conf/index-gflags.conf index 4d5e85486..1f44b1233 100644 --- 
a/conf/index-gflags.conf +++ b/conf/index-gflags.conf @@ -24,3 +24,4 @@ -enable_dir_service=false -enable_threads_service=false -dingo_log_switch_txn_detail=true +-use_pthread_prior_worker_set=true diff --git a/conf/index.template.yaml b/conf/index.template.yaml index 2051f853f..cca13af1f 100644 --- a/conf/index.template.yaml +++ b/conf/index.template.yaml @@ -9,16 +9,14 @@ server: metrics_collect_interval_s: 300 approximate_size_metrics_collect_interval_s: 300 scrub_vector_index_interval_s: 60 - # for vector index, there is a limit: - # read_worker_num + write_worker_num + raft_apply_worker_num + background_thread_num + fast_background_thread_num - # must < server.worker_thread_num - worker_thread_num: 128 # must >4, worker_thread_num priority worker_thread_ratio + # worker_thread_num: 128 # must >4, worker_thread_num priority worker_thread_ratio # worker_thread_ratio: 2 # cpu core * ratio + brpc_common_worker_num: 32 # must > 4, the total bthread_concurrency of brpc is sum of all other worker_num + brpc_common_worker_num read_worker_num: 32 # the number of read worker used by index_service read_worker_max_pending_num: 1024 # 0 is unlimited - write_worker_num: 24 # the number of write worker used by index_service + write_worker_num: 32 # the number of write worker used by index_service write_worker_max_pending_num: 1024 # 0 is unlimited - raft_apply_worker_num: 24 # the number of raft apply worker used by store_state_machine + raft_apply_worker_num: 16 # the number of raft apply worker used by store_state_machine raft_apply_worker_max_pending_num: 1024 # 0 is unlimited region: region_max_size: 536870912 # 512MB diff --git a/conf/store-gflags.conf b/conf/store-gflags.conf index 9c560a12d..56b77d12c 100644 --- a/conf/store-gflags.conf +++ b/conf/store-gflags.conf @@ -18,3 +18,4 @@ -enable_dir_service=false -enable_threads_service=false -dingo_log_switch_txn_detail=true +-use_pthread_prior_worker_set=true diff --git a/conf/store.template.yaml 
b/conf/store.template.yaml index bff69fe9e..7cd2687bf 100644 --- a/conf/store.template.yaml +++ b/conf/store.template.yaml @@ -8,17 +8,15 @@ server: heartbeat_interval_s: 6 metrics_collect_interval_s: 300 approximate_size_metrics_collect_interval_s: 300 - # for store, there is a limit: - # read_worker_num + write_worker_num + raft_apply_worker_num - # must < server.worker_thread_num - worker_thread_num: 128 # must >4, worker_thread_num priority worker_thread_ratio + # worker_thread_num: 128 # must >4, worker_thread_num priority worker_thread_ratio # worker_thread_ratio: 1 # cpu core * ratio - read_worker_num: 48 # # the number of read worker used by store_service + brpc_common_worker_num: 32 # must > 4, the total bthread_concurrency of brpc is sum of all other worker_num + brpc_common_worker_num + read_worker_num: 32 # # the number of read worker used by store_service read_worker_max_pending_num: 1024 # 0 is unlimited write_worker_num: 32 # the number of write worker used by store_service write_worker_max_pending_num: 1024 # 0 is unlimited - raft_apply_worker_num: 32 # the number of raft apply worker used by store_state_machine - raft_apply_worker_max_pending_num: 1024 # 0 is unlimited + raft_apply_worker_num: 16 # the number of raft apply worker used by store_state_machine + raft_apply_worker_max_pending_num: 1024 # 0 is unlimited, this is a soft limit for raft_apply, but hard limit for client write region: region_max_size: 268435456 # 256MB enable_auto_split: true diff --git a/src/common/runnable.cc b/src/common/runnable.cc index c1975eb46..f8f3fe46a 100644 --- a/src/common/runnable.cc +++ b/src/common/runnable.cc @@ -16,6 +16,7 @@ #include #include +#include #include "butil/compiler_specific.h" #include "common/helper.h" @@ -291,9 +292,10 @@ std::vector> WorkerSet::GetPendingTaskTrace() { return traces; } -PriorWorkerSet::PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count) +PriorWorkerSet::PriorWorkerSet(std::string name, 
uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread) : name_(name), worker_num_(worker_num), + use_pthread_(use_pthread), max_pending_task_count_(max_pending_task_count), total_task_count_metrics_(fmt::format("dingo_prior_worker_set_{}_total_task_count", name)), pending_task_count_metrics_(fmt::format("dingo_prior_worker_set_{}_pending_task_count", name)), @@ -309,11 +311,13 @@ PriorWorkerSet::~PriorWorkerSet() { } bool PriorWorkerSet::Init() { - for (uint32_t i = 0; i < worker_num_; ++i) { - workers_.push_back(Bthread()); - } + uint32_t i = 0; + + auto worker_function = [this, &i]() { + if (use_pthread_) { + pthread_setname_np(pthread_self(), (name_ + ":" + std::to_string(i)).c_str()); + } - auto worker_function = [this]() { while (true) { bthread_mutex_lock(&mutex_); while (pending_task_count_.load(std::memory_order_relaxed) == 0) { @@ -333,7 +337,7 @@ bool PriorWorkerSet::Init() { bthread_mutex_unlock(&mutex_); - if (task != nullptr) { + if (BAIDU_UNLIKELY(task != nullptr)) { task->Run(); queue_run_metrics_ << Helper::TimestampUs() - now_time_us; DecPendingTaskCount(); @@ -342,16 +346,28 @@ bool PriorWorkerSet::Init() { } }; - for (auto& bthread : workers_) { - bthread.Run(worker_function); + if (use_pthread_) { + for (i = 0; i < worker_num_; ++i) { + pthread_workers_.push_back(std::thread(worker_function)); + } + } else { + for (i = 0; i < worker_num_; ++i) { + bthread_workers_.push_back(Bthread(worker_function)); + } } return true; } void PriorWorkerSet::Destroy() { - for (const auto& bthread : workers_) { - bthread.Join(); + if (use_pthread_) { + for (auto& std_thread : pthread_workers_) { + std_thread.join(); + } + } else { + for (auto& bthread : bthread_workers_) { + bthread.Join(); + } } } @@ -367,24 +383,10 @@ bool PriorWorkerSet::Execute(TaskRunnablePtr task) { IncPendingTaskCount(); IncTotalTaskCount(); - // if the pending task count is less than the worker number, execute the task directly - // else push the task to the task queue - 
// the total count of pending task will be decreased in the worker function - // and the total concurrency is limited by the worker number - if (pend_task_count < worker_num_) { - int64_t now_time_us = Helper::TimestampUs(); - task->Run(); - queue_run_metrics_ << Helper::TimestampUs() - now_time_us; - - DecPendingTaskCount(); - Notify(WorkerEventType::kFinishTask); - - } else { - bthread_mutex_lock(&mutex_); - tasks_.push(task); - bthread_cond_signal(&cond_); - bthread_mutex_unlock(&mutex_); - } + bthread_mutex_lock(&mutex_); + tasks_.push(task); + bthread_cond_signal(&cond_); + bthread_mutex_unlock(&mutex_); return true; } diff --git a/src/common/runnable.h b/src/common/runnable.h index f163cac31..e3e9cbc84 100644 --- a/src/common/runnable.h +++ b/src/common/runnable.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "bthread/execution_queue.h" @@ -66,7 +67,7 @@ using TaskRunnablePtr = std::shared_ptr; // Custom Comparator for priority_queue struct CompareTaskRunnable { - bool operator()(const TaskRunnablePtr& lhs, TaskRunnablePtr& rhs) const { return lhs < rhs; } + bool operator()(const TaskRunnablePtr& lhs, TaskRunnablePtr& rhs) const { return lhs.get() < rhs.get(); } }; int ExecuteRoutine(void*, bthread::TaskIterator& iter); @@ -171,11 +172,12 @@ using WorkerSetPtr = std::shared_ptr; class PriorWorkerSet { public: - PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count); + PriorWorkerSet(std::string name, uint32_t worker_num, int64_t max_pending_task_count, bool use_pthread); ~PriorWorkerSet(); - static std::shared_ptr New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count) { - return std::make_shared(name, worker_num, max_pending_task_count); + static std::shared_ptr New(std::string name, uint32_t worker_num, uint32_t max_pending_task_count, + bool use_pthead) { + return std::make_shared(name, worker_num, max_pending_task_count, use_pthead); } bool Init(); @@ -206,7 +208,9 @@ class 
PriorWorkerSet { bthread_cond_t cond_; std::priority_queue, CompareTaskRunnable> tasks_; - std::vector workers_; + bool use_pthread_; + std::vector bthread_workers_; + std::vector pthread_workers_; int64_t max_pending_task_count_; uint32_t worker_num_; diff --git a/src/common/threadpool.cc b/src/common/threadpool.cc index 76de4bb16..72f77cba4 100644 --- a/src/common/threadpool.cc +++ b/src/common/threadpool.cc @@ -15,6 +15,7 @@ #include "common/threadpool.h" #include +#include #include "fmt/core.h" @@ -26,8 +27,8 @@ ThreadPool::ThreadPool(const std::string &thread_name, uint32_t thread_num) total_task_count_metrics_(fmt::format("dingo_threadpool_{}_total_task_count", thread_name)), pending_task_count_metrics_(fmt::format("dingo_threadpool_{}_pending_task_count", thread_name)) { for (size_t i = 0; i < thread_num; ++i) - workers_.emplace_back([this] { - pthread_setname_np(pthread_self(), this->thread_name_.c_str()); + workers_.emplace_back([this, &i] { + pthread_setname_np(pthread_self(), (thread_name_ + ":" + std::to_string(i)).c_str()); for (;;) { TaskPtr task; diff --git a/src/engine/bdb_raw_engine.cc b/src/engine/bdb_raw_engine.cc index de3ab8c09..679730c38 100644 --- a/src/engine/bdb_raw_engine.cc +++ b/src/engine/bdb_raw_engine.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -2113,7 +2114,10 @@ bool BdbRawEngine::Init(std::shared_ptr config, const std::vector(GetSelfPtr()); DINGO_LOG(INFO) << fmt::format("[bdb] db path: {}", db_path_); - std::thread checkpoint_thread([this]() { bdb::BdbHelper::CheckpointThread(envp_, db_, is_close_); }); + std::thread checkpoint_thread([this]() { + pthread_setname_np(pthread_self(), "bdb_chkpt"); + bdb::BdbHelper::CheckpointThread(envp_, db_, is_close_); + }); checkpoint_thread.detach(); DINGO_LOG(INFO) << fmt::format("[bdb] db path: {}", db_path_); diff --git a/src/engine/bdb_raw_engine.h b/src/engine/bdb_raw_engine.h index 480fa3a83..8dc581f31 100644 --- a/src/engine/bdb_raw_engine.h +++ 
b/src/engine/bdb_raw_engine.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include diff --git a/src/raft/store_state_machine.cc b/src/raft/store_state_machine.cc index d43a6f7ad..5f79caacb 100644 --- a/src/raft/store_state_machine.cc +++ b/src/raft/store_state_machine.cc @@ -129,9 +129,9 @@ void StoreStateMachine::on_apply(braft::Iterator& iter) { bool need_apply = true; // Check region state auto region_state = region_->State(); - if (region_state == pb::common::StoreRegionState::DELETING || - region_state == pb::common::StoreRegionState::DELETED || - region_state == pb::common::StoreRegionState::TOMBSTONE) { + if (BAIDU_UNLIKELY(region_state == pb::common::StoreRegionState::DELETING || + region_state == pb::common::StoreRegionState::DELETED || + region_state == pb::common::StoreRegionState::TOMBSTONE)) { std::string s = fmt::format("Region({}) is {} state, abandon apply log", region_->Id(), pb::common::StoreRegionState_Name(region_state)); DINGO_LOG(WARNING) << fmt::format("[raft.sm][region({})] {}", region_->Id(), s); @@ -142,7 +142,7 @@ void StoreStateMachine::on_apply(braft::Iterator& iter) { } // Check region epoch - if (need_apply && !Helper::IsEqualRegionEpoch(raft_cmd->header().epoch(), region_->Epoch())) { + if (BAIDU_UNLIKELY(need_apply && !Helper::IsEqualRegionEpoch(raft_cmd->header().epoch(), region_->Epoch()))) { std::string s = fmt::format("Region({}) epoch is not match, region_epoch({}) raft_cmd_epoch({})", region_->Id(), region_->EpochToString(), Helper::RegionEpochToString(raft_cmd->header().epoch())); DINGO_LOG(WARNING) << fmt::format("[raft.sm][region({})] {}", region_->Id(), s); diff --git a/src/server/main.cc b/src/server/main.cc index b09e3181c..48552f191 100644 --- a/src/server/main.cc +++ b/src/server/main.cc @@ -82,6 +82,9 @@ DEFINE_uint32(h2_server_max_frame_size, 16384, "max frame size"); DEFINE_uint32(h2_server_max_header_list_size, UINT32_MAX, "max header list size"); DEFINE_int32(omp_num_threads, 1, "omp num 
threads"); + +DEFINE_bool(use_pthread_prior_worker_set, true, "use pthread prior worker set"); +DEFINE_int32(brpc_common_worker_num, 10, "brpc common worker num"); DEFINE_int32(read_worker_num, 10, "read service worker num"); DEFINE_int32(write_worker_num, 10, "write service worker num"); DEFINE_int64(read_worker_max_pending_num, 0, "read service worker num"); @@ -430,99 +433,134 @@ int InitBthreadWorkerThreadNum(std::shared_ptr config) { } } - bthread::FLAGS_bthread_concurrency = num; - - return bthread::FLAGS_bthread_concurrency; -} - -int InitServiceWorkerParameters(std::shared_ptr config, dingodb::pb::common::ClusterRole role) { - // init service_worker_num - int read_worker_num = config->GetInt("server.read_worker_num"); - if (read_worker_num <= 0) { - DINGO_LOG(WARNING) << "server.read_worker_num is not set, use dingodb::FLAGS_read_worker_num"; - } else { - FLAGS_read_worker_num = read_worker_num; + if (num > 4) { + bthread::FLAGS_bthread_concurrency = num; } - if (FLAGS_read_worker_num <= 0) { - DINGO_LOG(ERROR) << "server.read_worker_num is less than 0"; - return -1; - } - DINGO_LOG(INFO) << "server.read_worker_num is set to " << FLAGS_read_worker_num; - int write_worker_num = config->GetInt("server.write_worker_num"); - if (write_worker_num <= 0) { - DINGO_LOG(WARNING) << "server.write_worker_num is not set, use dingodb::FLAGS_write_worker_num"; + int brpc_common_worker_num = config->GetInt("server.brpc_common_worker_num"); + if (brpc_common_worker_num <= 0) { + DINGO_LOG(WARNING) << "server.brpc_common_worker_num is not set, use dingodb::FLAGS_brpc_common_worker_num"; } else { - FLAGS_write_worker_num = write_worker_num; + FLAGS_brpc_common_worker_num = brpc_common_worker_num; } - if (FLAGS_write_worker_num <= 0) { - DINGO_LOG(ERROR) << "server.write_worker_num is less than 0"; + if (FLAGS_brpc_common_worker_num <= 0) { + DINGO_LOG(ERROR) << "server.brpc_common_worker_num is less than 0"; return -1; } - DINGO_LOG(INFO) << "server.write_worker_num is set 
to " << FLAGS_write_worker_num; + DINGO_LOG(INFO) << "server.brpc_common_worker_num is set to " << FLAGS_brpc_common_worker_num; - // if raft_apply_worker_num is zero, means do not use raft apply worker - int raft_apply_worker_num = config->GetInt("server.raft_apply_worker_num"); - if (raft_apply_worker_num < 0) { - DINGO_LOG(WARNING) << "server.raft_apply_worker_num is not set, use dingodb::FLAGS_raft_apply_worker_num"; - } else { - FLAGS_raft_apply_worker_num = raft_apply_worker_num; - } - if (FLAGS_raft_apply_worker_num < 0) { - DINGO_LOG(ERROR) << "server.raft_apply_worker_num is less than 0"; - return -1; - } - DINGO_LOG(INFO) << "server.raft_apply_worker_num is set to " << FLAGS_raft_apply_worker_num; + return bthread::FLAGS_bthread_concurrency; +} - if (FLAGS_read_worker_num + FLAGS_write_worker_num + FLAGS_raft_apply_worker_num > - bthread::FLAGS_bthread_concurrency) { - DINGO_LOG(ERROR) << "server.read_worker_num[" << FLAGS_read_worker_num << "] + server.write_worker_num[" - << FLAGS_write_worker_num << "] + server.raft_apply_worker_num[" << FLAGS_raft_apply_worker_num - << "] is greater than server.worker_thread_num[" << bthread::FLAGS_bthread_concurrency << "]"; - return -1; +int InitServiceWorkerParameters(std::shared_ptr config, dingodb::pb::common::ClusterRole role) { + // init service_worker_num + { + int read_worker_num = config->GetInt("server.read_worker_num"); + if (read_worker_num <= 0) { + DINGO_LOG(WARNING) << "server.read_worker_num is not set, use dingodb::FLAGS_read_worker_num"; + } else { + FLAGS_read_worker_num = read_worker_num; + } + if (FLAGS_read_worker_num <= 0) { + DINGO_LOG(ERROR) << "server.read_worker_num is less than 0"; + return -1; + } + DINGO_LOG(INFO) << "server.read_worker_num is set to " << FLAGS_read_worker_num; + + int write_worker_num = config->GetInt("server.write_worker_num"); + if (write_worker_num <= 0) { + DINGO_LOG(WARNING) << "server.write_worker_num is not set, use dingodb::FLAGS_write_worker_num"; + } else { + 
FLAGS_write_worker_num = write_worker_num; + } + if (FLAGS_write_worker_num <= 0) { + DINGO_LOG(ERROR) << "server.write_worker_num is less than 0"; + return -1; + } + DINGO_LOG(INFO) << "server.write_worker_num is set to " << FLAGS_write_worker_num; + + // if raft_apply_worker_num is zero, means do not use raft apply worker + int raft_apply_worker_num = config->GetInt("server.raft_apply_worker_num"); + if (raft_apply_worker_num < 0) { + DINGO_LOG(WARNING) << "server.raft_apply_worker_num is not set, use dingodb::FLAGS_raft_apply_worker_num"; + } else { + FLAGS_raft_apply_worker_num = raft_apply_worker_num; + } + if (FLAGS_raft_apply_worker_num < 0) { + DINGO_LOG(ERROR) << "server.raft_apply_worker_num is less than 0"; + return -1; + } + DINGO_LOG(INFO) << "server.raft_apply_worker_num is set to " << FLAGS_raft_apply_worker_num; } // init max pending num - auto read_max_pending_num = config->GetInt64("server.read_worker_max_pending_num"); - if (read_max_pending_num <= 0) { - DINGO_LOG(WARNING) - << "server.read_worker_max_pending_num is not set, use dingodb::FLAGS_read_worker_max_pending_num"; - } else { - FLAGS_read_worker_max_pending_num = read_max_pending_num; - } - if (FLAGS_read_worker_max_pending_num < 0) { - DINGO_LOG(ERROR) << "server.read_worker_max_pending_num is less than 0"; - return -1; - } - DINGO_LOG(INFO) << "server.read_worker_max_pending_num is set to " << FLAGS_read_worker_max_pending_num; + { + auto read_max_pending_num = config->GetInt64("server.read_worker_max_pending_num"); + if (read_max_pending_num <= 0) { + DINGO_LOG(WARNING) + << "server.read_worker_max_pending_num is not set, use dingodb::FLAGS_read_worker_max_pending_num"; + } else { + FLAGS_read_worker_max_pending_num = read_max_pending_num; + } + if (FLAGS_read_worker_max_pending_num < 0) { + DINGO_LOG(ERROR) << "server.read_worker_max_pending_num is less than 0"; + return -1; + } + DINGO_LOG(INFO) << "server.read_worker_max_pending_num is set to " << FLAGS_read_worker_max_pending_num; 
- auto write_max_pending_num = config->GetInt64("server.write_worker_max_pending_num"); - if (write_max_pending_num <= 0) { - DINGO_LOG(WARNING) - << "server.write_worker_max_pending_num is not set, use dingodb::FLAGS_write_worker_max_pending_num"; - } else { - FLAGS_write_worker_max_pending_num = write_max_pending_num; - } - if (FLAGS_write_worker_max_pending_num < 0) { - DINGO_LOG(ERROR) << "server.write_worker_max_pending_num is less than 0"; - return -1; + auto write_max_pending_num = config->GetInt64("server.write_worker_max_pending_num"); + if (write_max_pending_num <= 0) { + DINGO_LOG(WARNING) + << "server.write_worker_max_pending_num is not set, use dingodb::FLAGS_write_worker_max_pending_num"; + } else { + FLAGS_write_worker_max_pending_num = write_max_pending_num; + } + if (FLAGS_write_worker_max_pending_num < 0) { + DINGO_LOG(ERROR) << "server.write_worker_max_pending_num is less than 0"; + return -1; + } + DINGO_LOG(INFO) << "server.write_worker_max_pending_num is set to " << FLAGS_write_worker_max_pending_num; + + auto raft_apply_max_pending_num = config->GetInt64("server.raft_apply_worker_max_pending_num"); + if (raft_apply_max_pending_num <= 0) { + DINGO_LOG(WARNING) << "server.raft_apply_worker_max_pending_num is not set, use " + "dingodb::FLAGS_raft_apply_worker_max_pending_num"; + } else { + FLAGS_raft_apply_worker_max_pending_num = raft_apply_max_pending_num; + } + if (FLAGS_raft_apply_worker_max_pending_num < 0) { + DINGO_LOG(ERROR) << "server.raft_apply_worker_max_pending_num is less than 0"; + return -1; + } + DINGO_LOG(INFO) << "server.raft_apply_worker_max_pending_num is set to " << FLAGS_raft_apply_worker_max_pending_num; } - DINGO_LOG(INFO) << "server.write_worker_max_pending_num is set to " << FLAGS_write_worker_max_pending_num; - auto raft_apply_max_pending_num = config->GetInt64("server.raft_apply_worker_max_pending_num"); - if (raft_apply_max_pending_num <= 0) { - DINGO_LOG(WARNING) << "server.raft_apply_worker_max_pending_num is not 
set, use " - "dingodb::FLAGS_raft_apply_worker_max_pending_num"; + // calc new bthread_concurrency by brpc_common_worker_num + if (role != dingodb::pb::common::ClusterRole::INDEX) { + if (FLAGS_use_pthread_prior_worker_set) { + if (FLAGS_brpc_common_worker_num > bthread::FLAGS_bthread_concurrency) { + bthread::FLAGS_bthread_concurrency = FLAGS_brpc_common_worker_num; + + DINGO_LOG(INFO) << "server.brpc_common_worker_num[" << FLAGS_brpc_common_worker_num + << "] is greater than server.worker_thread_num, bump up to [" + << bthread::FLAGS_bthread_concurrency << "]"; + } + } else { + if (FLAGS_read_worker_num + FLAGS_write_worker_num + FLAGS_raft_apply_worker_num + FLAGS_brpc_common_worker_num > + bthread::FLAGS_bthread_concurrency) { + bthread::FLAGS_bthread_concurrency = + FLAGS_read_worker_num + FLAGS_write_worker_num + FLAGS_raft_apply_worker_num + FLAGS_brpc_common_worker_num; + + DINGO_LOG(INFO) << "server.read_worker_num[" << FLAGS_read_worker_num << "] + server.write_worker_num[" + << FLAGS_write_worker_num << "] + server.raft_apply_worker_num[" << FLAGS_raft_apply_worker_num + << "] + server.brpc_common_worker_num[" << FLAGS_brpc_common_worker_num + << "] is greater than server.worker_thread_num, bump up to [" + << bthread::FLAGS_bthread_concurrency << "]"; + } + } } else { - FLAGS_raft_apply_worker_max_pending_num = raft_apply_max_pending_num; - } - if (FLAGS_raft_apply_worker_max_pending_num < 0) { - DINGO_LOG(ERROR) << "server.raft_apply_worker_max_pending_num is less than 0"; - return -1; - } - DINGO_LOG(INFO) << "server.raft_apply_worker_max_pending_num is set to " << FLAGS_raft_apply_worker_max_pending_num; + // This is VectorIndex process, need to calc vector background worker num - if (role == dingodb::pb::common::ClusterRole::INDEX) { // init vector index manager background worker num auto vector_background_worker_num = config->GetInt("vector.background_worker_num"); if (vector_background_worker_num <= 0) { @@ -541,15 +579,37 @@ int 
InitServiceWorkerParameters(std::shared_ptr config, dingodb } dingodb::FLAGS_vector_fast_background_worker_num = vector_fast_background_worker_num; - if (FLAGS_read_worker_num + FLAGS_write_worker_num + FLAGS_raft_apply_worker_num + - dingodb::FLAGS_vector_fast_background_worker_num + dingodb::FLAGS_vector_background_worker_num > - bthread::FLAGS_bthread_concurrency) { - DINGO_LOG(ERROR) << "server.read_worker_num[" << FLAGS_read_worker_num << "] + server.write_worker_num[" - << FLAGS_write_worker_num << "] + server.raft_apply_worker_num[" << FLAGS_raft_apply_worker_num - << "] + vector.fast_background_worker_num[" << dingodb::FLAGS_vector_fast_background_worker_num - << "] + vector.background_worker_num[" << dingodb::FLAGS_vector_background_worker_num - << "] is greater than server.worker_thread_num[" << bthread::FLAGS_bthread_concurrency << "]"; - return -1; + if (FLAGS_use_pthread_prior_worker_set) { + if (dingodb::FLAGS_vector_fast_background_worker_num + dingodb::FLAGS_vector_background_worker_num + + FLAGS_brpc_common_worker_num > + bthread::FLAGS_bthread_concurrency) { + bthread::FLAGS_bthread_concurrency = dingodb::FLAGS_vector_fast_background_worker_num + + dingodb::FLAGS_vector_background_worker_num + FLAGS_brpc_common_worker_num; + + DINGO_LOG(ERROR) << "vector.fast_background_worker_num[" << dingodb::FLAGS_vector_fast_background_worker_num + << "] + vector.background_worker_num[" << dingodb::FLAGS_vector_background_worker_num + << "] + server.brpc_common_worker_num[" << FLAGS_brpc_common_worker_num + << "] is greater than server.worker_thread_num, bump up to [" + << bthread::FLAGS_bthread_concurrency << "]"; + } + } else { + if (FLAGS_read_worker_num + FLAGS_write_worker_num + FLAGS_raft_apply_worker_num + + dingodb::FLAGS_vector_fast_background_worker_num + dingodb::FLAGS_vector_background_worker_num + + FLAGS_brpc_common_worker_num > + bthread::FLAGS_bthread_concurrency) { + bthread::FLAGS_bthread_concurrency = FLAGS_read_worker_num + 
FLAGS_write_worker_num + + FLAGS_raft_apply_worker_num + + dingodb::FLAGS_vector_fast_background_worker_num + + dingodb::FLAGS_vector_background_worker_num + FLAGS_brpc_common_worker_num; + + DINGO_LOG(ERROR) << "server.read_worker_num[" << FLAGS_read_worker_num << "] + server.write_worker_num[" + << FLAGS_write_worker_num << "] + server.raft_apply_worker_num[" << FLAGS_raft_apply_worker_num + << "] + vector.fast_background_worker_num[" << dingodb::FLAGS_vector_fast_background_worker_num + << "] + vector.background_worker_num[" << dingodb::FLAGS_vector_background_worker_num + << "] + server.brpc_common_worker_num[" << FLAGS_brpc_common_worker_num + << "] is greater than server.worker_thread_num, bump up to [" + << bthread::FLAGS_bthread_concurrency << "]"; + } } auto vector_max_background_task_count = config->GetInt("vector.max_background_task_count"); @@ -575,12 +635,6 @@ int InitCoordinatorServiceWorkerParameters(std::shared_ptr conf } DINGO_LOG(INFO) << "server.coordinator_service_worker_num is set to " << FLAGS_coordinator_service_worker_num; - if (FLAGS_coordinator_service_worker_num > bthread::FLAGS_bthread_concurrency) { - DINGO_LOG(ERROR) << "server.coordinator_service_worker_num[" << FLAGS_coordinator_service_worker_num - << "] is greater than server.worker_thread_num[" << bthread::FLAGS_bthread_concurrency << "]"; - return -1; - } - auto coor_max_pending_num = config->GetInt64("server.coordinator_service_worker_max_pending_num"); if (coor_max_pending_num <= 0) { DINGO_LOG(WARNING) << "server.coordinator_service_worker_max_pending_num is not set, use " @@ -599,13 +653,6 @@ int InitCoordinatorServiceWorkerParameters(std::shared_ptr conf } DINGO_LOG(INFO) << "server.meta_service_worker_num is set to " << FLAGS_meta_service_worker_num; - // meta num - if (FLAGS_meta_service_worker_num > bthread::FLAGS_bthread_concurrency) { - DINGO_LOG(ERROR) << "server.meta_service_worker_num[" << FLAGS_meta_service_worker_num - << "] is greater than 
server.worker_thread_num[" << bthread::FLAGS_bthread_concurrency << "]"; - return -1; - } - auto meta_max_pending_num = config->GetInt64("server.meta_service_worker_max_pending_num"); if (meta_max_pending_num <= 0) { DINGO_LOG(WARNING) << "server.meta_service_worker_max_pending_num is not set, use " @@ -625,12 +672,6 @@ int InitCoordinatorServiceWorkerParameters(std::shared_ptr conf } DINGO_LOG(INFO) << "server.version_service_worker_num is set to " << FLAGS_version_service_worker_num; - if (FLAGS_version_service_worker_num > bthread::FLAGS_bthread_concurrency) { - DINGO_LOG(ERROR) << "server.version_service_worker_num[" << FLAGS_version_service_worker_num - << "] is greater than server.worker_thread_num[" << bthread::FLAGS_bthread_concurrency << "]"; - return -1; - } - auto version_max_pending_num = config->GetInt64("server.version_service_worker_max_pending_num"); if (version_max_pending_num <= 0) { DINGO_LOG(WARNING) << "server.version_service_worker_max_pending_num is not set, use " @@ -641,6 +682,27 @@ int InitCoordinatorServiceWorkerParameters(std::shared_ptr conf DINGO_LOG(INFO) << "server.version_service_worker_max_pending_num is set to " << FLAGS_version_service_worker_max_pending_num; + if (FLAGS_use_pthread_prior_worker_set) { + if (FLAGS_brpc_common_worker_num >= bthread::FLAGS_bthread_concurrency) { + bthread::FLAGS_bthread_concurrency = FLAGS_brpc_common_worker_num; + DINGO_LOG(INFO) << "service_worker_num[" << FLAGS_brpc_common_worker_num + << "] is greater than worker_thread_num, bump up to [" << bthread::FLAGS_bthread_concurrency + << "]"; + } + } else { + if (FLAGS_coordinator_service_worker_num + FLAGS_meta_service_worker_num + FLAGS_version_service_worker_num + + FLAGS_brpc_common_worker_num >= + bthread::FLAGS_bthread_concurrency) { + bthread::FLAGS_bthread_concurrency = FLAGS_coordinator_service_worker_num + FLAGS_meta_service_worker_num + + FLAGS_version_service_worker_num + FLAGS_brpc_common_worker_num; + DINGO_LOG(INFO) << 
"service_worker_num[" + << FLAGS_coordinator_service_worker_num + FLAGS_meta_service_worker_num + + FLAGS_version_service_worker_num + FLAGS_brpc_common_worker_num + << "] is greater than worker_thread_num, bump up to [" << bthread::FLAGS_bthread_concurrency + << "]"; + } + } + return 0; } @@ -840,26 +902,33 @@ int main(int argc, char *argv[]) { DINGO_LOG(INFO) << "h2_settings.max_frame_size: " << options.h2_settings.max_frame_size; DINGO_LOG(INFO) << "h2_settings.max_header_list_size: " << options.h2_settings.max_header_list_size; - // setup bthread worker thread num into bthread::FLAGS_bthread_concurrency - InitBthreadWorkerThreadNum(config); + if (role == dingodb::pb::common::ClusterRole::COORDINATOR) { + // setup bthread worker thread num into bthread::FLAGS_bthread_concurrency + InitBthreadWorkerThreadNum(config); - // options.num_threads = bthread::FLAGS_bthread_concurrency; + // init coordinator service worker parameters + auto ret2 = InitCoordinatorServiceWorkerParameters(config); + if (ret2 < 0) { + DINGO_LOG(ERROR) << "InitCoordinatorServiceWorkerParameters failed!"; + return -1; + } - DINGO_LOG(INFO) << "bthread worker_thread_num: " << bthread::FLAGS_bthread_concurrency; + if (!dingo_server.InitCoordinatorInteractionForAutoIncrement()) { + DINGO_LOG(ERROR) << "InitCoordinatorInteractionForAutoIncrement failed!"; + return -1; + } - if (brpc_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - DINGO_LOG(ERROR) << "Fail to add node service to brpc_server!"; - return -1; - } + options.num_threads = bthread::FLAGS_bthread_concurrency; - if (raft_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - DINGO_LOG(ERROR) << "Fail to add node service raft_server!"; - return -1; - } + DINGO_LOG(INFO) << "bthread worker_thread_num: " << bthread::FLAGS_bthread_concurrency; - if (role == dingodb::pb::common::ClusterRole::COORDINATOR) { - if (!dingo_server.InitCoordinatorInteractionForAutoIncrement()) { - DINGO_LOG(ERROR) << 
"InitCoordinatorInteractionForAutoIncrement failed!"; + if (brpc_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + DINGO_LOG(ERROR) << "Fail to add node service to brpc_server!"; + return -1; + } + + if (raft_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + DINGO_LOG(ERROR) << "Fail to add node service raft_server!"; return -1; } @@ -880,13 +949,6 @@ int main(int argc, char *argv[]) { meta_service.SetKvEngine(engine); version_service.SetKvEngine(engine); - // init coordinator service worker parameters - auto ret2 = InitCoordinatorServiceWorkerParameters(config); - if (ret2 < 0) { - DINGO_LOG(ERROR) << "InitCoordinatorServiceWorkerParameters failed!"; - return -1; - } - // get service worker nums auto coordinator_service_worker_num = FLAGS_coordinator_service_worker_num; if (coordinator_service_worker_num < 0) { @@ -901,7 +963,8 @@ int main(int argc, char *argv[]) { } dingodb::PriorWorkerSetPtr coordinator_worker_set = dingodb::PriorWorkerSet::New( - "CoordinatorService", FLAGS_coordinator_service_worker_num, FLAGS_coordinator_service_worker_max_pending_num); + "coor_wkr", FLAGS_coordinator_service_worker_num, FLAGS_coordinator_service_worker_max_pending_num, + FLAGS_use_pthread_prior_worker_set); if (!coordinator_worker_set->Init()) { DINGO_LOG(ERROR) << "Init CoordinatorService PriorWorkerSet failed!"; return -1; @@ -919,8 +982,9 @@ int main(int argc, char *argv[]) { return -1; } - dingodb::PriorWorkerSetPtr meta_worker_set = dingodb::PriorWorkerSet::New( - "MetaService", FLAGS_meta_service_worker_num, FLAGS_meta_service_worker_max_pending_num); + dingodb::PriorWorkerSetPtr meta_worker_set = + dingodb::PriorWorkerSet::New("meta_wkr", FLAGS_meta_service_worker_num, + FLAGS_meta_service_worker_max_pending_num, FLAGS_use_pthread_prior_worker_set); if (!meta_worker_set->Init()) { DINGO_LOG(ERROR) << "Init MetaService PriorWorkerSet failed!"; return -1; @@ -938,22 +1002,14 @@ int main(int argc, char *argv[]) { return 
-1; } - dingodb::PriorWorkerSetPtr version_worker_set = dingodb::PriorWorkerSet::New( - "VersionService", FLAGS_version_service_worker_num, FLAGS_version_service_worker_max_pending_num); + dingodb::PriorWorkerSetPtr version_worker_set = + dingodb::PriorWorkerSet::New("version_wkr", FLAGS_version_service_worker_num, + FLAGS_version_service_worker_max_pending_num, FLAGS_use_pthread_prior_worker_set); if (!version_worker_set->Init()) { DINGO_LOG(ERROR) << "Init VersionService PriorWorkerSet failed!"; return -1; } - if (FLAGS_coordinator_service_worker_num + FLAGS_meta_service_worker_num + FLAGS_version_service_worker_num >= - bthread::FLAGS_bthread_concurrency) { - DINGO_LOG(ERROR) << "service_worker_num[" - << FLAGS_coordinator_service_worker_num + FLAGS_meta_service_worker_num + - FLAGS_version_service_worker_num - << "] is greater than worker_thread_num[" << bthread::FLAGS_bthread_concurrency << "]"; - return -1; - } - coordinator_service.SetWorkSet(coordinator_worker_set); meta_service.SetWorkSet(meta_worker_set); version_service.SetWorkSet(version_worker_set); @@ -1074,6 +1130,9 @@ int main(int argc, char *argv[]) { DINGO_LOG(INFO) << "Auto Increment region start"; } else if (role == dingodb::pb::common::ClusterRole::STORE) { + // setup bthread worker thread num into bthread::FLAGS_bthread_concurrency + InitBthreadWorkerThreadNum(config); + // init service workers auto ret1 = InitServiceWorkerParameters(config, role); if (ret1 < 0) { @@ -1081,8 +1140,22 @@ int main(int argc, char *argv[]) { return -1; } - dingodb::PriorWorkerSetPtr read_worker_set = - dingodb::PriorWorkerSet::New("StoreServiceRead", FLAGS_read_worker_num, FLAGS_read_worker_max_pending_num); + options.num_threads = bthread::FLAGS_bthread_concurrency; + + DINGO_LOG(INFO) << "bthread worker_thread_num: " << bthread::FLAGS_bthread_concurrency; + + if (brpc_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + DINGO_LOG(ERROR) << "Fail to add node service to brpc_server!"; + 
return -1; + } + + if (raft_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + DINGO_LOG(ERROR) << "Fail to add node service raft_server!"; + return -1; + } + + dingodb::PriorWorkerSetPtr read_worker_set = dingodb::PriorWorkerSet::New( + "read_wkr", FLAGS_read_worker_num, FLAGS_read_worker_max_pending_num, FLAGS_use_pthread_prior_worker_set); if (!read_worker_set->Init()) { DINGO_LOG(ERROR) << "Init StoreServiceRead PriorWorkerSet failed!"; return -1; @@ -1090,8 +1163,8 @@ int main(int argc, char *argv[]) { store_service.SetReadWorkSet(read_worker_set); dingo_server.SetStoreServiceReadWorkerSet(read_worker_set); - dingodb::PriorWorkerSetPtr write_worker_set = - dingodb::PriorWorkerSet::New("StoreServiceWrite", FLAGS_write_worker_num, FLAGS_write_worker_max_pending_num); + dingodb::PriorWorkerSetPtr write_worker_set = dingodb::PriorWorkerSet::New( + "write_wkr", FLAGS_write_worker_num, FLAGS_write_worker_max_pending_num, FLAGS_use_pthread_prior_worker_set); if (!write_worker_set->Init()) { DINGO_LOG(ERROR) << "Init StoreServiceWrite PriorWorkerSet failed!"; return -1; @@ -1101,7 +1174,7 @@ int main(int argc, char *argv[]) { if (FLAGS_raft_apply_worker_num > 0) { dingodb::PriorWorkerSetPtr raft_apply_worker_set = - dingodb::PriorWorkerSet::New("RaftApply", FLAGS_raft_apply_worker_num, 0); + dingodb::PriorWorkerSet::New("apply_wkr", FLAGS_raft_apply_worker_num, 0, FLAGS_use_pthread_prior_worker_set); if (!raft_apply_worker_set->Init()) { DINGO_LOG(ERROR) << "Init RaftApply PriorWorkerSet failed!"; return -1; @@ -1185,6 +1258,9 @@ int main(int argc, char *argv[]) { } DINGO_LOG(INFO) << "Raft server is running on " << raft_server.listen_address(); } else if (role == dingodb::pb::common::ClusterRole::INDEX) { + // setup bthread worker thread num into bthread::FLAGS_bthread_concurrency + InitBthreadWorkerThreadNum(config); + // init service workers auto ret1 = InitServiceWorkerParameters(config, role); if (ret1 < 0) { @@ -1192,8 +1268,22 @@ int 
main(int argc, char *argv[]) { return -1; } - dingodb::PriorWorkerSetPtr read_worker_set = - dingodb::PriorWorkerSet::New("IndexServiceRead", FLAGS_read_worker_num, FLAGS_read_worker_max_pending_num); + options.num_threads = bthread::FLAGS_bthread_concurrency; + + DINGO_LOG(INFO) << "bthread worker_thread_num: " << bthread::FLAGS_bthread_concurrency; + + if (brpc_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + DINGO_LOG(ERROR) << "Fail to add node service to brpc_server!"; + return -1; + } + + if (raft_server.AddService(&node_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + DINGO_LOG(ERROR) << "Fail to add node service raft_server!"; + return -1; + } + + dingodb::PriorWorkerSetPtr read_worker_set = dingodb::PriorWorkerSet::New( + "read_wkr", FLAGS_read_worker_num, FLAGS_read_worker_max_pending_num, FLAGS_use_pthread_prior_worker_set); if (!read_worker_set->Init()) { DINGO_LOG(ERROR) << "Init IndexServiceRead PriorWorkerSet failed!"; return -1; @@ -1202,8 +1292,8 @@ int main(int argc, char *argv[]) { util_service.SetReadWorkSet(read_worker_set); dingo_server.SetIndexServiceReadWorkerSet(read_worker_set); - dingodb::PriorWorkerSetPtr write_worker_set = - dingodb::PriorWorkerSet::New("IndexServiceWrite", FLAGS_write_worker_num, FLAGS_write_worker_max_pending_num); + dingodb::PriorWorkerSetPtr write_worker_set = dingodb::PriorWorkerSet::New( + "write_wkr", FLAGS_write_worker_num, FLAGS_write_worker_max_pending_num, FLAGS_use_pthread_prior_worker_set); if (!write_worker_set->Init()) { DINGO_LOG(ERROR) << "Init IndexServiceWrite PriorWorkerSet failed!"; return -1; @@ -1213,7 +1303,7 @@ int main(int argc, char *argv[]) { if (FLAGS_raft_apply_worker_num > 0) { dingodb::PriorWorkerSetPtr raft_apply_worker_set = - dingodb::PriorWorkerSet::New("RaftApply", FLAGS_raft_apply_worker_num, 0); + dingodb::PriorWorkerSet::New("apply_wkr", FLAGS_raft_apply_worker_num, 0, FLAGS_use_pthread_prior_worker_set); if (!raft_apply_worker_set->Init()) { 
DINGO_LOG(ERROR) << "Init RaftApply PriorWorkerSet failed!"; return -1; diff --git a/test/unit_test/test_worker_set.cc b/test/unit_test/test_worker_set.cc index 84dc91dc7..79b9e7e20 100644 --- a/test/unit_test/test_worker_set.cc +++ b/test/unit_test/test_worker_set.cc @@ -68,7 +68,7 @@ TEST(DingoWorkerSetTest, init) { ret = test_worker_set->Init(); EXPECT_TRUE(ret); - dingodb::PriorWorkerSetPtr test_prior_worker_set = dingodb::PriorWorkerSet::New("TestPriorWorkerSet", 10, 100); + dingodb::PriorWorkerSetPtr test_prior_worker_set = dingodb::PriorWorkerSet::New("TestPriorWorkerSet", 10, 100, false); ret = test_worker_set->Init(); EXPECT_TRUE(ret); } @@ -126,8 +126,8 @@ TEST(DingoWorkerSetTest, perf) { TEST(DingoWorkerSetTest, perf_prior) { bool ret = false; - dingodb::PriorWorkerSetPtr test_worker_set = - dingodb::PriorWorkerSet::New("TestPriorWorkerSet", FLAGS_worker_set_worker_num, FLAGS_worker_set_max_pending_num); + dingodb::PriorWorkerSetPtr test_worker_set = dingodb::PriorWorkerSet::New( + "TestPriorWorkerSet", FLAGS_worker_set_worker_num, FLAGS_worker_set_max_pending_num, false); ret = test_worker_set->Init(); EXPECT_TRUE(ret);