Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(lib/grandpa): update grandpa message tracker to store messages with map of maps, also store commit messages #1769

Merged
merged 34 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
348cb52
add logs to attemptToFinalize
noot Jul 23, 2021
9516893
lint
noot Jul 23, 2021
a762b37
print stack trace if failed to finalize
noot Jul 23, 2021
76174d8
return in pruning goroutine
noot Jul 23, 2021
2c76a4e
fix
noot Jul 23, 2021
2257fdb
fix VerifyBlockJustification check
noot Jul 23, 2021
1550040
skip failing test
noot Jul 26, 2021
2946d6b
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Jul 26, 2021
72da44e
comment out pause/catch up logic, store highest setID/round in db
noot Jul 27, 2021
a48b299
add tests, cleanup
noot Jul 28, 2021
8310b18
increase grandpa in channel buffer size
noot Jul 28, 2021
7c0c25a
lint
noot Jul 29, 2021
77a5ce0
fix tests
noot Jul 29, 2021
0b221fd
lint
noot Jul 29, 2021
2601406
cleanup
noot Jul 29, 2021
28a8d74
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Jul 29, 2021
9c9796d
fix test
noot Aug 3, 2021
6ad6be6
try to fix unused code
noot Aug 3, 2021
38587fc
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Aug 4, 2021
a27a87f
address comments
noot Aug 4, 2021
cea99e9
fix rpc method chain_getFinalizedHead
noot Aug 4, 2021
1ec6d9f
Merge branch 'development' into noot/grandpa-fix
noot Aug 4, 2021
44f0560
fix merge issues
noot Aug 5, 2021
b283215
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Aug 5, 2021
07c9b8e
debug sync issues
noot Aug 5, 2021
92262a7
make vote handling synchronous
noot Aug 6, 2021
1982566
merge w development, cleanup
noot Aug 25, 2021
a574e51
cleanup
noot Aug 25, 2021
7525524
more cleanup
noot Aug 25, 2021
39277b7
fix tests, address comments
noot Aug 25, 2021
05b53b8
address comments
noot Aug 26, 2021
dede83e
lint
noot Aug 26, 2021
ce7ebf5
fix tests, address comments
noot Aug 30, 2021
362ef19
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Aug 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (s *Service) handleSyncStream(stream libp2pnetwork.Stream) {
return
}

s.readStream(stream, s.decodeSyncMessage, s.handleSyncMessage)
s.readStream(stream, decodeSyncMessage, s.handleSyncMessage)
}

