chore: Introduce pipeline back-pressure #3152

Merged · 5 commits · Jun 10, 2024

Changes from 4 commits
125 changes: 86 additions & 39 deletions src/facade/dragonfly_connection.cc
@@ -30,6 +30,7 @@
#endif

using namespace std;
using facade::operator""_MB;

ABSL_FLAG(bool, tcp_nodelay, true,
"Configures dragonfly connections with socket option TCP_NODELAY");
@@ -44,10 +45,13 @@ ABSL_FLAG(string, admin_bind, "",
"If set, the admin consol TCP connection would be bind the given address. "
"This supports both HTTP and RESP protocols");

ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26, // 64MB
ABSL_FLAG(uint64_t, request_cache_limit, 64_MB,
"Amount of memory to use for request cache in bytes - per IO thread.");

ABSL_FLAG(uint64_t, subscriber_thread_limit, 1ULL << 27, // 128MB
ABSL_FLAG(uint64_t, pipeline_buffer_limit, 8_MB,
"Amount of memory to use for parsing pipeline requests - per IO thread.");

ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB,
"Amount of memory to use for storing pub commands in bytes - per IO thread");

ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
@@ -254,8 +258,7 @@ thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_poo
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

void Connection::QueueBackpressure::EnsureBelowLimit() {
ec.await(
[this] { return subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit; });
ec.await([this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
}
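For context, `EnsureBelowLimit` is the publisher-side throttle: a thread publishing to this connection blocks until subscribers drain `subscriber_bytes` back under the budget. A minimal, self-contained sketch of the same pattern with standard C++ primitives (`std::condition_variable` standing in for `util::fb2::EventCount`; the struct and method names here are illustrative, not the Dragonfly API):

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Illustrative stand-in for Connection::QueueBackpressure (not the real API).
struct PublishBackpressure {
  std::mutex mu;
  std::condition_variable cv;
  std::atomic<size_t> subscriber_bytes{0};
  size_t publish_buffer_limit = 128u << 20;  // mirrors --publish_buffer_limit (128MB)

  // Publisher side: block until the per-thread budget has room again.
  void EnsureBelowLimit() {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [this] {
      return subscriber_bytes.load(std::memory_order_relaxed) <= publish_buffer_limit;
    });
  }

  // Subscriber side: after draining messages, credit the budget and wake publishers.
  void OnConsumed(size_t bytes) {
    subscriber_bytes.fetch_sub(bytes, std::memory_order_relaxed);
    cv.notify_all();
  }
};
```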

struct Connection::Shutdown {
@@ -521,12 +524,6 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
last_interaction_ = creation_time_;
id_ = next_id.fetch_add(1, memory_order_relaxed);

queue_backpressure_ = &tl_queue_backpressure_;
if (queue_backpressure_->subscriber_thread_limit == 0) {
queue_backpressure_->subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
queue_backpressure_->pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
}

migration_enabled_ = absl::GetFlag(FLAGS_migrate_connections);

// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
@@ -596,16 +593,38 @@ void Connection::OnPostMigrateThread() {
}
}

void Connection::HandleRequests() {
void Connection::OnConnectionStart() {
DCHECK(queue_backpressure_ == nullptr);

ThisFiber::SetName("DflyConnection");

// We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
// may be created in a different thread from where it runs.
if (tl_queue_backpressure_.publish_buffer_limit == 0) {
tl_queue_backpressure_.publish_buffer_limit = absl::GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = absl::GetFlag(FLAGS_pipeline_buffer_limit);
if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_cache_limit == 0 ||
tl_queue_backpressure_.pipeline_buffer_limit == 0) {
LOG(ERROR) << "Buffer limit settings are 0";
exit(-1);
}
}

queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
}

void Connection::HandleRequests() {
VLOG(1) << "[" << id_ << "] HandleRequests";

if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
int val = 1;
int res = setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
DCHECK_EQ(res, 0);
}

stats_ = &tl_facade_stats->conn_stats;
auto remote_ep = RemoteEndpointStr();

FiberSocketBase* peer = socket_.get();
@@ -882,7 +901,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

// After the client disconnected.
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
cnd_.notify_one();
phase_ = SHUTTING_DOWN;
VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
@@ -934,34 +953,43 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}

void Connection::DispatchCommand(bool has_more, absl::FunctionRef<void()> dispatch_sync,
absl::FunctionRef<MessageHandle()> dispatch_async) {
// Avoid sync dispatch if we can interleave with an ongoing async dispatch
bool can_dispatch_sync = !cc_->async_dispatch;
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;

// Avoid sync dispatch if we already have pending async messages or
// can potentially receive some (subscriptions > 0)
if (dispatch_q_.size() > 0 || cc_->subscriptions > 0)
can_dispatch_sync = false;
if (stats_->dispatch_queue_bytes > queue_backpressure_->pipeline_buffer_limit) {
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
return res;
});
if (cc_->conn_closing)
return;

// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (has_more || !can_dispatch_sync) {
SendAsync(dispatch_async());
// prefer synchronous dispatching to save memory.
optimize_for_async = false;
}

if (dispatch_q_.size() > 10)
ThisFiber::Yield();
// Avoid sync dispatch if we can interleave with an ongoing async dispatch.
bool can_dispatch_sync = !cc_->async_dispatch && dispatch_q_.empty() && cc_->subscriptions == 0;

// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (optimize_for_async || !can_dispatch_sync) {
SendAsync(cmd_msg_cb());
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
{
cc_->sync_dispatch = true;
dispatch_sync();
invoke_cb();
cc_->sync_dispatch = false;
}
last_interaction_ = time(nullptr);

// We might have blocked the dispatch queue from processing, wake it up.
if (dispatch_q_.size() > 0)
evc_.notify();
cnd_.notify_one();
}
}
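The heart of this PR is the handshake above: the input loop parks on `pipeline_cnd` once queued pipeline bytes exceed `pipeline_buffer_limit`, and resumes when the execution fiber drains below the limit, the queue empties, or the connection starts closing. A simplified, self-contained sketch of that handshake (illustrative names; the real code uses `fb2::CondVarAny` with a `NoOpLock` since both fibers run on one proactor thread, whereas this sketch uses a real mutex):

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>

// Simplified model of the pipeline back-pressure handshake between the
// input loop (producer) and the execution fiber (consumer).
class PipelineQueue {
 public:
  explicit PipelineQueue(size_t limit) : buffer_limit_(limit) {}

  // Producer: block while over budget, then enqueue one pipelined command.
  bool Push(std::string msg) {
    std::unique_lock<std::mutex> lk(mu_);
    below_limit_.wait(lk, [this] {
      return queued_bytes_ < buffer_limit_ || q_.empty() || closing_;
    });
    if (closing_)
      return false;
    queued_bytes_ += msg.size();
    q_.push_back(std::move(msg));
    not_empty_.notify_one();
    return true;
  }

  // Consumer: pop one message and credit the byte budget.
  bool Pop(std::string* out) {
    std::unique_lock<std::mutex> lk(mu_);
    not_empty_.wait(lk, [this] { return !q_.empty() || closing_; });
    if (q_.empty())
      return false;
    *out = std::move(q_.front());
    q_.pop_front();
    queued_bytes_ -= out->size();
    // Mirror of the execution fiber's wake rule: notify producers once we are
    // under the limit or fully drained.
    if (queued_bytes_ < buffer_limit_ || q_.empty())
      below_limit_.notify_all();
    return true;
  }

  // Shutdown: release both sides, matching the conn_closing checks above.
  void Close() {
    std::lock_guard<std::mutex> lk(mu_);
    closing_ = true;
    below_limit_.notify_all();
    not_empty_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable below_limit_, not_empty_;
  std::deque<std::string> q_;
  size_t queued_bytes_ = 0;
  const size_t buffer_limit_;
  bool closing_ = false;
};
```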

Expand Down Expand Up @@ -993,7 +1021,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
}
DispatchCommand(has_more, dispatch_sync, dispatch_async);

DispatchSingle(has_more, dispatch_sync, dispatch_async);
}
io_buf_.ConsumeInput(consumed);
} while (RedisParser::OK == result && !orig_builder->GetError());
@@ -1049,7 +1078,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
return NEED_MORE;
}
}
DispatchCommand(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
DispatchSingle(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
io_buf_.ConsumeInput(total_len);
} while (!builder->GetError());

@@ -1084,7 +1113,7 @@ void Connection::OnBreakCb(int32_t mask) {

cc_->conn_closing = true;
BreakOnce(mask);
evc_.notify(); // Notify dispatch fiber.
cnd_.notify_one(); // Notify dispatch fiber.
}

void Connection::HandleMigrateRequest() {
@@ -1248,7 +1277,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}

stats_->squashed_commands += squash_cmds.size();
cc_->async_dispatch = true;

size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
@@ -1286,6 +1315,7 @@ void Connection::ClearPipelinedMessages() {
}

dispatch_q_.clear();
queue_backpressure_->pipeline_cnd.notify_all();
queue_backpressure_->ec.notifyAll();
}

@@ -1318,18 +1348,21 @@ std::string Connection::DebugInfo() const {
// into the dispatch queue and DispatchFiber will run those commands asynchronously with
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber.
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
ThisFiber::SetName("DispatchFiber");
void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
ThisFiber::SetName("ExecutionFiber");
SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};

size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
fb2::NoOpLock noop_lk;

while (!builder->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
cnd_.wait(noop_lk, [this] {
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
});
if (cc_->conn_closing)
break;

@@ -1351,6 +1384,9 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

builder->SetBatchMode(dispatch_q_.size() > 1);

bool subscriber_over_limit =
stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;

// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
// It is only enabled if the threshold is reached and the whole dispatch queue
@@ -1374,6 +1410,7 @@
if (ShouldEndDispatchFiber(msg)) {
RecycleMessage(std::move(msg));
CHECK(dispatch_q_.empty()) << DebugInfo();
queue_backpressure_->pipeline_cnd.notify_all();
return; // don't set conn closing flag
}

@@ -1383,11 +1420,20 @@
RecycleMessage(std::move(msg));
}

queue_backpressure_->ec.notify();
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if no one is waiting on it.
}

if (subscriber_over_limit &&
stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
queue_backpressure_->ec.notify();
}

DCHECK(cc_->conn_closing || builder->GetError());
cc_->conn_closing = true;
queue_backpressure_->pipeline_cnd.notify_all();
}
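Note how `subscriber_over_limit` is sampled before processing and `ec.notify()` fires only when the iteration crosses back under `publish_buffer_limit`, keeping the cross-thread wakeup off the common path. A minimal sketch of that edge-triggered discipline (the helper and its parameters are illustrative, not part of this PR; `Notifier` is any object with an `EventCount`-like `notify()`):

```cpp
#include <atomic>
#include <cstddef>

// Wake blocked publishers only on the downward crossing of the limit,
// not on every drained message.
template <typename Notifier>
void CreditSubscriberBytes(std::atomic<size_t>& subscriber_bytes,
                           size_t publish_buffer_limit, size_t drained_bytes,
                           Notifier& notifier) {
  const bool was_over =
      subscriber_bytes.load(std::memory_order_relaxed) >= publish_buffer_limit;
  subscriber_bytes.fetch_sub(drained_bytes, std::memory_order_relaxed);
  if (was_over &&
      subscriber_bytes.load(std::memory_order_relaxed) < publish_buffer_limit) {
    notifier.notify();  // wakes waiters blocked in EnsureBelowLimit()
  }
}
```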

Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
Expand Down Expand Up @@ -1527,7 +1573,7 @@ void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
[&, peer = socket_.get()]() { DispatchFiber(peer); });
[this, peer = socket_.get()]() { ExecutionFiber(peer); });
}
}

Expand Down Expand Up @@ -1573,7 +1619,7 @@ void Connection::SendAsync(MessageHandle msg) {

// Don't notify if a sync dispatch is in progress, it will wake after finishing.
if (dispatch_q_.size() == 1 && !cc_->sync_dispatch) {
evc_.notify();
cnd_.notify_one();
}
}

@@ -1699,6 +1745,7 @@ void Connection::BreakOnce(uint32_t ev_mask) {
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
DCHECK(backpressure);
}

unsigned Connection::WeakRef::Thread() const {
37 changes: 23 additions & 14 deletions src/facade/dragonfly_connection.h
@@ -61,6 +61,10 @@ class Connection : public util::Connection {
ServiceInterface* service);
~Connection();

// A callback called by Listener::OnConnectionStart in the same thread where
// HandleRequests will run.
void OnConnectionStart();

using BreakerCb = std::function<void(uint32_t)>;
using ShutdownCb = std::function<void()>;
using ShutdownHandle = unsigned;
@@ -215,7 +219,6 @@
uint32_t client_id_;
};

public:
// Add PubMessage to dispatch queue.
// Virtual because behavior is overridden in test_utils.
virtual void SendPubMessageAsync(PubMessage);
@@ -318,18 +321,24 @@
struct DispatchCleanup;
struct Shutdown;

// Keeps track of total per-thread sizes of dispatch queues to
// limit memory taken up by messages from PUBLISH commands and slow down clients
// producing them too quickly via EnsureAsyncMemoryBudget.
// Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
// in these queues.
struct QueueBackpressure {
// Block until memory usage is below limit, can be called from any thread
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();

// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount ec;
std::atomic_size_t subscriber_bytes = 0;

size_t subscriber_thread_limit = 0; // cached flag subscriber_thread_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
// Used by pipelining/execution fiber to throttle the incoming pipeline messages.
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
util::fb2::CondVarAny pipeline_cnd;

size_t publish_buffer_limit = 0; // cached flag publish_buffer_limit
size_t pipeline_cache_limit = 0; // cached flag request_cache_limit
size_t pipeline_buffer_limit = 0; // cached flag pipeline_buffer_limit, in bytes
};
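The struct now pairs two coordination primitives with different scopes, which is easy to miss at a glance. A commented mirror of the layout (standard types in place of the fb2 ones; illustrative only, not the real declaration):

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>

// Commented mirror of QueueBackpressure's two coordination primitives.
struct QueueBackpressureSketch {
  // Cross-thread pair: publishers on any thread block on `ec` until
  // subscribers drain `subscriber_bytes`, hence the atomic counter.
  std::condition_variable_any ec;            // stands in for util::fb2::EventCount
  std::atomic<size_t> subscriber_bytes{0};

  // Same-thread pair: a connection's input loop and execution fiber appear to
  // share one proactor thread (see the NoOpLock in the .cc), so pipeline
  // throttling needs a fiber-aware condition variable but no atomics.
  std::condition_variable_any pipeline_cnd;  // stands in for util::fb2::CondVarAny

  size_t publish_buffer_limit = 0;   // cached --publish_buffer_limit
  size_t pipeline_cache_limit = 0;   // cached --request_cache_limit
  size_t pipeline_buffer_limit = 0;  // cached --pipeline_buffer_limit
};
```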

private:
@@ -346,14 +355,15 @@
// Returns true if HTTP header is detected.
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);

// Dispatch Redis or MC command. `has_more` should indicate whether the buffer has more commands
// Dispatches a single (Redis or MC) command.
// `has_more` should indicate whether the io buffer has more commands
// (pipelining in progress). Performs async dispatch if forced (already in async mode) or if
// has_more is true, otherwise uses synchronous dispatch.
void DispatchCommand(bool has_more, absl::FunctionRef<void()> sync_dispatch,
absl::FunctionRef<MessageHandle()> async_dispatch);
void DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb);

// Handles events from dispatch queue.
void DispatchFiber(util::FiberSocketBase* peer);
void ExecutionFiber(util::FiberSocketBase* peer);

void SendAsync(MessageHandle msg);

@@ -394,7 +404,7 @@
void BreakOnce(uint32_t ev_mask);

std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::EventCount evc_; // dispatch queue waker
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)

size_t pending_pipeline_cmd_cnt_ = 0; // how many queued async commands in dispatch_q
@@ -413,7 +423,6 @@
ServiceInterface* service_;

time_t creation_time_, last_interaction_;

Phase phase_ = SETUP;
std::string name_;

@@ -430,7 +439,7 @@

// Pointer to corresponding queue backpressure struct.
// Needed for access from different threads by EnsureAsyncMemoryBudget().
QueueBackpressure* queue_backpressure_;
QueueBackpressure* queue_backpressure_ = nullptr;

util::fb2::ProactorBase* migration_request_ = nullptr;

2 changes: 2 additions & 0 deletions src/facade/dragonfly_listener.cc
@@ -325,6 +325,8 @@ void Listener::OnConnectionStart(util::Connection* conn) {
facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
VLOG(1) << "Opening connection " << facade_conn->GetClientId();

facade_conn->OnConnectionStart();

absl::base_internal::SpinLockHolder lock{&mutex_};
int32_t prev_cnt = per_thread_[id].num_connections++;
++conn_cnt_;