
Commit 17fc5a9

chore: comments
Signed-off-by: Roman Gershman <[email protected]>
romange committed Jun 9, 2024
1 parent 84aa1e6 commit 17fc5a9
Showing 2 changed files with 23 additions and 30 deletions.
51 changes: 22 additions & 29 deletions src/facade/dragonfly_connection.cc
@@ -617,7 +617,7 @@ void Connection::OnConnectionStart() {
}

void Connection::HandleRequests() {
VLOG(1) << "[" << id_ << "HandleRequests";
VLOG(1) << "[" << id_ << "] HandleRequests";

if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
int val = 1;
@@ -955,36 +955,28 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {

void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
-  // Avoid sync dispatch if we can interleave with an ongoing async dispatch
-  bool can_dispatch_sync = !cc_->async_dispatch;
-
-  // Avoid sync dispatch if we already have pending async messages or
-  // can potentially receive some (subscriptions > 0)
-  if (can_dispatch_sync && (!dispatch_q_.empty() || cc_->subscriptions > 0)) {
-    DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
-    if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit) {
-      can_dispatch_sync = false;
-    } else {
-      fb2::NoOpLock noop;
-      queue_backpressure_->pipeline_cnd.wait(noop, [this] {
-        bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
-                   (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
-        return res;
-      });
-      if (cc_->conn_closing)
-        return;
+  DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
+  bool optimize_for_async = has_more;
+
+  if (stats_->dispatch_queue_bytes > queue_backpressure_->pipeline_buffer_limit) {
+    fb2::NoOpLock noop;
+    queue_backpressure_->pipeline_cnd.wait(noop, [this] {
+      bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
+                 (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
+      return res;
+    });
+    if (cc_->conn_closing)
+      return;
+
-      // Prefer sending synchronous request if possible (can_dispatch_sync=false),
-      // to reduce the memory pressure.
-      has_more = false;
-      if (cc_->async_dispatch || !dispatch_q_.empty() || cc_->subscriptions > 0) {
-        can_dispatch_sync = false;
-      }
-    }
+    // prefer synchronous dispatching to save memory.
+    optimize_for_async = false;
  }

+  // Avoid sync dispatch if we can interleave with an ongoing async dispatch.
+  bool can_dispatch_sync = !cc_->async_dispatch && dispatch_q_.empty() && cc_->subscriptions == 0;
+
  // Dispatch async if we're handling a pipeline or if we can't dispatch sync.
-  if (has_more || !can_dispatch_sync) {
+  if (optimize_for_async || !can_dispatch_sync) {
SendAsync(cmd_msg_cb());
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
@@ -1392,15 +1384,16 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {

builder->SetBatchMode(dispatch_q_.size() > 1);

+    bool subscriber_over_limit =
+        stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
+
    // Special case: if the dispatch queue accumulated a big number of commands,
    // we can try to squash them
    // It is only enabled if the threshold is reached and the whole dispatch queue
    // consists only of commands (no pubsub or monitor messages)
    bool squashing_enabled = squashing_threshold > 0;
    bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
    bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
-    bool subscriber_over_limit =
-        stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
    if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
SquashPipeline(builder);
} else {
2 changes: 1 addition & 1 deletion tests/dragonfly/connection_test.py
@@ -334,7 +334,7 @@ async def resub(s: "aioredis.PubSub", sub: bool, chan: str):


@pytest.mark.slow
@dfly_args({"proactor_threads": "1", "subscriber_thread_limit": "100"})
@dfly_args({"proactor_threads": "1", "publish_buffer_limit": "100"})
async def test_publish_stuck(df_server: DflyInstance, async_client: aioredis.Redis):
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port, limit=10)
writer.write(b"SUBSCRIBE channel\r\n")
