Skip to content

Commit

Permalink
chore: call breaker_cb_ on shutdown (#3128)
Browse files Browse the repository at this point in the history
* chore: call breaker_cb_ on shutdown
---------

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jun 6, 2024
1 parent 229eeeb commit ce2e127
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/reusable-container-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ jobs:

- if: ${{ hashFiles(format('{0}-{1}', matrix.dockerfile, inputs.build_type)) }}
name: Build release image for arm64
uses: docker/build-push-action@v3
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/arm64
Expand Down
35 changes: 12 additions & 23 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -552,11 +552,7 @@ Connection::~Connection() {
void Connection::OnShutdown() {
VLOG(1) << "Connection::OnShutdown";

if (shutdown_cb_) {
for (const auto& k_v : shutdown_cb_->map) {
k_v.second();
}
}
BreakOnce(POLLHUP);
}

void Connection::OnPreMigrateThread() {
Expand Down Expand Up @@ -600,21 +596,6 @@ void Connection::OnPostMigrateThread() {
}
}

auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle {
if (!shutdown_cb_) {
shutdown_cb_ = make_unique<Shutdown>();
}
return shutdown_cb_->Add(std::move(cb));
}

void Connection::UnregisterShutdownHook(ShutdownHandle id) {
if (shutdown_cb_) {
shutdown_cb_->Remove(id);
if (shutdown_cb_->map.empty())
shutdown_cb_.reset();
}
}

void Connection::HandleRequests() {
ThisFiber::SetName("DflyConnection");

Expand Down Expand Up @@ -1103,8 +1084,7 @@ void Connection::OnBreakCb(int32_t mask) {
<< cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError();

cc_->conn_closing = true;

breaker_cb_(mask);
BreakOnce(mask);
evc_.notify(); // Notify dispatch fiber.
}

Expand Down Expand Up @@ -1546,7 +1526,7 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) {

void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "LaunchDispatchFiberIfNeeded " << GetClientId();
VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
[&, peer = socket_.get()]() { DispatchFiber(peer); });
}
Expand Down Expand Up @@ -1718,6 +1698,15 @@ void Connection::DecreaseStatsOnClose() {
--stats_->num_conns;
}

void Connection::BreakOnce(uint32_t ev_mask) {
if (breaker_cb_) {
DVLOG(1) << "[" << id_ << "] Connection::breaker_cb_ " << ev_mask;
auto fun = std::move(breaker_cb_);
DCHECK(!breaker_cb_);
fun(ev_mask);
}
}

Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
Expand Down
7 changes: 1 addition & 6 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,6 @@ class Connection : public util::Connection {
// reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.
void EnsureAsyncMemoryBudget();

// Register hook that is executed on connection shutdown.
ShutdownHandle RegisterShutdownHook(ShutdownCb cb);

void UnregisterShutdownHook(ShutdownHandle id);

// Register hook that is executen when the connection breaks.
void RegisterBreakHook(BreakerCb breaker_cb);

Expand Down Expand Up @@ -400,6 +395,7 @@ class Connection : public util::Connection {

private:
void DecreaseStatsOnClose();
void BreakOnce(uint32_t ev_mask);

std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::EventCount evc_; // dispatch queue waker
Expand Down Expand Up @@ -428,7 +424,6 @@ class Connection : public util::Connection {
unsigned parser_error_ = 0;

BreakerCb breaker_cb_;
std::unique_ptr<Shutdown> shutdown_cb_;

// Used by redis parser to avoid allocations
RespVec tmp_parse_args_;
Expand Down

0 comments on commit ce2e127

Please sign in to comment.