From ce2e127183ba16f9a2952544932c0a782361746f Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 6 Jun 2024 08:09:40 +0300 Subject: [PATCH] chore: call breaker_cb_ on shutdown (#3128) * chore: call breaker_cb_ on shutdown --------- Signed-off-by: Roman Gershman --- .../reusable-container-workflow.yaml | 2 +- src/facade/dragonfly_connection.cc | 35 +++++++------------ src/facade/dragonfly_connection.h | 7 +--- 3 files changed, 14 insertions(+), 30 deletions(-) diff --git a/.github/workflows/reusable-container-workflow.yaml b/.github/workflows/reusable-container-workflow.yaml index 35981ce1c4ed..8998a5dc32e3 100644 --- a/.github/workflows/reusable-container-workflow.yaml +++ b/.github/workflows/reusable-container-workflow.yaml @@ -150,7 +150,7 @@ jobs: - if: ${{ hashFiles(format('{0}-{1}', matrix.dockerfile, inputs.build_type)) }} name: Build release image for arm64 - uses: docker/build-push-action@v3 + uses: docker/build-push-action@v5 with: context: . platforms: linux/arm64 diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index c5d5370a27fc..55ef12f68d97 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -552,11 +552,7 @@ Connection::~Connection() { void Connection::OnShutdown() { VLOG(1) << "Connection::OnShutdown"; - if (shutdown_cb_) { - for (const auto& k_v : shutdown_cb_->map) { - k_v.second(); - } - } + BreakOnce(POLLHUP); } void Connection::OnPreMigrateThread() { @@ -600,21 +596,6 @@ void Connection::OnPostMigrateThread() { } } -auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle { - if (!shutdown_cb_) { - shutdown_cb_ = make_unique(); - } - return shutdown_cb_->Add(std::move(cb)); -} - -void Connection::UnregisterShutdownHook(ShutdownHandle id) { - if (shutdown_cb_) { - shutdown_cb_->Remove(id); - if (shutdown_cb_->map.empty()) - shutdown_cb_.reset(); - } -} - void Connection::HandleRequests() { ThisFiber::SetName("DflyConnection"); @@ -1103,8 +1084,7 @@ void Connection::OnBreakCb(int32_t mask) { << cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError(); cc_->conn_closing = true; - - breaker_cb_(mask); + BreakOnce(mask); evc_.notify(); // Notify dispatch fiber. } @@ -1546,7 +1526,7 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) { void Connection::LaunchDispatchFiberIfNeeded() { if (!dispatch_fb_.IsJoinable() && !migration_in_process_) { - VLOG(1) << "LaunchDispatchFiberIfNeeded " << GetClientId(); + VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded "; dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch", [&, peer = socket_.get()]() { DispatchFiber(peer); }); } @@ -1718,6 +1698,15 @@ void Connection::DecreaseStatsOnClose() { --stats_->num_conns; } +void Connection::BreakOnce(uint32_t ev_mask) { + if (breaker_cb_) { + DVLOG(1) << "[" << id_ << "] Connection::breaker_cb_ " << ev_mask; + auto fun = std::move(breaker_cb_); + DCHECK(!breaker_cb_); + fun(ev_mask); + } +} + Connection::WeakRef::WeakRef(std::shared_ptr ptr, QueueBackpressure* backpressure, unsigned thread, uint32_t client_id) : ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 4521846f05d5..932f8e924eed 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -238,11 +238,6 @@ class Connection : public util::Connection { // reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag. void EnsureAsyncMemoryBudget(); - // Register hook that is executed on connection shutdown. - ShutdownHandle RegisterShutdownHook(ShutdownCb cb); - - void UnregisterShutdownHook(ShutdownHandle id); - // Register hook that is executen when the connection breaks. void RegisterBreakHook(BreakerCb breaker_cb); @@ -400,6 +395,7 @@ class Connection : public util::Connection { private: void DecreaseStatsOnClose(); + void BreakOnce(uint32_t ev_mask); std::deque dispatch_q_; // dispatch queue util::fb2::EventCount evc_; // dispatch queue waker @@ -428,7 +424,6 @@ class Connection : public util::Connection { unsigned parser_error_ = 0; BreakerCb breaker_cb_; - std::unique_ptr shutdown_cb_; // Used by redis parser to avoid allocations RespVec tmp_parse_args_;