From d32993054c46ef1ea9ed82fb073db10fb67809b9 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Fri, 7 Jun 2024 12:19:31 +0300
Subject: [PATCH 1/5] chore: Introduce pipeline back-pressure

Also, improve synchronization primitives and replace them with
thread-local variations.

Before the change, on my local machine with the dragonfly running with 8 threads,
`memtier_benchmark -c 10 --threads 8 --command="PING" --key-maximum 100000000 --hide-histogram --distinct-client-seed --pipeline=20 --test-time=10`
reached 10M qps with 0.327ms p99.9.

After the change, the same command showed 13.8M qps with 0.2ms p99.9.

Signed-off-by: Roman Gershman
---
 src/facade/dragonfly_connection.cc | 81 ++++++++++++++++++++----------
 src/facade/dragonfly_connection.h  | 15 +++---
 2 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 013366af9853..6c5ff7d396e9 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -30,6 +30,7 @@
 #endif

 using namespace std;
+using facade::operator""_MB;

 ABSL_FLAG(bool, tcp_nodelay, true,
           "Configures dragonfly connections with socket option TCP_NODELAY");
@@ -44,10 +45,13 @@ ABSL_FLAG(string, admin_bind, "",
           "If set, the admin console TCP connection would bind to the given address. "
           "This supports both HTTP and RESP protocols");

-ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26,  // 64MB
+ABSL_FLAG(uint64_t, request_cache_limit, 64_MB,
           "Amount of memory to use for request cache in bytes - per IO thread.");

-ABSL_FLAG(uint64_t, subscriber_thread_limit, 1ULL << 27,  // 128MB
+ABSL_FLAG(uint64_t, pipeline_buffer_limit, 4_MB,
+          "Amount of memory to use for parsing pipeline requests - per IO thread.");
+
+ABSL_FLAG(uint64_t, subscriber_thread_limit, 128_MB,
           "Amount of memory to use for storing pub commands in bytes - per IO thread");

 ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
@@ -521,12 +525,6 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
   last_interaction_ = creation_time_;
   id_ = next_id.fetch_add(1, memory_order_relaxed);

-  queue_backpressure_ = &tl_queue_backpressure_;
-  if (queue_backpressure_->subscriber_thread_limit == 0) {
-    queue_backpressure_->subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
-    queue_backpressure_->pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
-  }
-
   migration_enabled_ = absl::GetFlag(FLAGS_migrate_connections);

   // Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
@@ -599,6 +597,16 @@ void Connection::OnPostMigrateThread() {
 void Connection::HandleRequests() {
   ThisFiber::SetName("DflyConnection");

+  // We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
+  // may be created in a different thread from where it runs.
+  if (tl_queue_backpressure_.subscriber_thread_limit == 0) {
+    tl_queue_backpressure_.subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
+    tl_queue_backpressure_.pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
+    tl_queue_backpressure_.pipeline_buffer_limit = absl::GetFlag(FLAGS_pipeline_buffer_limit);
+  }
+
+  queue_backpressure_ = &tl_queue_backpressure_;
+
   if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
     int val = 1;
     int res = setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
@@ -882,7 +890,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   // After the client disconnected.
   cc_->conn_closing = true;  // Signal dispatch to close.
-  evc_.notify();
+  cnd_.notify_one();
   phase_ = SHUTTING_DOWN;
   VLOG(2) << "Before dispatch_fb.join()";
   dispatch_fb_.JoinIfNeeded();
@@ -934,22 +942,31 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   }
 }

-void Connection::DispatchCommand(bool has_more, absl::FunctionRef<void()> dispatch_sync,
-                                 absl::FunctionRef<MessageHandle()> dispatch_async) {
+void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> dispatch_sync,
+                                absl::FunctionRef<MessageHandle()> dispatch_async) {
   // Avoid sync dispatch if we can interleave with an ongoing async dispatch
   bool can_dispatch_sync = !cc_->async_dispatch;

   // Avoid sync dispatch if we already have pending async messages or
   // can potentially receive some (subscriptions > 0)
-  if (dispatch_q_.size() > 0 || cc_->subscriptions > 0)
+  if (dispatch_q_.size() > 0 || cc_->subscriptions > 0) {
     can_dispatch_sync = false;

+    if (stats_->dispatch_queue_bytes >= queue_backpressure_->pipeline_buffer_limit) {
+      DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
+      fb2::NoOpLock noop;
+      queue_backpressure_->pipeline_cnd.wait(noop, [this] {
+        bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit / 2 ||
+                   cc_->conn_closing;
+        return res;
+      });
+      if (cc_->conn_closing)
+        return;
+    }
+  }

   // Dispatch async if we're handling a pipeline or if we can't dispatch sync.
   if (has_more || !can_dispatch_sync) {
     SendAsync(dispatch_async());
-
-    if (dispatch_q_.size() > 10)
-      ThisFiber::Yield();
   } else {
     ShrinkPipelinePool();  // Gradually release pipeline request pool.
     {
@@ -961,7 +978,7 @@

     // We might have blocked the dispatch queue from processing, wake it up.
     if (dispatch_q_.size() > 0)
-      evc_.notify();
+      cnd_.notify_one();
   }
 }

@@ -993,7 +1010,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
       if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
         LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
       }
-      DispatchCommand(has_more, dispatch_sync, dispatch_async);
+
+      DispatchSingle(has_more, dispatch_sync, dispatch_async);
     }
     io_buf_.ConsumeInput(consumed);
   } while (RedisParser::OK == result && !orig_builder->GetError());
@@ -1049,7 +1067,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
         return NEED_MORE;
       }
     }
-    DispatchCommand(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
+    DispatchSingle(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
     io_buf_.ConsumeInput(total_len);
   } while (!builder->GetError());

@@ -1084,7 +1102,7 @@ void Connection::OnBreakCb(int32_t mask) {
   cc_->conn_closing = true;
   BreakOnce(mask);
-  evc_.notify();  // Notify dispatch fiber.
+  cnd_.notify_one();  // Notify dispatch fiber.
 }

 void Connection::HandleMigrateRequest() {
@@ -1286,6 +1304,7 @@ void Connection::ClearPipelinedMessages() {
   }

   dispatch_q_.clear();
+  queue_backpressure_->pipeline_cnd.notify_all();
   queue_backpressure_->ec.notifyAll();
 }

@@ -1318,18 +1337,21 @@ std::string Connection::DebugInfo() const {
 // into the dispatch queue and DispatchFiber will run those commands asynchronously with
 // InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
 // DispatchFiber.
-void Connection::DispatchFiber(util::FiberSocketBase* peer) {
-  ThisFiber::SetName("DispatchFiber");
+void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
+  ThisFiber::SetName("ExecutionFiber");
   SinkReplyBuilder* builder = cc_->reply_builder();
   DispatchOperations dispatch_op{builder, this};

   size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);

   uint64_t prev_epoch = fb2::FiberSwitchEpoch();
+  fb2::NoOpLock noop_lk;
+
   while (!builder->GetError()) {
     DCHECK_EQ(socket()->proactor(), ProactorBase::me());
-    evc_.await(
-        [this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
+    cnd_.wait(noop_lk, [this] {
+      return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
+    });

     if (cc_->conn_closing)
       break;
@@ -1374,6 +1396,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
     if (ShouldEndDispatchFiber(msg)) {
       RecycleMessage(std::move(msg));
       CHECK(dispatch_q_.empty()) << DebugInfo();
+      queue_backpressure_->pipeline_cnd.notify_all();
       return;  // don't set conn closing flag
     }

@@ -1383,11 +1406,17 @@
       RecycleMessage(std::move(msg));
     }

-    queue_backpressure_->ec.notify();
+    DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
+    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit / 2) {
+      queue_backpressure_->pipeline_cnd.notify_all();
+    }
+    if (stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->subscriber_thread_limit)
+      queue_backpressure_->ec.notify();
   }

   DCHECK(cc_->conn_closing || builder->GetError());
   cc_->conn_closing = true;
+  queue_backpressure_->pipeline_cnd.notify_all();
 }

 Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
@@ -1527,7 +1556,7 @@ void Connection::LaunchDispatchFiberIfNeeded() {
   if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
     VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
     dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
-                              [&, peer = socket_.get()]() { DispatchFiber(peer); });
+                              [this, peer = socket_.get()]() { ExecutionFiber(peer); });
   }
 }

@@ -1573,7 +1602,7 @@ void Connection::SendAsync(MessageHandle msg) {

   // Don't notify if a sync dispatch is in progress, it will wake after finishing.
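+  // Only the first message in an empty queue needs a wake-up: once the dispatch fiber is
+  // running, it keeps draining the queue without further notifications.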
   if (dispatch_q_.size() == 1 && !cc_->sync_dispatch) {
-    evc_.notify();
+    cnd_.notify_one();
   }
 }

diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index a1c10549deaf..39d21ebe0c53 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -328,8 +328,11 @@ class Connection : public util::Connection {
     util::fb2::EventCount ec;
     std::atomic_size_t subscriber_bytes = 0;

+    util::fb2::CondVarAny pipeline_cnd;
+
     size_t subscriber_thread_limit = 0;  // cached flag subscriber_thread_limit
     size_t pipeline_cache_limit = 0;     // cached flag pipeline_cache_limit
+    size_t pipeline_buffer_limit = 0;    // cached flag for buffer size in bytes
   };

  private:
@@ -346,14 +349,15 @@ class Connection : public util::Connection {
   // Returns true if HTTP header is detected.
   io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);

-  // Dispatch Redis or MC command. `has_more` should indicate whether the buffer has more commands
+  // Dispatches a single (Redis or MC) command.
+  // `has_more` should indicate whether the io buffer has more commands
   // (pipelining in progress). Performs async dispatch if forced (already in async mode) or if
   // has_more is true, otherwise uses synchronous dispatch.
-  void DispatchCommand(bool has_more, absl::FunctionRef<void()> sync_dispatch,
-                       absl::FunctionRef<MessageHandle()> async_dispatch);
+  void DispatchSingle(bool has_more, absl::FunctionRef<void()> sync_dispatch,
+                      absl::FunctionRef<MessageHandle()> async_dispatch);

   // Handles events from dispatch queue.
-  void DispatchFiber(util::FiberSocketBase* peer);
+  void ExecutionFiber(util::FiberSocketBase* peer);

   void SendAsync(MessageHandle msg);

@@ -394,7 +398,7 @@ class Connection : public util::Connection {
   void BreakOnce(uint32_t ev_mask);

   std::deque<MessageHandle> dispatch_q_;  // dispatch queue
-  util::fb2::EventCount evc_;             // dispatch queue waker
+  util::fb2::CondVarAny cnd_;             // dispatch queue waker
   util::fb2::Fiber dispatch_fb_;          // dispatch fiber (if started)

   size_t pending_pipeline_cmd_cnt_ = 0;  // how many queued async commands in dispatch_q
@@ -413,7 +417,6 @@ class Connection : public util::Connection {
   ServiceInterface* service_;

   time_t creation_time_, last_interaction_;
-
   Phase phase_ = SETUP;
   std::string name_;

From 84aa1e67e4e3814454d953a3c13d53927266e9fe Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Sat, 8 Jun 2024 12:23:09 +0300
Subject: [PATCH 2/5] chore: fixes

Signed-off-by: Roman Gershman
---
 src/facade/dragonfly_connection.cc | 67 ++++++++++++++++++++----------
 src/facade/dragonfly_connection.h  | 28 ++++++++-----
 src/facade/dragonfly_listener.cc   |  2 +
 src/server/test_utils.cc           |  1 +
 4 files changed, 66 insertions(+), 32 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 6c5ff7d396e9..181cbd98c7a1 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -48,10 +48,10 @@ ABSL_FLAG(string, admin_bind, "",
 ABSL_FLAG(uint64_t, request_cache_limit, 64_MB,
           "Amount of memory to use for request cache in bytes - per IO thread.");

-ABSL_FLAG(uint64_t, pipeline_buffer_limit, 4_MB,
+ABSL_FLAG(uint64_t, pipeline_buffer_limit, 8_MB,
           "Amount of memory to use for parsing pipeline requests - per IO thread.");

-ABSL_FLAG(uint64_t, subscriber_thread_limit, 128_MB,
+ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB,
           "Amount of memory to use for storing pub commands in bytes - per IO thread");

 ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
@@ -258,8 +258,7 @@ thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
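+// Each IO thread keeps a single QueueBackpressure instance; every connection running on the
+// thread caches a pointer to it in queue_backpressure_.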
 thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

 void Connection::QueueBackpressure::EnsureBelowLimit() {
-  ec.await(
-      [this] { return subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit; });
+  ec.await([this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
 }

 struct Connection::Shutdown {
@@ -594,18 +593,31 @@ void Connection::OnPostMigrateThread() {
   }
 }

-void Connection::HandleRequests() {
+void Connection::OnConnectionStart() {
+  DCHECK(queue_backpressure_ == nullptr);
+
   ThisFiber::SetName("DflyConnection");

   // We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
   // may be created in a different thread from where it runs.
-  if (tl_queue_backpressure_.subscriber_thread_limit == 0) {
-    tl_queue_backpressure_.subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
+  if (tl_queue_backpressure_.publish_buffer_limit == 0) {
+    tl_queue_backpressure_.publish_buffer_limit = absl::GetFlag(FLAGS_publish_buffer_limit);
     tl_queue_backpressure_.pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
     tl_queue_backpressure_.pipeline_buffer_limit = absl::GetFlag(FLAGS_pipeline_buffer_limit);
+
+    if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
+        tl_queue_backpressure_.pipeline_cache_limit == 0 ||
+        tl_queue_backpressure_.pipeline_buffer_limit == 0) {
+      LOG(ERROR) << "Buffer limit settings are 0";
+      exit(-1);
+    }
   }

   queue_backpressure_ = &tl_queue_backpressure_;
+  stats_ = &tl_facade_stats->conn_stats;
+}
+
+void Connection::HandleRequests() {
+  VLOG(1) << "[" << id_ << "HandleRequests";

   if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
     int val = 1;
@@ -613,7 +625,6 @@
     DCHECK_EQ(res, 0);
   }

-  stats_ = &tl_facade_stats->conn_stats;
   auto remote_ep = RemoteEndpointStr();

   FiberSocketBase* peer = socket_.get();
@@ -942,36 +953,44 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   }
 }

-void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> dispatch_sync,
-                                absl::FunctionRef<MessageHandle()> dispatch_async) {
+void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
+                                absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
   // Avoid sync dispatch if we can interleave with an ongoing async dispatch
   bool can_dispatch_sync = !cc_->async_dispatch;

   // Avoid sync dispatch if we already have pending async messages or
   // can potentially receive some (subscriptions > 0)
-  if (dispatch_q_.size() > 0 || cc_->subscriptions > 0) {
-    can_dispatch_sync = false;
-    if (stats_->dispatch_queue_bytes >= queue_backpressure_->pipeline_buffer_limit) {
-      DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
+  if (can_dispatch_sync && (!dispatch_q_.empty() || cc_->subscriptions > 0)) {
+    DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
+    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit) {
+      can_dispatch_sync = false;
+    } else {
       fb2::NoOpLock noop;
       queue_backpressure_->pipeline_cnd.wait(noop, [this] {
-        bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit / 2 ||
-                   cc_->conn_closing;
+        bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
+                   (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
         return res;
       });
       if (cc_->conn_closing)
         return;
+
+      // Prefer sending synchronous request if possible (can_dispatch_sync=false),
+      // to reduce the memory pressure.
+      has_more = false;
+      if (cc_->async_dispatch || !dispatch_q_.empty() || cc_->subscriptions > 0) {
+        can_dispatch_sync = false;
+      }
     }
   }

   // Dispatch async if we're handling a pipeline or if we can't dispatch sync.
   if (has_more || !can_dispatch_sync) {
-    SendAsync(dispatch_async());
+    SendAsync(cmd_msg_cb());
   } else {
     ShrinkPipelinePool();  // Gradually release pipeline request pool.
     {
       cc_->sync_dispatch = true;
-      dispatch_sync();
+      invoke_cb();
       cc_->sync_dispatch = false;
     }
     last_interaction_ = time(nullptr);
@@ -1380,6 +1399,8 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
     bool squashing_enabled = squashing_threshold > 0;
     bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
     bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
+    bool subscriber_over_limit =
+        stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
     if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
       SquashPipeline(builder);
     } else {
@@ -1407,10 +1428,13 @@
     }

     DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
-    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit / 2) {
-      queue_backpressure_->pipeline_cnd.notify_all();
+    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
+        dispatch_q_.empty()) {
+      queue_backpressure_->pipeline_cnd.notify_all();  // very cheap if no one is waiting on it.
     }
-    if (stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->subscriber_thread_limit)
+
+    if (subscriber_over_limit &&
+        stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
       queue_backpressure_->ec.notify();
   }

@@ -1728,6 +1752,7 @@ void Connection::BreakOnce(uint32_t ev_mask) {
 Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
                              unsigned thread, uint32_t client_id)
     : ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
+  DCHECK(backpressure);
 }

 unsigned Connection::WeakRef::Thread() const {

diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index 39d21ebe0c53..34d336db6f9a 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -61,6 +61,10 @@ class Connection : public util::Connection {
              ServiceInterface* service);
   ~Connection();

+  // A callback called by Listener::OnConnectionStart in the same thread where
+  // HandleRequests will run.
+  void OnConnectionStart();
+
   using BreakerCb = std::function<void(uint32_t)>;
   using ShutdownCb = std::function<void()>;
   using ShutdownHandle = unsigned;
@@ -215,7 +219,6 @@ class Connection : public util::Connection {
     uint32_t client_id_;
   };

- public:
   // Add PubMessage to dispatch queue.
   // Virtual because behavior is overridden in test_utils.
   virtual void SendPubMessageAsync(PubMessage);
@@ -318,21 +321,24 @@ class Connection : public util::Connection {
   struct DispatchCleanup;
   struct Shutdown;

-  // Keeps track of total per-thread sizes of dispatch queues to
-  // limit memory taken up by messages from PUBLISH commands and slow down clients
-  // producing them to quickly via EnsureAsyncMemoryBudget.
+  // Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
+  // in these queues.
   struct QueueBackpressure {
-    // Block until memory usage is below limit, can be called from any thread
+    // Block until subscriber memory usage is below limit, can be called from any thread.
     void EnsureBelowLimit();

+    // Used by publisher/subscriber actors to make sure we do not publish too many messages
+    // into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
     util::fb2::EventCount ec;
     std::atomic_size_t subscriber_bytes = 0;

+    // Used by pipelining/execution fiber to throttle the incoming pipeline messages.
+    // Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
     util::fb2::CondVarAny pipeline_cnd;

-    size_t subscriber_thread_limit = 0;  // cached flag subscriber_thread_limit
-    size_t pipeline_cache_limit = 0;     // cached flag pipeline_cache_limit
-    size_t pipeline_buffer_limit = 0;    // cached flag for buffer size in bytes
+    size_t publish_buffer_limit = 0;   // cached flag publish_buffer_limit
+    size_t pipeline_cache_limit = 0;   // cached flag pipeline_cache_limit
+    size_t pipeline_buffer_limit = 0;  // cached flag for buffer size in bytes
   };

  private:
@@ -353,8 +359,8 @@ class Connection : public util::Connection {
   // `has_more` should indicate whether the io buffer has more commands
   // (pipelining in progress). Performs async dispatch if forced (already in async mode) or if
   // has_more is true, otherwise uses synchronous dispatch.
-  void DispatchSingle(bool has_more, absl::FunctionRef<void()> sync_dispatch,
-                      absl::FunctionRef<MessageHandle()> async_dispatch);
+  void DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
+                      absl::FunctionRef<MessageHandle()> cmd_msg_cb);

   // Handles events from dispatch queue.
   void ExecutionFiber(util::FiberSocketBase* peer);

   void SendAsync(MessageHandle msg);
@@ -433,7 +439,7 @@ class Connection : public util::Connection {

   // Pointer to corresponding queue backpressure struct.
   // Needed for access from different threads by EnsureAsyncMemoryBudget().
-  QueueBackpressure* queue_backpressure_;
+  QueueBackpressure* queue_backpressure_ = nullptr;

   util::fb2::ProactorBase* migration_request_ = nullptr;

diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc
index c3f4303bdc2d..a28af6f90248 100644
--- a/src/facade/dragonfly_listener.cc
+++ b/src/facade/dragonfly_listener.cc
@@ -325,6 +325,8 @@ void Listener::OnConnectionStart(util::Connection* conn) {
   facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
   VLOG(1) << "Opening connection " << facade_conn->GetClientId();

+  facade_conn->OnConnectionStart();
+
   absl::base_internal::SpinLockHolder lock{&mutex_};
   int32_t prev_cnt = per_thread_[id].num_connections++;
   ++conn_cnt_;

diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 81a1e8bbdafc..2df43cdbeb07 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -59,6 +59,7 @@ TestConnection::TestConnection(Protocol protocol, io::StringSink* sink)
     : facade::Connection(protocol, nullptr, nullptr, nullptr), sink_(sink) {
   cc_.reset(new dfly::ConnectionContext(sink_, this));
   SetSocket(ProactorBase::me()->CreateSocket());
+  OnConnectionStart();
 }

 void TestConnection::SendPubMessageAsync(PubMessage pmsg) {

From 17fc5a996ca2ec4e4b101a4b40670b42a3e325b6 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Sun, 9 Jun 2024 14:59:44 +0300
Subject: [PATCH 3/5] chore: comments

Signed-off-by: Roman Gershman
---
 src/facade/dragonfly_connection.cc | 51 +++++++++++++-----------------
 tests/dragonfly/connection_test.py |  2 +-
 2 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 181cbd98c7a1..939cedb8b753 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -617,7 +617,7 @@ void Connection::HandleRequests() {
-  VLOG(1) << "[" << id_ << "HandleRequests";
+  VLOG(1) << "[" << id_ << "] HandleRequests";

   if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
     int val = 1;
@@ -955,36 +955,28 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
 void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
                                 absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
-  // Avoid sync dispatch if we can interleave with an ongoing async dispatch
-  bool can_dispatch_sync = !cc_->async_dispatch;
-
-  // Avoid sync dispatch if we already have pending async messages or
-  // can potentially receive some (subscriptions > 0)
-  if (can_dispatch_sync && (!dispatch_q_.empty() || cc_->subscriptions > 0)) {
-    DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
-    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit) {
-      can_dispatch_sync = false;
-    } else {
-      fb2::NoOpLock noop;
-      queue_backpressure_->pipeline_cnd.wait(noop, [this] {
-        bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
-                   (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
-        return res;
-      });
-      if (cc_->conn_closing)
-        return;
+  DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
+  bool optimize_for_async = has_more;
+
+  if (stats_->dispatch_queue_bytes > queue_backpressure_->pipeline_buffer_limit) {
+    fb2::NoOpLock noop;
+    queue_backpressure_->pipeline_cnd.wait(noop, [this] {
+      bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
+                 (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
+      return res;
+    });
+    if (cc_->conn_closing)
+      return;

-      // Prefer sending synchronous request if possible (can_dispatch_sync=false),
-      // to reduce the memory pressure.
-      has_more = false;
-      if (cc_->async_dispatch || !dispatch_q_.empty() || cc_->subscriptions > 0) {
-        can_dispatch_sync = false;
-      }
-    }
+    // prefer synchronous dispatching to save memory.
+    optimize_for_async = false;
   }

+  // Avoid sync dispatch if we can interleave with an ongoing async dispatch.
+  bool can_dispatch_sync = !cc_->async_dispatch && dispatch_q_.empty() && cc_->subscriptions == 0;
+
   // Dispatch async if we're handling a pipeline or if we can't dispatch sync.
-  if (has_more || !can_dispatch_sync) {
+  if (optimize_for_async || !can_dispatch_sync) {
     SendAsync(cmd_msg_cb());
   } else {
     ShrinkPipelinePool();  // Gradually release pipeline request pool.
@@ -1392,6 +1384,9 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {

     builder->SetBatchMode(dispatch_q_.size() > 1);

+    bool subscriber_over_limit =
+        stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
+
     // Special case: if the dispatch queue accumulated a big number of commands,
     // we can try to squash them
     // It is only enabled if the threshold is reached and the whole dispatch queue
@@ -1399,8 +1394,6 @@
     bool squashing_enabled = squashing_threshold > 0;
     bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
     bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
-    bool subscriber_over_limit =
-        stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
     if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
       SquashPipeline(builder);
     } else {

diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py
index b4c3ee5decbb..9eaf81af20c7 100755
--- a/tests/dragonfly/connection_test.py
+++ b/tests/dragonfly/connection_test.py
@@ -334,7 +334,7 @@ async def resub(s: "aioredis.PubSub", sub: bool, chan: str):

 @pytest.mark.slow
-@dfly_args({"proactor_threads": "1", "subscriber_thread_limit": "100"})
+@dfly_args({"proactor_threads": "1", "publish_buffer_limit": "100"})
 async def test_publish_stuck(df_server: DflyInstance, async_client: aioredis.Redis):
     reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port, limit=10)
     writer.write(b"SUBSCRIBE channel\r\n")

From 9b787f659a4c57c30f2afd31f90a800f0e832edb Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Sun, 9 Jun 2024 21:14:56 +0300
Subject: [PATCH 4/5] chore: add pipeline_squashed statistics

---
 src/facade/dragonfly_connection.cc | 2 +-
 src/facade/facade.cc               | 3 ++-
 src/facade/facade_types.h          | 1 +
 src/server/server_family.cc        | 1 +
 4 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 939cedb8b753..3045953bb7ed 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -1277,7 +1277,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
     auto& pmsg = get<PipelineMessagePtr>(msg.handle);
     squash_cmds.push_back(absl::MakeSpan(pmsg->args));
   }
-
+  stats_->squashed_commands += squash_cmds.size();
   cc_->async_dispatch = true;
   size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());

diff --git a/src/facade/facade.cc b/src/facade/facade.cc
index d6487fe9dd9d..1748c1cf4061 100644
--- a/src/facade/facade.cc
+++ b/src/facade/facade.cc
@@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);

 ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
   // To break this code deliberately if we add/remove a field to this struct.
-  static_assert(kSizeConnStats == 112u);
+  static_assert(kSizeConnStats == 120u);

   ADD(read_buf_capacity);
   ADD(dispatch_queue_entries);
@@ -37,6 +37,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
   ADD(num_replicas);
   ADD(num_blocked_clients);
   ADD(num_migrations);
+  ADD(squashed_commands);

   return *this;
 }

diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h
index 87cf137f6fc2..f0fe055e67a3 100644
--- a/src/facade/facade_types.h
+++ b/src/facade/facade_types.h
@@ -66,6 +66,7 @@ struct ConnectionStats {
   uint32_t num_replicas = 0;
   uint32_t num_blocked_clients = 0;
   uint64_t num_migrations = 0;
+  uint64_t squashed_commands = 0;

   ConnectionStats& operator+=(const ConnectionStats& o);
 };

diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index a617b1ebe22e..f749f3d97a90 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -2102,6 +2102,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("total_commands_processed", conn_stats.command_cnt);
     append("instantaneous_ops_per_sec", m.qps);
     append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
+    append("total_pipelined_squashed_commands", conn_stats.squashed_commands);
     append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
     append("total_net_input_bytes", conn_stats.io_read_bytes);
     append("connection_migrations", conn_stats.num_migrations);

From c0fed682a01897dd80068d8a6db27ad1c9902fb0 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Mon, 10 Jun 2024 10:42:26 +0300
Subject: [PATCH 5/5] chore: comments

Signed-off-by: Roman Gershman
---
 .github/workflows/daily-builds.yml |  1 +
 src/facade/dragonfly_connection.cc | 23 ++++++++++++++++++-----
 src/facade/dragonfly_connection.h  |  8 ++++++++
 src/server/replica.cc              |  1 -
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/daily-builds.yml b/.github/workflows/daily-builds.yml
index e7813f39310b..0e1eacc2975a 100644
--- a/.github/workflows/daily-builds.yml
+++ b/.github/workflows/daily-builds.yml
@@ -67,6 +67,7 @@ jobs:
           ctest -V -L DFLY

   build-macos:
+    if: false
     runs-on: macos-12
     timeout-minutes: 45
     steps:

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 3045953bb7ed..f7475b657e86 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -958,12 +958,12 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
   DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
   bool optimize_for_async = has_more;

-  if (stats_->dispatch_queue_bytes > queue_backpressure_->pipeline_buffer_limit) {
+  if (optimize_for_async &&
+      queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes)) {
     fb2::NoOpLock noop;
     queue_backpressure_->pipeline_cnd.wait(noop, [this] {
-      bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
-                 (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
-      return res;
+      return !queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
+             (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
     });
     if (cc_->conn_closing)
       return;
@@ -978,6 +978,19 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_

   // Dispatch async if we're handling a pipeline or if we can't dispatch sync.
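+  // Note: the async branch below also yields after ~10 consecutive enqueues within the same
+  // fiber-switch epoch (see async_streak_len_), so one pipelining connection cannot starve
+  // other fibers on its thread.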
   if (optimize_for_async || !can_dispatch_sync) {
     SendAsync(cmd_msg_cb());
+
+    auto epoch = fb2::FiberSwitchEpoch();
+
+    if (async_fiber_epoch_ == epoch) {
+      // If we pushed too many items without context switching - yield.
+      if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
+        async_streak_len_ = 0;
+        ThisFiber::Yield();
+      }
+    } else {
+      async_streak_len_ = 0;
+      async_fiber_epoch_ = epoch;
+    }
   } else {
     ShrinkPipelinePool();  // Gradually release pipeline request pool.
     {
@@ -1421,7 +1434,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
     }

     DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
-    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
+    if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
         dispatch_q_.empty()) {
       queue_backpressure_->pipeline_cnd.notify_all();  // very cheap if no one is waiting on it.
     }

diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index 34d336db6f9a..3b5513cea4f8 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -327,6 +327,10 @@ class Connection : public util::Connection {
     // Block until subscriber memory usage is below limit, can be called from any thread.
     void EnsureBelowLimit();

+    bool IsPipelineBufferOverLimit(size_t size) const {
+      return size >= pipeline_buffer_limit;
+    }
+
     // Used by publisher/subscriber actors to make sure we do not publish too many messages
     // into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
     util::fb2::EventCount ec;
@@ -428,6 +432,10 @@ class Connection : public util::Connection {

   unsigned parser_error_ = 0;

+  // Number of times we enqueued requests asynchronously during the same async_fiber_epoch_.
+  unsigned async_streak_len_ = 0;
+  uint64_t async_fiber_epoch_ = 0;
+
   BreakerCb breaker_cb_;

   // Used by redis parser to avoid allocations

diff --git a/src/server/replica.cc b/src/server/replica.cc
index 6fc78439435b..3e8b7d39562b 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -23,7 +23,6 @@ extern "C" {
 #include

 #include "base/logging.h"
-#include "facade/dragonfly_connection.h"
 #include "facade/redis_parser.h"
 #include "server/error.h"
 #include "server/journal/executor.h"
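
Taken together, the series establishes a simple producer/consumer contract: the input loop
(producer) stops admitting new pipeline requests once the per-thread dispatch queues hold more
than pipeline_buffer_limit bytes, and the execution fiber (consumer) notifies the waiters once it
has drained below the limit. The sketch below is a minimal stand-alone model of that contract,
not Dragonfly code: PipelineQueue and its tiny 1 KiB limit are invented for the example, and
std::mutex/std::condition_variable/std::thread stand in for the fiber-aware CondVarAny, NoOpLock
and fibers used in the patches.

#include <condition_variable>
#include <cstdio>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for the per-thread dispatch queue plus QueueBackpressure pair.
class PipelineQueue {
 public:
  explicit PipelineQueue(size_t limit) : limit_(limit) {}

  // Producer side (the input loop): blocks while the queue holds too many bytes.
  void Push(std::string msg) {
    std::unique_lock lk(mu_);
    cnd_.wait(lk, [this] { return bytes_ < limit_ || closing_; });
    if (closing_)
      return;
    bytes_ += msg.size();
    q_.push_back(std::move(msg));
    cnd_.notify_all();  // wake the consumer
  }

  // Consumer side (the execution fiber): drains one message at a time.
  bool Pop(std::string* out) {
    std::unique_lock lk(mu_);
    cnd_.wait(lk, [this] { return !q_.empty() || closing_; });
    if (q_.empty())
      return false;  // closing and fully drained
    *out = std::move(q_.front());
    q_.pop_front();
    bytes_ -= out->size();
    cnd_.notify_all();  // unblock a producer waiting on the byte limit
    return true;
  }

  void Close() {
    std::lock_guard lk(mu_);
    closing_ = true;
    cnd_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cnd_;
  std::deque<std::string> q_;
  size_t bytes_ = 0;
  const size_t limit_;
  bool closing_ = false;
};

int main() {
  PipelineQueue q(1024);  // tiny limit chosen to force back-pressure
  std::thread consumer([&q] {
    std::string msg;
    while (q.Pop(&msg)) {
      // "execute" the command here
    }
  });
  for (int i = 0; i < 100000; ++i)
    q.Push("PING");  // the producer stalls whenever 1024 bytes are queued
  q.Close();
  consumer.join();
  std::printf("done\n");
}

The design choice mirrored here is that blocking happens in the reader rather than by letting the
queue grow without bound: a client that pipelines faster than the server executes is simply not
read from until the consumer catches up, which caps pipeline memory per IO thread at roughly the
configured limit.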