Skip to content

Commit

Permalink
chore: comments
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jun 10, 2024
1 parent 9b787f6 commit c0fed68
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 6 deletions.
1 change: 1 addition & 0 deletions .github/workflows/daily-builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ jobs:
ctest -V -L DFLY
build-macos:
if: false
runs-on: macos-12
timeout-minutes: 45
steps:
Expand Down
23 changes: 18 additions & 5 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,12 +958,12 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;

if (stats_->dispatch_queue_bytes > queue_backpressure_->pipeline_buffer_limit) {
if (optimize_for_async &&
queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes)) {
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
bool res = stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
return res;
return !queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
return;
Expand All @@ -978,6 +978,19 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (optimize_for_async || !can_dispatch_sync) {
SendAsync(cmd_msg_cb());

auto epoch = fb2::FiberSwitchEpoch();

if (async_fiber_epoch_ == epoch) {
// If we pushed too many items without context switching - yield
if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
async_streak_len_ = 0;
ThisFiber::Yield();
}
} else {
async_streak_len_ = 0;
async_fiber_epoch_ = epoch;
}
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
{
Expand Down Expand Up @@ -1421,7 +1434,7 @@ void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
}

DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (stats_->dispatch_queue_bytes < queue_backpressure_->pipeline_buffer_limit ||
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
}
Expand Down
8 changes: 8 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ class Connection : public util::Connection {
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();

bool IsPipelineBufferOverLimit(size_t size) const {
return size >= pipeline_buffer_limit;
}

// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount ec;
Expand Down Expand Up @@ -428,6 +432,10 @@ class Connection : public util::Connection {

unsigned parser_error_ = 0;

// amount of times we enqued requests asynchronously during the same async_fiber_epoch_.
unsigned async_streak_len_ = 0;
uint64_t async_fiber_epoch_ = 0;

BreakerCb breaker_cb_;

// Used by redis parser to avoid allocations
Expand Down
1 change: 0 additions & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ extern "C" {
#include <utility>

#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/redis_parser.h"
#include "server/error.h"
#include "server/journal/executor.h"
Expand Down

0 comments on commit c0fed68

Please sign in to comment.