Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Introduce pipeline back-pressure #3152

Merged
merged 5 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/daily-builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ jobs:
ctest -V -L DFLY

build-macos:
if: false
runs-on: macos-12
timeout-minutes: 45
steps:
Expand Down
136 changes: 98 additions & 38 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#endif

using namespace std;
using facade::operator""_MB;

ABSL_FLAG(bool, tcp_nodelay, true,
"Configures dragonfly connections with socket option TCP_NODELAY");
Expand All @@ -44,10 +45,13 @@ ABSL_FLAG(string, admin_bind, "",
"If set, the admin consol TCP connection would be bind the given address. "
"This supports both HTTP and RESP protocols");

ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26, // 64MB
ABSL_FLAG(uint64_t, request_cache_limit, 64_MB,
"Amount of memory to use for request cache in bytes - per IO thread.");

ABSL_FLAG(uint64_t, subscriber_thread_limit, 1ULL << 27, // 128MB
ABSL_FLAG(uint64_t, pipeline_buffer_limit, 8_MB,
"Amount of memory to use for parsing pipeline requests - per IO thread.");

ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB,
"Amount of memory to use for storing pub commands in bytes - per IO thread");

ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
Expand Down Expand Up @@ -254,8 +258,7 @@ thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_poo
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

void Connection::QueueBackpressure::EnsureBelowLimit() {
ec.await(
[this] { return subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit; });
ec.await([this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
}

struct Connection::Shutdown {
Expand Down Expand Up @@ -521,12 +524,6 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
last_interaction_ = creation_time_;
id_ = next_id.fetch_add(1, memory_order_relaxed);

queue_backpressure_ = &tl_queue_backpressure_;
if (queue_backpressure_->subscriber_thread_limit == 0) {
queue_backpressure_->subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
queue_backpressure_->pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
}

migration_enabled_ = absl::GetFlag(FLAGS_migrate_connections);

// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
Expand Down Expand Up @@ -596,16 +593,38 @@ void Connection::OnPostMigrateThread() {
}
}

void Connection::HandleRequests() {
void Connection::OnConnectionStart() {
DCHECK(queue_backpressure_ == nullptr);

ThisFiber::SetName("DflyConnection");

// We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
// may be created in a different thread from where it runs.
if (tl_queue_backpressure_.publish_buffer_limit == 0) {
tl_queue_backpressure_.publish_buffer_limit = absl::GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = absl::GetFlag(FLAGS_pipeline_buffer_limit);
if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_cache_limit == 0 ||
tl_queue_backpressure_.pipeline_buffer_limit == 0) {
LOG(ERROR) << "Buffer limit settings are 0";
exit(-1);
}
}

queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
}

void Connection::HandleRequests() {
VLOG(1) << "[" << id_ << "] HandleRequests";

if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
int val = 1;
int res = setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
DCHECK_EQ(res, 0);
}

stats_ = &tl_facade_stats->conn_stats;
auto remote_ep = RemoteEndpointStr();

FiberSocketBase* peer = socket_.get();
Expand Down Expand Up @@ -882,7 +901,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

// After the client disconnected.
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
cnd_.notify_one();
phase_ = SHUTTING_DOWN;
VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
Expand Down Expand Up @@ -934,34 +953,56 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}

void Connection::DispatchCommand(bool has_more, absl::FunctionRef<void()> dispatch_sync,
absl::FunctionRef<MessageHandle()> dispatch_async) {
// Avoid sync dispatch if we can interleave with an ongoing async dispatch
bool can_dispatch_sync = !cc_->async_dispatch;
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;

// Avoid sync dispatch if we already have pending async messages or
// can potentially receive some (subscriptions > 0)
if (dispatch_q_.size() > 0 || cc_->subscriptions > 0)
can_dispatch_sync = false;
if (optimize_for_async &&
queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes)) {
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
return !queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
return;

// prefer synchronous dispatching to save memory.
optimize_for_async = false;
}

// Avoid sync dispatch if we can interleave with an ongoing async dispatch.
bool can_dispatch_sync = !cc_->async_dispatch && dispatch_q_.empty() && cc_->subscriptions == 0;

// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (has_more || !can_dispatch_sync) {
SendAsync(dispatch_async());
if (optimize_for_async || !can_dispatch_sync) {
SendAsync(cmd_msg_cb());

if (dispatch_q_.size() > 10)
ThisFiber::Yield();
auto epoch = fb2::FiberSwitchEpoch();

if (async_fiber_epoch_ == epoch) {
// If we pushed too many items without context switching - yield
if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
async_streak_len_ = 0;
ThisFiber::Yield();
}
} else {
async_streak_len_ = 0;
async_fiber_epoch_ = epoch;
}
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
{
cc_->sync_dispatch = true;
dispatch_sync();
invoke_cb();
cc_->sync_dispatch = false;
}
last_interaction_ = time(nullptr);

// We might have blocked the dispatch queue from processing, wake it up.
if (dispatch_q_.size() > 0)
evc_.notify();
cnd_.notify_one();
}
}

