Skip to content

Commit

Permalink
chore: comments
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Jun 10, 2024
1 parent 9b787f6 commit 0b337d5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
10 changes: 5 additions & 5 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,12 +958,12 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;

if (stats_->dispatch_queue_bytes > queue_backpressure_->pipeline_buffer_limit) {
if (optimize_for_async &&
queue_backpressure_->PipelineBufferOverLimit(stats_->dispatch_queue_bytes)) {
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
return res;
return !queue_backpressure_->PipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
return;
Expand Down Expand Up @@ -1421,7 +1421,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
}

DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
if (!queue_backpressure_->PipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
}
Expand Down
4 changes: 4 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ class Connection : public util::Connection {
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();

bool IsPipelineBufferOverLimit(size_t size) const {
return size >= pipeline_buffer_limit;
}

// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount ec;
Expand Down

0 comments on commit 0b337d5

Please sign in to comment.