chore: allow limiting pipelining queue by length (#3551)
* chore: allow limiting pipelining queue by length

We already allow limiting the queue by memory usage, but it also makes sense to limit it by depth,
so that in extreme cases we apply backpressure to client connections. Otherwise, if we parse and read everything,
clients have no sense of how loaded the connection is on the server side.

---------

Signed-off-by: Roman Gershman <[email protected]>
romange authored Aug 24, 2024
1 parent 9f7cbc7 commit 52b3866
Showing 3 changed files with 76 additions and 50 deletions.
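
The idea behind the change, in isolation: the socket-reading side stops consuming input once the pipeline queue exceeds either a byte budget or a depth limit, so TCP flow control eventually pushes back on the client. Below is a minimal, generic sketch of that pattern using only standard-library primitives; the class and member names (BoundedPipeline, Push, Pop) are illustrative and do not exist in Dragonfly, whose actual implementation uses its fiber condition variables and the thread-local QueueBackpressure shown in the diff.

    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <utility>

    // Minimal sketch of dual-limit backpressure (bytes + queue depth).
    class BoundedPipeline {
     public:
      BoundedPipeline(size_t max_bytes, size_t max_len)
          : max_bytes_(max_bytes), max_len_(max_len) {}

      // Socket-reader side: block while either limit is exceeded. Blocking here
      // means the reader stops draining the socket, which is the backpressure.
      void Push(std::string cmd) {
        std::unique_lock<std::mutex> lk(mu_);
        // Admit while under both limits (may overshoot by one command).
        not_full_.wait(lk, [&] { return bytes_ < max_bytes_ && q_.size() < max_len_; });
        bytes_ += cmd.size();
        q_.push_back(std::move(cmd));
        not_empty_.notify_one();
      }

      // Execution side: pop a command and wake the reader if room freed up.
      std::string Pop() {
        std::unique_lock<std::mutex> lk(mu_);
        not_empty_.wait(lk, [&] { return !q_.empty(); });
        std::string cmd = std::move(q_.front());
        q_.pop_front();
        bytes_ -= cmd.size();
        not_full_.notify_all();
        return cmd;
      }

     private:
      std::mutex mu_;
      std::condition_variable not_full_, not_empty_;
      std::deque<std::string> q_;
      size_t bytes_ = 0;
      const size_t max_bytes_, max_len_;
    };

Limiting by bytes alone lets a flood of tiny commands grow the queue without bound in terms of entries; the new depth limit closes that gap.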
82 changes: 61 additions & 21 deletions src/facade/dragonfly_connection.cc
@@ -56,9 +56,13 @@ ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB,

ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");

ABSL_FLAG(uint64_t, pipeline_squash, 10,
ABSL_FLAG(uint32_t, pipeline_squash, 10,
"Number of queued pipelined commands above which squashing is enabled, 0 means disabled");

ABSL_FLAG(uint32_t, pipeline_queue_limit, 1000,
"Pipeline queue max length, the server will stop reading from the client socket"
" once the pipeline reaches this limit");

// When changing this constant, also update `test_large_cmd` test in connection_test.py.
ABSL_FLAG(uint32_t, max_multi_bulk_len, 1u << 16,
"Maximum multi-bulk (array) length that is "
@@ -73,6 +77,7 @@ ABSL_FLAG(bool, migrate_connections, true,
"happen at most once per connection.");

using namespace util;
using absl::GetFlag;
using nonstd::make_unexpected;

namespace facade {
@@ -254,11 +259,37 @@ const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "S

} // namespace

// Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
// in these queues.
struct Connection::QueueBackpressure {
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();

bool IsPipelineBufferOverLimit(size_t size, uint32_t q_len) const {
return size >= pipeline_buffer_limit || q_len > pipeline_queue_max_len;
}

// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount pubsub_ec;
std::atomic_size_t subscriber_bytes = 0;

// Used by pipelining/execution fiber to throttle the incoming pipeline messages.
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
util::fb2::CondVarAny pipeline_cnd;

size_t publish_buffer_limit = 0; // cached flag publish_buffer_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
size_t pipeline_buffer_limit = 0; // cached flag for buffer size in bytes
uint32_t pipeline_queue_max_len = 256; // cached flag for pipeline queue max length.
};

thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

void Connection::QueueBackpressure::EnsureBelowLimit() {
ec.await([this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
pubsub_ec.await(
[this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
}

struct Connection::Shutdown {
@@ -497,7 +528,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,

switch (protocol) {
case Protocol::REDIS:
redis_parser_.reset(new RedisParser(absl::GetFlag(FLAGS_max_multi_bulk_len)));
redis_parser_.reset(new RedisParser(GetFlag(FLAGS_max_multi_bulk_len)));
break;
case Protocol::MEMCACHE:
memcache_parser_.reset(new MemcacheParser);
@@ -508,7 +539,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
last_interaction_ = creation_time_;
id_ = next_id.fetch_add(1, memory_order_relaxed);

migration_enabled_ = absl::GetFlag(FLAGS_migrate_connections);
migration_enabled_ = GetFlag(FLAGS_migrate_connections);

// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
// We use it for reference counting and accessing `this` (without managing it).
@@ -585,13 +616,16 @@ void Connection::OnConnectionStart() {
// We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
// may be created in a differrent thread from where it runs.
if (tl_queue_backpressure_.publish_buffer_limit == 0) {
tl_queue_backpressure_.publish_buffer_limit = absl::GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = absl::GetFlag(FLAGS_pipeline_buffer_limit);
tl_queue_backpressure_.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
tl_queue_backpressure_.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);

if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_cache_limit == 0 ||
tl_queue_backpressure_.pipeline_buffer_limit == 0) {
LOG(ERROR) << "Buffer limit settings are 0";
tl_queue_backpressure_.pipeline_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_queue_max_len == 0) {
LOG(ERROR) << "pipeline flag limit is 0";
exit(-1);
}
}
@@ -603,7 +637,7 @@ void Connection::OnConnectionStart() {
void Connection::HandleRequests() {
VLOG(1) << "[" << id_ << "] HandleRequests";

if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
if (GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
int val = 1;
int res = setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
DCHECK_EQ(res, 0);
@@ -614,7 +648,7 @@ void Connection::HandleRequests() {
FiberSocketBase* peer = socket_.get();
#ifdef DFLY_USE_SSL
if (ssl_ctx_) {
const bool no_tls_on_admin_port = absl::GetFlag(FLAGS_no_tls_on_admin_port);
const bool no_tls_on_admin_port = GetFlag(FLAGS_no_tls_on_admin_port);
if (!(IsPrivileged() && no_tls_on_admin_port)) {
// Must be done atomically before the premption point in Accept so that at any
// point in time, the socket_ is defined.
@@ -808,7 +842,7 @@ io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
return false;
}

const bool primary_port_enabled = absl::GetFlag(FLAGS_primary_port_http_enabled);
const bool primary_port_enabled = GetFlag(FLAGS_primary_port_http_enabled);
if (!primary_port_enabled && !IsPrivileged()) {
return false;
}
@@ -946,12 +980,13 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;

if (optimize_for_async &&
queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes)) {
if (optimize_for_async && queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size())) {
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
return !queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
bool over_limits = queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size());
return !over_limits || (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
return;
@@ -1159,7 +1194,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
error_code ec;
ParserStatus parse_status = OK;

size_t max_iobfuf_len = absl::GetFlag(FLAGS_max_client_iobuf_len);
size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len);

do {
HandleMigrateRequest();
@@ -1319,7 +1354,7 @@ void Connection::ClearPipelinedMessages() {

dispatch_q_.clear();
queue_backpressure_->pipeline_cnd.notify_all();
queue_backpressure_->ec.notifyAll();
queue_backpressure_->pubsub_ec.notifyAll();
}

std::string Connection::DebugInfo() const {
@@ -1356,7 +1391,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};

size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);
size_t squashing_threshold = GetFlag(FLAGS_pipeline_squash);

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
fb2::NoOpLock noop_lk;
@@ -1424,14 +1459,15 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
}

DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes,
dispatch_q_.size()) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
}

if (subscriber_over_limit &&
stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
queue_backpressure_->ec.notify();
queue_backpressure_->pubsub_ec.notify();
}

DCHECK(cc_->conn_closing || builder->GetError());
@@ -1748,6 +1784,10 @@ void Connection::BreakOnce(uint32_t ev_mask) {
}
}

void Connection::SetMaxQueueLenThreadLocal(uint32_t val) {
tl_queue_backpressure_.pipeline_queue_max_len = val;
}

Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
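In this file the change threads the queue length through the backpressure check: IsPipelineBufferOverLimit now takes dispatch_q_.size() in addition to the byte counter, DispatchSingle waits on pipeline_cnd while the check holds, and ExecutionFiber notifies the condition variable once the queue drains below the limits or empties. For reference, a standalone restatement of the predicate (the struct merely mirrors the cached-flag fields and is not the real QueueBackpressure type):

    #include <cstddef>
    #include <cstdint>

    // Mirrors the two cached limits; field values must be set by the caller.
    struct PipelineLimits {
      size_t pipeline_buffer_limit;     // bytes of queued pipeline messages
      uint32_t pipeline_queue_max_len;  // number of queued pipeline messages
    };

    // Same comparison as QueueBackpressure::IsPipelineBufferOverLimit above:
    // note the asymmetry -- bytes trip the limit at >=, queue length only at >.
    inline bool IsPipelineOverLimit(const PipelineLimits& lim, size_t bytes, uint32_t q_len) {
      return bytes >= lim.pipeline_buffer_limit || q_len > lim.pipeline_queue_max_len;
    }

Also worth noting from the diff: the byte check uses the per-thread total (stats_->dispatch_queue_bytes), while the length check applies to the connection's own dispatch queue; with the default --pipeline_queue_limit of 1000, the reader blocks once more than 1000 commands are queued on that connection.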
34 changes: 5 additions & 29 deletions src/facade/dragonfly_connection.h
@@ -303,43 +303,23 @@ class Connection : public util::Connection {

bool IsHttp() const;

// Sets max queue length locally in the calling thread.
static void SetMaxQueueLenThreadLocal(uint32_t val);

protected:
void OnShutdown() override;
void OnPreMigrateThread() override;
void OnPostMigrateThread() override;

std::unique_ptr<ConnectionContext> cc_; // Null for http connections

private:
enum ParserStatus { OK, NEED_MORE, ERROR };

struct DispatchOperations;
struct DispatchCleanup;
struct Shutdown;

// Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
// in these queues.
struct QueueBackpressure {
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();

bool IsPipelineBufferOverLimit(size_t size) const {
return size >= pipeline_buffer_limit;
}

// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount ec;
std::atomic_size_t subscriber_bytes = 0;

// Used by pipelining/execution fiber to throttle the incoming pipeline messages.
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
util::fb2::CondVarAny pipeline_cnd;

size_t publish_buffer_limit = 0; // cached flag publish_buffer_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
size_t pipeline_buffer_limit = 0; // cached flag for buffer size in bytes
};

private:
// Check protocol and handle connection.
void HandleRequests() final;

@@ -394,10 +374,6 @@

std::pair<std::string, std::string> GetClientInfoBeforeAfterTid() const;

protected:
std::unique_ptr<ConnectionContext> cc_; // Null for http connections

private:
void DecreaseStatsOnClose();
void BreakOnce(uint32_t ev_mask);

Expand Down
10 changes: 10 additions & 0 deletions src/server/main_service.cc
@@ -877,6 +877,16 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("dbfilename");
config_registry.RegisterMutable("table_growth_margin");
config_registry.RegisterMutable("pipeline_squash");
config_registry.RegisterMutable("pipeline_queue_limit",
[pool = &pp_](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<uint32_t>();
if (res.has_value()) {
pool->AwaitBrief([val = *res](unsigned, auto*) {
facade::Connection::SetMaxQueueLenThreadLocal(val);
});
}
return res.has_value();
});

serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
uint32_t shard_num = GetFlag(FLAGS_num_shards);
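Registering the flag as mutable means it can be updated on a running server: the callback parses the new value with TryGet<uint32_t> and, only if parsing succeeds, broadcasts it to every proactor thread, where SetMaxQueueLenThreadLocal refreshes the thread-local cached limit. A rough restatement of that parse-then-broadcast shape (WorkerPool and Broadcast are stand-ins, not Dragonfly APIs; only SetMaxQueueLenThreadLocal corresponds to real code):

    #include <cstdint>
    #include <functional>
    #include <optional>
    #include <vector>

    // Stand-in for the proactor pool's AwaitBrief: runs a setter per thread.
    struct WorkerPool {
      std::vector<std::function<void(uint32_t)>> per_thread_setters;
      void Broadcast(uint32_t val) {
        for (auto& set : per_thread_setters) set(val);
      }
    };

    // Mirrors the mutable-config callback: accept only a parseable value, then
    // fan it out so each thread refreshes its thread-local maximum queue length.
    bool OnPipelineQueueLimitChange(WorkerPool* pool, std::optional<uint32_t> parsed) {
      if (!parsed.has_value())
        return false;            // reject the update; old value stays in effect
      pool->Broadcast(*parsed);  // each thread calls SetMaxQueueLenThreadLocal(*parsed)
      return true;
    }

As with other flags registered through config_registry.RegisterMutable, the new value should then be reachable at runtime, for example via CONFIG SET pipeline_queue_limit 500, without restarting the server.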