Expand Down Expand Up @@ -993,7 +1034,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
}
DispatchCommand(has_more, dispatch_sync, dispatch_async);

DispatchSingle(has_more, dispatch_sync, dispatch_async);
}
io_buf_.ConsumeInput(consumed);
} while (RedisParser::OK == result && !orig_builder->GetError());
Expand Down Expand Up @@ -1049,7 +1091,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
return NEED_MORE;
}
}
DispatchCommand(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
DispatchSingle(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
io_buf_.ConsumeInput(total_len);
} while (!builder->GetError());

Expand Down Expand Up @@ -1084,7 +1126,7 @@ void Connection::OnBreakCb(int32_t mask) {

cc_->conn_closing = true;
BreakOnce(mask);
evc_.notify(); // Notify dispatch fiber.
cnd_.notify_one(); // Notify dispatch fiber.
}

void Connection::HandleMigrateRequest() {
Expand Down Expand Up @@ -1248,7 +1290,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}

stats_->squashed_commands += squash_cmds.size();
cc_->async_dispatch = true;

size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
Expand Down Expand Up @@ -1286,6 +1328,7 @@ void Connection::ClearPipelinedMessages() {
}

dispatch_q_.clear();
queue_backpressure_->pipeline_cnd.notify_all();
queue_backpressure_->ec.notifyAll();
}

Expand Down Expand Up @@ -1318,18 +1361,21 @@ std::string Connection::DebugInfo() const {
// into the dispatch queue and DispatchFiber will run those commands asynchronously with
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber.
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
ThisFiber::SetName("DispatchFiber");
void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
ThisFiber::SetName("ExecutionFiber");
SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};

size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
fb2::NoOpLock noop_lk;

while (!builder->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
cnd_.wait(noop_lk, [this] {
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
});
if (cc_->conn_closing)
break;

Expand All @@ -1351,6 +1397,9 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

builder->SetBatchMode(dispatch_q_.size() > 1);

bool subscriber_over_limit =
stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;

// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
// It is only enabled if the threshold is reached and the whole dispatch queue
Expand All @@ -1374,6 +1423,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
if (ShouldEndDispatchFiber(msg)) {
RecycleMessage(std::move(msg));
CHECK(dispatch_q_.empty()) << DebugInfo();
queue_backpressure_->pipeline_cnd.notify_all();
return; // don't set conn closing flag
}

Expand All @@ -1383,11 +1433,20 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
RecycleMessage(std::move(msg));
}

queue_backpressure_->ec.notify();
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if no one is waiting on it.
}

if (subscriber_over_limit &&
stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
queue_backpressure_->ec.notify();
}

DCHECK(cc_->conn_closing || builder->GetError());
cc_->conn_closing = true;
queue_backpressure_->pipeline_cnd.notify_all();
}

Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
Expand Down Expand Up @@ -1527,7 +1586,7 @@ void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
[&, peer = socket_.get()]() { DispatchFiber(peer); });
[this, peer = socket_.get()]() { ExecutionFiber(peer); });
}
}

Expand Down Expand Up @@ -1573,7 +1632,7 @@ void Connection::SendAsync(MessageHandle msg) {

// Don't notify if a sync dispatch is in progress, it will wake after finishing.
if (dispatch_q_.size() == 1 && !cc_->sync_dispatch) {
evc_.notify();
cnd_.notify_one();
}
}

Expand Down Expand Up @@ -1699,6 +1758,7 @@ void Connection::BreakOnce(uint32_t ev_mask) {
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
DCHECK(backpressure);
}

unsigned Connection::WeakRef::Thread() const {
Expand Down
Loading
Loading