Skip to content

Commit

Permalink
perf(consensus): Remove reactor rlocks on Consensus (cometbft#3211)
Browse files Browse the repository at this point in the history
PR with @ebuchman !!!

Remove (most) of the reactor RLock's on consensus! (Remaining one is in
the gossip routine, should be easy to fix, but as a separate PR)

The consensus reactor on ingesting messages takes RLock's on Consensus
mutex, preventing us from adding things to the queue. (And therefore
blocking behavior)

This can cause the peer message queue to be blocked. Now it won't be
blocked because we update the round state directly from the cs state
update routine (via event bus) when:
- A vote is added
- A block part is added
- We enter a new consensus step. (Receiving a full block triggers this
for us, we will enter prevote or precommit round)

This shouldn't change reactor message validity, because Reactor.Receive
could always view a cs.RoundState that will be old when the packet
reaches the cs.HandleMsg queue. Every update to consensus' roundstate
pushes the update into the reactor's view, so we avoid locks.

I don't think any new tests are required!

---

#### PR checklist

- [x] Tests written/updated
- [x] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [x] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec

---------

Co-authored-by: Ethan Buchman <[email protected]>
Co-authored-by: Anton Kaliaev <[email protected]>
  • Loading branch information
3 people authored Jun 20, 2024
1 parent 10bfd89 commit 4b278e9
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[consensus]` Make the consensus reactor no longer have packets on receive take the consensus lock.
Consensus will now update the reactor's view after every relevant change through the existing
synchronous event bus subscription.
([\#3211](https://github.com/cometbft/cometbft/pull/3211))
43 changes: 27 additions & 16 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type Reactor struct {
waitSync atomic.Bool
eventBus *types.EventBus

rsMtx cmtsync.Mutex
rs *cstypes.RoundState
rsMtx cmtsync.RWMutex
rs *cstypes.RoundState
initialHeight int64 // under rsMtx

Metrics *Metrics
}
Expand Down Expand Up @@ -267,9 +268,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
conR.conS.mtx.RLock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.RUnlock()
conR.rsMtx.RLock()
initialHeight := conR.initialHeight
conR.rsMtx.RUnlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
Expand All @@ -283,10 +284,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case *HasProposalBlockPartMessage:
ps.ApplyHasProposalBlockPartMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.RLock()
height, votes := cs.Height, cs.Votes
cs.mtx.RUnlock()
conR.rsMtx.RLock()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()
if height != msg.Height {
return
}
Expand Down Expand Up @@ -351,9 +351,10 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()

conR.rsMtx.RLock()
height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size()
conR.rsMtx.RUnlock()
ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize)

cs.peerMsgQueue <- msgInfo{msg, e.Src.ID(), time.Time{}}
Expand All @@ -370,10 +371,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.RLock()
height, votes := cs.Height, cs.Votes
cs.mtx.RUnlock()
conR.rsMtx.RLock()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()

if height == msg.Height {
var ourVotes *bits.BitArray
Expand Down Expand Up @@ -420,6 +420,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data cmtevents.EventData) {
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events (NewRoundStep)", "err", err)
}
Expand All @@ -434,13 +435,15 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data cmtevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events (Vote)", "err", err)
}

if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalBlockPart,
func(data cmtevents.EventData) {
conR.broadcastHasProposalBlockPartMessage(data.(*BlockPartMessage))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events (ProposalBlockPart)", "err", err)
}
Expand Down Expand Up @@ -566,6 +569,14 @@ func (conR *Reactor) updateRoundStateRoutine() {
}
}

func (conR *Reactor) updateRoundStateNoCsLock() {
rs := conR.conS.getRoundState()
conR.rsMtx.Lock()
conR.rs = rs
conR.initialHeight = conR.conS.state.InitialHeight
conR.rsMtx.Unlock()
}

func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.rsMtx.Lock()
defer conR.rsMtx.Unlock()
Expand Down
10 changes: 9 additions & 1 deletion internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,18 @@ func (cs *State) GetLastHeight() int64 {
}

// GetRoundState returns a shallow copy of the internal consensus state.
// This function is thread-safe.
func (cs *State) GetRoundState() *cstypes.RoundState {
cs.mtx.RLock()
rs := cs.RoundState // copy
rs := cs.getRoundState()
cs.mtx.RUnlock()
return rs
}

// getRoundState returns a shallow copy of the internal consensus state.
// This function is not thread-safe. Use GetRoundState for the thread-safe version.
func (cs *State) getRoundState() *cstypes.RoundState {
rs := cs.RoundState // copy
return &rs
}

Expand Down

0 comments on commit 4b278e9

Please sign in to comment.