Skip to content

Commit

Permalink
[fix][store] Fixup memory leak issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
rock-git authored and ketor committed Oct 9, 2023
1 parent c83ab51 commit aad3a50
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 188 deletions.
46 changes: 32 additions & 14 deletions src/common/runnable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,33 @@

#include "common/runnable.h"

#include <cstdint>
#include <memory>

#include "common/helper.h"
#include "common/logging.h"
#include "fmt/core.h"

namespace dingodb {

int ExecuteRoutine(void*, bthread::TaskIterator<TaskRunnable*>& iter) {
if (iter.is_queue_stopped()) {
return 0;
}
TaskRunnable::TaskRunnable() { DINGO_LOG(INFO) << "new exec task..."; }
TaskRunnable::~TaskRunnable() { DINGO_LOG(INFO) << "delete exec task..."; }

{
std::unique_ptr<TaskRunnable> self_guard(*iter);
for (; iter; ++iter) {
(*iter)->Run();
int ExecuteRoutine(void*, bthread::TaskIterator<TaskRunnablePtr>& iter) { // NOLINT
for (; iter; ++iter) {
if (iter.is_queue_stopped()) {
DINGO_LOG(INFO) << fmt::format("[execqueue][type({})] task is stopped.", (*iter)->Type());
continue;
}
if (*iter == nullptr) {
DINGO_LOG(WARNING) << fmt::format("[execqueue][type({})] task is nullptr.", (*iter)->Type());
continue;
}

uint64_t start_time = Helper::TimestampMs();
(*iter)->Run();
DINGO_LOG(INFO) << fmt::format("[execqueue][type({})] run task elapsed time {}(ms).", (*iter)->Type(),
Helper::TimestampMs() - start_time);
}

return 0;
Expand All @@ -38,7 +51,7 @@ bool Worker::Init() {
options.bthread_attr = BTHREAD_ATTR_NORMAL;

if (bthread::execution_queue_start(&queue_id_, &options, ExecuteRoutine, nullptr) != 0) {
DINGO_LOG(ERROR) << "Start worker execution queue failed";
DINGO_LOG(ERROR) << "[execqueue] start worker execution queue failed";
return false;
}

Expand All @@ -51,23 +64,28 @@ void Worker::Destroy() {
is_available_.store(false, std::memory_order_relaxed);

if (bthread::execution_queue_stop(queue_id_) != 0) {
DINGO_LOG(ERROR) << "Worker execution queue stop failed";
DINGO_LOG(ERROR) << "[execqueue] worker execution queue stop failed";
return;
}

if (bthread::execution_queue_join(queue_id_) != 0) {
DINGO_LOG(ERROR) << "Worker execution queue join failed";
DINGO_LOG(ERROR) << "[execqueue] worker execution queue join failed";
}
}

bool Worker::Execute(TaskRunnable* task) {
bool Worker::Execute(TaskRunnablePtr task) {
if (task == nullptr) {
DINGO_LOG(ERROR) << fmt::format("[execqueue][type({})] task is nullptr.", task->Type());
return false;
}

if (!is_available_.load(std::memory_order_relaxed)) {
DINGO_LOG(ERROR) << "Worker execute queue is not available.";
DINGO_LOG(ERROR) << fmt::format("[execqueue][type({})] worker execute queue is not available.", task->Type());
return false;
}

if (bthread::execution_queue_execute(queue_id_, task) != 0) {
DINGO_LOG(ERROR) << "Worker execution queue execute failed";
DINGO_LOG(ERROR) << fmt::format("[execqueue][type({})] worker execution queue execute failed", task->Type());
return false;
}

Expand Down
15 changes: 10 additions & 5 deletions src/common/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@
#define DINGODB_COMMON_RUNNABLE_H_

#include <memory>
#include <string>

#include "bthread/execution_queue.h"

namespace dingodb {

class TaskRunnable {
public:
TaskRunnable() = default;
virtual ~TaskRunnable() = default;
TaskRunnable();
virtual ~TaskRunnable();

virtual std::string Type() = 0;

virtual void Run() = 0;
};

int ExecuteRoutine(void*, bthread::TaskIterator<TaskRunnable*>& iter);
using TaskRunnablePtr = std::shared_ptr<TaskRunnable>;

int ExecuteRoutine(void*, bthread::TaskIterator<TaskRunnablePtr>& iter);

// Run task worker
class Worker {
Expand All @@ -42,12 +47,12 @@ class Worker {
bool Init();
void Destroy();

bool Execute(TaskRunnable* task);
bool Execute(TaskRunnablePtr task);

private:
// Execution queue is available.
std::atomic<bool> is_available_;
bthread::ExecutionQueueId<TaskRunnable*> queue_id_; // NOLINT
bthread::ExecutionQueueId<TaskRunnablePtr> queue_id_; // NOLINT
};

using WorkerPtr = std::shared_ptr<Worker>;
Expand Down
13 changes: 6 additions & 7 deletions src/split/split_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void SplitCheckWorkers::Destroy() {
}
}

bool SplitCheckWorkers::Execute(TaskRunnable* task) {
bool SplitCheckWorkers::Execute(TaskRunnablePtr task) {
if (!workers_[offset_]->Execute(task)) {
return false;
}
Expand Down Expand Up @@ -395,7 +395,8 @@ void PreSplitCheckTask::PreSplitCheck() {
continue;
}

if (split_check_workers_->Execute(new SplitCheckTask(split_check_workers_, region, region_metric, split_checker))) {
auto task = std::make_shared<SplitCheckTask>(split_check_workers_, region, region_metric, split_checker);
if (split_check_workers_->Execute(task)) {
split_check_workers_->AddRegionChecking(region->Id());
}
}
Expand All @@ -418,14 +419,12 @@ void PreSplitChecker::Destroy() {
split_check_workers_->Destroy();
}

bool PreSplitChecker::Execute(TaskRunnable* task) { return worker_->Execute(task); }
bool PreSplitChecker::Execute(TaskRunnablePtr task) { return worker_->Execute(task); }

void PreSplitChecker::TriggerPreSplitCheck(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new PreSplitCheckTask(Server::GetInstance()->GetPreSplitChecker()->GetSplitCheckWorkers());
if (!Server::GetInstance()->GetPreSplitChecker()->Execute(task)) {
delete task;
}
auto task = std::make_shared<PreSplitCheckTask>(Server::GetInstance()->GetPreSplitChecker()->GetSplitCheckWorkers());
Server::GetInstance()->GetPreSplitChecker()->Execute(task);
}

} // namespace dingodb
14 changes: 9 additions & 5 deletions src/split/split_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class SplitCheckWorkers {
bool Init(uint32_t num);
void Destroy();

bool Execute(TaskRunnable* task);
bool Execute(TaskRunnablePtr task);

bool IsExistRegionChecking(uint64_t region_id);
void AddRegionChecking(uint64_t region_id);
Expand All @@ -145,7 +145,7 @@ class SplitCheckWorkers {

// Indicate workers offset for round-robin.
uint32_t offset_;
std::vector<std::shared_ptr<Worker>> workers_;
std::vector<WorkerPtr> workers_;
};

// Check region whether need to split.
Expand All @@ -159,6 +159,8 @@ class SplitCheckTask : public TaskRunnable {
split_checker_(split_checker) {}
~SplitCheckTask() override = default;

std::string Type() override { return "SPLIT_CHECK"; }

void Run() override {
SplitCheck();
if (region_ != nullptr && split_check_workers_ != nullptr) {
Expand All @@ -182,6 +184,8 @@ class PreSplitCheckTask : public TaskRunnable {
: split_check_workers_(split_check_workers) {}
~PreSplitCheckTask() override = default;

std::string Type() override { return "PRE_SPLIT_CHECK"; }

void Run() override { PreSplitCheck(); }

private:
Expand All @@ -193,7 +197,7 @@ class PreSplitCheckTask : public TaskRunnable {
class PreSplitChecker {
public:
PreSplitChecker() {
worker_ = std::make_shared<Worker>();
worker_ = Worker::New();
split_check_workers_ = std::make_shared<SplitCheckWorkers>();
}
~PreSplitChecker() = default;
Expand All @@ -207,10 +211,10 @@ class PreSplitChecker {
std::shared_ptr<SplitCheckWorkers> GetSplitCheckWorkers() { return split_check_workers_; }

private:
bool Execute(TaskRunnable* task);
bool Execute(TaskRunnablePtr task);

// For pre split check.
std::shared_ptr<Worker> worker_;
WorkerPtr worker_;
// For split check.
std::shared_ptr<SplitCheckWorkers> split_check_workers_;
};
Expand Down
99 changes: 23 additions & 76 deletions src/store/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ void CoordinatorTaskListProcessTask::CoordinatorTaskListProcess(
coordinator_control->ProcessTaskList();
}

bool CheckStoreOperationResult(pb::coordinator::RegionCmdType cmd_type, pb::error::Errno errcode) {
bool CheckStoreOperationResult(pb::coordinator::RegionCmdType cmd_type, pb::error::Errno errcode) { // NOLINT
using pb::coordinator::RegionCmdType;
using pb::error::Errno;

Expand Down Expand Up @@ -726,118 +726,65 @@ void VectorIndexScrubTask::ScrubVectorIndex() {
}
}

bool Heartbeat::Init() {
bthread::ExecutionQueueOptions options;
options.bthread_attr = BTHREAD_ATTR_NORMAL;
bool Heartbeat::Init() { return worker_->Init(); }

if (bthread::execution_queue_start(&queue_id_, &options, ExecuteRoutine, nullptr) != 0) {
DINGO_LOG(ERROR) << "Start heartbeat execution queue failed";
return false;
}

is_available_.store(true, std::memory_order_relaxed);

return true;
}

void Heartbeat::Destroy() {
is_available_.store(false, std::memory_order_relaxed);

if (bthread::execution_queue_stop(queue_id_) != 0) {
DINGO_LOG(ERROR) << "heartbeat execution queue stop failed";
return;
}

if (bthread::execution_queue_join(queue_id_) != 0) {
DINGO_LOG(ERROR) << "heartbeat execution queue join failed";
}
}

bool Heartbeat::Execute(TaskRunnable* task) {
if (!is_available_.load(std::memory_order_relaxed)) {
DINGO_LOG(ERROR) << "Heartbeat execute queue is not available.";
return false;
}
void Heartbeat::Destroy() { worker_->Destroy(); }

if (bthread::execution_queue_execute(queue_id_, task) != 0) {
DINGO_LOG(ERROR) << "heartbeat execution queue execute failed";
return false;
}

return true;
}
bool Heartbeat::Execute(TaskRunnablePtr task) { return worker_->Execute(task); }

void Heartbeat::TriggerStoreHeartbeat(std::vector<uint64_t> region_ids, bool is_update_epoch_version) {
// Free at ExecuteRoutine()
TaskRunnable* task =
new HeartbeatTask(Server::GetInstance()->GetCoordinatorInteraction(), region_ids, is_update_epoch_version);
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<HeartbeatTask>(Server::GetInstance()->GetCoordinatorInteraction(), region_ids,
is_update_epoch_version);
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerCoordinatorPushToStore(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new CoordinatorPushTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<CoordinatorPushTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerCoordinatorUpdateState(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new CoordinatorUpdateStateTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<CoordinatorUpdateStateTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerCoordinatorTaskListProcess(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new CoordinatorTaskListProcessTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<CoordinatorTaskListProcessTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerCoordinatorRecycleOrphan(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new CoordinatorRecycleOrphanTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<CoordinatorRecycleOrphanTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerCalculateTableMetrics(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new CalculateTableMetricsTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<CalculateTableMetricsTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerLeaseTask(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new LeaseTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<LeaseTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerCompactionTask(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new CompactionTask(Server::GetInstance()->GetCoordinatorControl());
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<CompactionTask>(Server::GetInstance()->GetCoordinatorControl());
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

void Heartbeat::TriggerScrubVectorIndex(void*) {
// Free at ExecuteRoutine()
TaskRunnable* task = new VectorIndexScrubTask();
if (!Server::GetInstance()->GetHeartbeat()->Execute(task)) {
delete task;
}
auto task = std::make_shared<VectorIndexScrubTask>();
Server::GetInstance()->GetHeartbeat()->Execute(task);
}

butil::Status Heartbeat::RpcSendPushStoreOperation(const pb::common::Location& location,
Expand Down
Loading

0 comments on commit aad3a50

Please sign in to comment.