func (s *Service) decodeSyncMessage(in []byte, peer peer.ID, inbound bool) (Message, error) {
func decodeSyncMessage(in []byte, _ peer.ID, _ bool) (Message, error) {
msg := new(BlockRequestMessage)
err := msg.Decode(in)
return msg, err
Expand Down Expand Up @@ -198,7 +198,7 @@ func (q *syncQueue) syncAtHead() {

for {
select {
// sleep for average block time TODO: make this configurable from slot duration
// sleep for average block time
case <-t.C:
case <-q.ctx.Done():
return
Expand All @@ -209,7 +209,7 @@ func (q *syncQueue) syncAtHead() {
continue
}

goal := atomic.LoadInt64(&q.goal)
goal := atomic.LoadInt64(&q.goal) - int64(blockRequestSize)

// we aren't at the head yet, sleep
if curr.Number.Int64() < goal && curr.Number.Cmp(prev.Number) > 0 {
Expand All @@ -230,7 +230,7 @@ func (q *syncQueue) syncAtHead() {

prev = curr
start := uint64(curr.Number.Int64()) + 1
logger.Debug("haven't received new blocks since last check, pushing request", "start", start)
logger.Debug("pushing request for blocks...", "start", start)
q.requestData.Delete(start)
q.pushRequest(start, 1, "")
}
Expand Down Expand Up @@ -444,35 +444,29 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) {
goal := atomic.LoadInt64(&q.goal)
if goal < best.Int64() {
atomic.StoreInt64(&q.goal, best.Int64())
return
}

goal = atomic.LoadInt64(&q.goal)
if goal-int64(start) < int64(blockRequestSize) {
start := best.Int64() + 1
req := createBlockRequest(start, 0)

logger.Debug("pushing request to queue", "start", start)
q.requestData.Store(start, requestData{
received: false,
})

q.requestCh <- &syncRequest{
req: req,
to: to,
}
if goal == best.Int64() {
return
}

// all requests must start at a multiple of 128 + 1
m := start % uint64(blockRequestSize)
start = start - m + 1
reqSize := blockRequestSize
if goal-int64(start) < int64(blockRequestSize) {
start = best.Uint64() + 1
reqSize = uint32(goal) - uint32(start)
} else {
// all requests must start at a multiple of 128 + 1
m := start % uint64(blockRequestSize)
start = start - m + 1
}

for i := 0; i < numRequests; i++ {
if start > uint64(goal) {
return
}

req := createBlockRequest(int64(start), blockRequestSize)
req := createBlockRequest(int64(start), reqSize)

if d, has := q.requestData.Load(start); has {
data := d.(requestData)
Expand Down
13 changes: 4 additions & 9 deletions dot/network/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,11 @@ func createBlockRequests(start, end int64) []*BlockRequestMessage {
}

func TestDecodeSyncMessage(t *testing.T) {
s := &Service{
ctx: context.Background(),
}

s.syncQueue = newSyncQueue(s)
testPeer := peer.ID("noot")

reqEnc, err := testBlockRequestMessage.Encode()
require.NoError(t, err)

msg, err := s.decodeSyncMessage(reqEnc, testPeer, true)
msg, err := decodeSyncMessage(reqEnc, testPeer, true)
require.NoError(t, err)

req, ok := msg.(*BlockRequestMessage)
Expand Down Expand Up @@ -468,6 +462,7 @@ func TestSyncQueue_SyncAtHead(t *testing.T) {
time.Sleep(time.Second)
q.ctx = context.Background()
q.slotDuration = time.Millisecond * 100
q.goal = 129

go q.syncAtHead()
time.Sleep(q.slotDuration * 3)
Expand All @@ -484,9 +479,9 @@ func TestSyncQueue_PushRequest_NearHead(t *testing.T) {
q.stop()
time.Sleep(time.Second)
q.ctx = context.Background()
q.goal = 0
q.goal = 129

q.pushRequest(1, 1, "")
q.pushRequest(2, 1, "")
select {
case req := <-q.requestCh:
require.Equal(t, uint64(2), req.req.StartingBlock.Uint64())
Expand Down
1 change: 1 addition & 0 deletions lib/grandpa/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,5 @@ var (

errVoteExists = errors.New("already have vote")
errVoteToSignatureMismatch = errors.New("votes and authority count mismatch")
errInvalidVoteBlock = errors.New("block in vote is not descendant of previously finalised block")
)
2 changes: 1 addition & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *Service) initiateRound() error {
s.precommits = new(sync.Map)
s.pvEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote)
s.pcEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote)
s.tracker, err = newTracker(s.blockState, s.in)
s.tracker, err = newTracker(s.blockState, s.messageHandler)
if err != nil {
return err
}
Expand Down
52 changes: 22 additions & 30 deletions lib/grandpa/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package grandpa

import (
"bytes"
"errors"
"fmt"
"math/big"
"reflect"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/blocktree"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
"github.com/ChainSafe/gossamer/lib/scale"
Expand Down Expand Up @@ -51,41 +53,26 @@ func NewMessageHandler(grandpa *Service, blockState BlockState) *MessageHandler
func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network.NotificationsMessage, error) {
logger.Trace("handling grandpa message", "msg", m)

switch m.Type() {
case voteType:
vm, ok := m.(*VoteMessage)
if h.grandpa != nil && ok {
// send vote message to grandpa service
h.grandpa.in <- &networkVoteMessage{
from: from,
msg: vm,
}
}
return nil, nil
case commitType:
if fm, ok := m.(*CommitMessage); ok {
return nil, h.handleCommitMessage(fm)
}
case neighbourType:
nm, ok := m.(*NeighbourMessage)
if !ok {
return nil, nil
switch msg := m.(type) {
case *VoteMessage:
// send vote message to grandpa service
h.grandpa.in <- &networkVoteMessage{
from: from,
msg: msg,
}

return nil, h.handleNeighbourMessage(from, nm)
case catchUpRequestType:
if r, ok := m.(*catchUpRequest); ok {
return h.handleCatchUpRequest(r)
}
case catchUpResponseType:
if r, ok := m.(*catchUpResponse); ok {
return nil, h.handleCatchUpResponse(r)
}
return nil, nil
case *CommitMessage:
return nil, h.handleCommitMessage(msg)
case *NeighbourMessage:
return nil, h.handleNeighbourMessage(from, msg)
case *catchUpRequest:
return h.handleCatchUpRequest(msg)
case *catchUpResponse:
return nil, h.handleCatchUpResponse(msg)
default:
return nil, ErrInvalidMessageType
}

return nil, nil
}

func (h *MessageHandler) handleNeighbourMessage(from peer.ID, msg *NeighbourMessage) error {
Expand Down Expand Up @@ -125,6 +112,11 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error {

// check justification here
if err := h.verifyCommitMessageJustification(msg); err != nil {
if errors.Is(err, blocktree.ErrStartNodeNotFound) {
// TODO: make this synchronous
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue created to this TODO -> #1771

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, I'm still unsure about this todo, after the sync refactor it hopefully won't be necessary, but we'll see

go h.grandpa.network.SendBlockReqestByHash(msg.Vote.Hash)
h.grandpa.tracker.addCommit(msg)
}
return err
}

Expand Down
88 changes: 60 additions & 28 deletions lib/grandpa/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,38 @@ import (

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/crypto/ed25519"
)

// tracker keeps track of messages that have been received that have failed to validate with ErrBlockDoesNotExist
// these messages may be needed again in the case that we are slightly out of sync with the rest of the network
type tracker struct {
blockState BlockState
messages map[common.Hash][]*networkVoteMessage // map of vote block hash -> array of VoteMessages for that hash
mapLock sync.Mutex
in chan *types.Block // receive imported block from BlockState
chanID byte // BlockState channel ID
out chan<- *networkVoteMessage // send a VoteMessage back to grandpa. corresponds to grandpa's in channel
stopped chan struct{}
blockState BlockState
handler *MessageHandler
voteMessages map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage // map of vote block hash -> array of VoteMessages for that hash
commitMessages map[common.Hash]*CommitMessage // map of commit block hash to commit message
mapLock sync.Mutex
in chan *types.Block // receive imported block from BlockState
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This channel is used to receive data
in <-chan *types.Block
We should close the channel at the sender's end.

func (bs *BlockState) UnregisterImportedChannel(id byte) {
	bs.importedLock.Lock()
	defer bs.importedLock.Unlock()

	if ch, ok := bs.imported[id]; !ok {
		close(ch)
	}
	delete(bs.imported, id)
	err := bs.importedBytePool.Put(id)
	if err != nil {
		logger.Error("failed to unregister imported channel", "error", err)
	}
}

instead of

func (t *tracker) stop() {
	close(t.stopped)
	t.blockState.UnregisterImportedChannel(t.chanID)
	close(t.in)
}

Reference:

One general principle of using Go channels is don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine if the sender is the only sender of the channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think this relies on Ed's refactor in #1736 so I'd rather not change it here, I agree with you though that the whole notifier code needs to be refactored. I can change it here if you'd like but it overlaps with Ed's PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I commented on this on Ed's PR #1736 (comment)

chanID byte // BlockState channel ID
stopped chan struct{}
}

func newTracker(bs BlockState, out chan<- *networkVoteMessage) (*tracker, error) {
func newTracker(bs BlockState, handler *MessageHandler) (*tracker, error) {
in := make(chan *types.Block, 16)
id, err := bs.RegisterImportedChannel(in)
if err != nil {
return nil, err
}

return &tracker{
blockState: bs,
messages: make(map[common.Hash][]*networkVoteMessage),
mapLock: sync.Mutex{},
in: in,
chanID: id,
out: out,
stopped: make(chan struct{}),
blockState: bs,
handler: handler,
voteMessages: make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage),
commitMessages: make(map[common.Hash]*CommitMessage),
mapLock: sync.Mutex{},
in: in,
chanID: id,
stopped: make(chan struct{}),
}, nil
}

Expand All @@ -63,15 +66,27 @@ func (t *tracker) stop() {
close(t.in)
}

func (t *tracker) add(v *networkVoteMessage) {
func (t *tracker) addVote(v *networkVoteMessage) {
if v.msg == nil {
return
}

t.mapLock.Lock()
// TODO: change to map of maps, this allows duplicates
t.messages[v.msg.Message.Hash] = append(t.messages[v.msg.Message.Hash], v)
t.mapLock.Unlock()
defer t.mapLock.Unlock()

msgs, has := t.voteMessages[v.msg.Message.Hash]
Copy link
Contributor

@arijitAD arijitAD Aug 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to

	voteMsg, has := t.voteMessages[v.msg.Message.Hash]
	if !has {
		voteMsg = make(map[ed25519.PublicKeyBytes]*networkVoteMessage)
		t.voteMessages[v.msg.Message.Hash] = voteMsg
	}

	voteMsg[v.msg.Message.AuthorityID] = v

Code to verify

func TestMessageTracker_MapInsideMap(t *testing.T) {
	mm := make(map[common.Hash]map[ed25519.PublicKeyBytes]*networkVoteMessage)
	hash := common.NewHash([]byte{0})

	voteMsg, ok := mm[hash]
	require.False(t, ok)

	voteMsg = make(map[ed25519.PublicKeyBytes]*networkVoteMessage)
	mm[hash] = voteMsg

	AuthorityID := ed25519.PublicKeyBytes(common.MustHexToHash("0x34602b88f60513f1c805d87ef52896934baf6a662bc37414dbdbf69356b1a691"))
	voteMsg[AuthorityID] = &networkVoteMessage{}

	voteMsg, ok = mm[hash]
	require.True(t, ok)

	_, ok = voteMsg[AuthorityID]
	require.True(t, ok)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a test for the maps in addVote

if !has {
msgs = make(map[ed25519.PublicKeyBytes]*networkVoteMessage)
t.voteMessages[v.msg.Message.Hash] = msgs
}

msgs[v.msg.Message.AuthorityID] = v
}

func (t *tracker) addCommit(cm *CommitMessage) {
t.mapLock.Lock()
defer t.mapLock.Unlock()
t.commitMessages[cm.Vote.Hash] = cm
}

func (t *tracker) handleBlocks() {
Expand All @@ -82,18 +97,35 @@ func (t *tracker) handleBlocks() {
continue
}

t.mapLock.Lock()
t.handleBlock(b)
case <-t.stopped:
return
}
}
}

func (t *tracker) handleBlock(b *types.Block) {
t.mapLock.Lock()
defer t.mapLock.Unlock()

h := b.Header.Hash()
if t.messages[h] != nil {
for _, v := range t.messages[h] {
t.out <- v
}
h := b.Header.Hash()
if vms, has := t.voteMessages[h]; has {
for _, v := range vms {
_, err := t.handler.handleMessage(v.from, v.msg)
if err != nil {
logger.Warn("failed to handle vote message", "message", v, "error", err)
}
}

t.mapLock.Unlock()
case <-t.stopped:
return
delete(t.voteMessages, h)
}

if cm, has := t.commitMessages[h]; has {
_, err := t.handler.handleMessage("", cm)
if err != nil {
logger.Warn("failed to handle commit message", "message", cm, "error", err)
}

delete(t.commitMessages, h)
}
}
Loading