From 4b278e930098e233f9e8aeb71bdc2004506e71d8 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Wed, 19 Jun 2024 23:48:05 -0400 Subject: [PATCH] perf(consensus): Remove reactor rlocks on Consensus (#3211) PR with @ebuchman !!! Remove (most) of the reactor RLock's on consensus! (Remaining one is in the gossip routine, should be easy to fix, but as a separate PR) The consensus reactor on ingesting messages takes RLock's on Consensus mutex, preventing us from adding things to the queue. (And therefore blocking behavior) This can cause the peer message queue to be blocked. Now it won't be blocked because we update the round state directly from the cs state update routine (via event bus) when: - A vote is added - A block part is added - We enter a new consensus step. (Receiving a full block triggers this for us, we will enter prevote or precommit round) This shouldn't change reactor message validity, because Reactor.Receive could always view a cs.RoundState that will be old when the packet reaches the cs.HandleMsg queue. Every update to consensus' roundstate pushes the update into the reactor's view, so we avoid locks. I don't think any new tests are required! --- #### PR checklist - [x] Tests written/updated - [x] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments - [x] Title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec --------- Co-authored-by: Ethan Buchman Co-authored-by: Anton Kaliaev --- ...ake-cs-reactor-no-longer-takes-cs-locks.md | 4 ++ internal/consensus/reactor.go | 43 ++++++++++++------- internal/consensus/state.go | 10 ++++- 3 files changed, 40 insertions(+), 17 deletions(-) create mode 100644 .changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md diff --git a/.changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md b/.changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md new file mode 100644 index 00000000000..72fae64598e --- /dev/null +++ b/.changelog/unreleased/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md @@ -0,0 +1,4 @@ +- `[consensus]` Make the consensus reactor no longer have packets on receive take the consensus lock. +Consensus will now update the reactor's view after every relevant change through the existing +synchronous event bus subscription. + ([\#3211](https://github.com/cometbft/cometbft/pull/3211)) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 28712732fd2..2987f0c3162 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -46,8 +46,9 @@ type Reactor struct { waitSync atomic.Bool eventBus *types.EventBus - rsMtx cmtsync.Mutex - rs *cstypes.RoundState + rsMtx cmtsync.RWMutex + rs *cstypes.RoundState + initialHeight int64 // under rsMtx Metrics *Metrics } @@ -267,9 +268,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case StateChannel: switch msg := msg.(type) { case *NewRoundStepMessage: - conR.conS.mtx.RLock() - initialHeight := conR.conS.state.InitialHeight - conR.conS.mtx.RUnlock() + conR.rsMtx.RLock() + initialHeight := conR.initialHeight + conR.rsMtx.RUnlock() if err = msg.ValidateHeight(initialHeight); err != nil { conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err) conR.Switch.StopPeerForError(e.Src, err) @@ -283,10 +284,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case *HasProposalBlockPartMessage: ps.ApplyHasProposalBlockPartMessage(msg) case *VoteSetMaj23Message: - cs := conR.conS - cs.mtx.RLock() - height, votes := cs.Height, cs.Votes - cs.mtx.RUnlock() + conR.rsMtx.RLock() + height, votes := conR.rs.Height, conR.rs.Votes + conR.rsMtx.RUnlock() if height != msg.Height { return } @@ -351,9 +351,10 @@ func (conR *Reactor) Receive(e p2p.Envelope) { switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - cs.mtx.RLock() - height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() - cs.mtx.RUnlock() + + conR.rsMtx.RLock() + height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size() + conR.rsMtx.RUnlock() ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize) cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}} @@ -370,10 +371,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { } switch msg := msg.(type) { case *VoteSetBitsMessage: - cs := conR.conS - cs.mtx.RLock() - height, votes := cs.Height, cs.Votes - cs.mtx.RUnlock() + conR.rsMtx.RLock() + height, votes := conR.rs.Height, conR.rs.Votes + conR.rsMtx.RUnlock() if height == msg.Height { var ourVotes *bits.BitArray @@ -420,6 +420,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, func(data cmtevents.EventData) { conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) + conR.updateRoundStateNoCsLock() }); err != nil { conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err) } @@ -434,6 +435,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, func(data cmtevents.EventData) { conR.broadcastHasVoteMessage(data.(*types.Vote)) + conR.updateRoundStateNoCsLock() }); err != nil { conR.Logger.Error("Error adding listener for events (Vote)", "err", err) } @@ -441,6 +443,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalBlockPart, func(data cmtevents.EventData) { conR.broadcastHasProposalBlockPartMessage(data.(*BlockPartMessage)) + conR.updateRoundStateNoCsLock() }); err != nil { conR.Logger.Error("Error adding listener for events (ProposalBlockPart)", "err", err) } @@ -566,6 +569,14 @@ func (conR *Reactor) updateRoundStateRoutine() { } } +func (conR *Reactor) updateRoundStateNoCsLock() { + rs := conR.conS.getRoundState() + conR.rsMtx.Lock() + conR.rs = rs + conR.initialHeight = conR.conS.state.InitialHeight + conR.rsMtx.Unlock() +} + func (conR *Reactor) getRoundState() *cstypes.RoundState { conR.rsMtx.Lock() defer conR.rsMtx.Unlock() diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 07ef9beef3b..8914d856cbc 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -248,10 +248,18 @@ func (cs *State) GetLastHeight() int64 { } // GetRoundState returns a shallow copy of the internal consensus state. +// This function is thread-safe. func (cs *State) GetRoundState() *cstypes.RoundState { cs.mtx.RLock() - rs := cs.RoundState // copy + rs := cs.getRoundState() cs.mtx.RUnlock() + return rs +} + +// getRoundState returns a shallow copy of the internal consensus state. +// This function is not thread-safe. Use GetRoundState for the thread-safe version. +func (cs *State) getRoundState() *cstypes.RoundState { + rs := cs.RoundState // copy return &rs }