Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): fix bugs in notifications protocol handlers; add metrics for inbound/outbound streams #2010

Merged
merged 73 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 70 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
4666518
add lots of logs, comment out maintainTransactionPool for now
noot Nov 3, 2021
e03fa95
merge
noot Nov 3, 2021
0692825
update blocktree.HighestCommonAncestor to return err if ancestor is nil
noot Nov 3, 2021
71f2937
increase first block timeout
noot Nov 4, 2021
2950c17
close and set stream to nil if err on message send
noot Nov 4, 2021
4597344
close stream if failed to decode msg
noot Nov 4, 2021
d873979
add block announce and grandpa stream count metrics
noot Nov 4, 2021
07ed907
fix getNumStreams interface conv
noot Nov 5, 2021
bed53dc
check error in sendData
noot Nov 5, 2021
39de2d7
add nil checks to HighestCommonAncestor, increase BABE first block ti…
noot Nov 5, 2021
a07d83d
add nil checks to core service
noot Nov 5, 2021
178f727
address comments
noot Nov 8, 2021
6f3fd63
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Nov 8, 2021
f335df9
update blockState.AddBlockToBlockTree to also store in unfinalised bl…
noot Nov 8, 2021
67f59cc
fix tests
noot Nov 8, 2021
941f954
cleanup
noot Nov 8, 2021
d9bdff4
add logs for when body isn't in response
noot Nov 8, 2021
7f067c7
Merge branch 'noot/devnet-debug' into noot/debug-network
noot Nov 8, 2021
fa1fb31
merge w noot/fix-blocktree
noot Nov 8, 2021
d99a4ab
Merge branch 'noot/fix-blocks' into noot/debug-network
noot Nov 8, 2021
beb2e47
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Nov 9, 2021
e7a5a15
Merge branch 'noot/fix-blocktree' of github.com:ChainSafe/gossamer in…
noot Nov 9, 2021
00091c5
cleanup
noot Nov 9, 2021
577ff25
delete hsData on stream close
noot Nov 9, 2021
3f5a323
merge w development
noot Nov 10, 2021
f4eb7a5
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Nov 10, 2021
178434e
cleanup logic in createNotificationsMessageHandler
noot Nov 10, 2021
c6a9465
update readStream to use closeInboundStream
noot Nov 10, 2021
3eb1b8e
treat io.EOF the same as other errs in readStream
noot Nov 11, 2021
dea40af
update streamManager to use closeInboundStream func
noot Nov 11, 2021
d9f7e36
remove gossip module
noot Nov 11, 2021
f072ef3
re-add discovery connect code for now
noot Nov 11, 2021
56362e4
make sendData synchronous for now
noot Nov 11, 2021
5a5ee4b
maybe improve concurrency in connmgr closeHandler
noot Nov 11, 2021
9584b4e
remove cache-ignore of consensus messages
noot Nov 11, 2021
c2fa05e
embed mutex in handshakeData
noot Nov 11, 2021
d25196b
make sendData async again
noot Nov 11, 2021
0319217
maybe fix locking?
noot Nov 11, 2021
5a66fff
change messageCache TTL to 5s
noot Nov 11, 2021
5ec3aa0
make hsData lock ptr again
noot Nov 11, 2021
e30a399
update network service readStream to always assume inbound
noot Nov 11, 2021
02c15ac
re-add gossip logic for inbound msg re-gossiping
noot Nov 11, 2021
83beda8
reset streams instead of closing
noot Nov 11, 2021
958c0c1
comment out cleanupStreams
noot Nov 11, 2021
7c81966
fix
noot Nov 11, 2021
4a8732e
fix stream close handler bug, was deleting both hsDatas instead of 1
noot Nov 11, 2021
dfc5e2c
cleanup
noot Nov 11, 2021
a7de1d1
move lock in sendData
noot Nov 11, 2021
0fd0230
change some stuff back
noot Nov 11, 2021
912a78b
change some more stuff back
noot Nov 11, 2021
8629b43
change more stuff back again
noot Nov 11, 2021
f077036
change more stuff back again
noot Nov 11, 2021
be2fcd8
close stream on handshake data delete
noot Nov 11, 2021
7f1f429
add closeInboundStream and closeOutboundStream funcs
noot Nov 11, 2021
a08a742
add sendDataMutexes for each peer per protocol to prevent races
noot Nov 12, 2021
7d093ba
fix
noot Nov 12, 2021
9aaa1f2
merge w development
noot Nov 12, 2021
15bccf2
fix tests
noot Nov 12, 2021
bfbca17
cleanup
noot Nov 12, 2021
5efe331
move errors to file
noot Nov 12, 2021
0ef20de
fix tests
noot Nov 12, 2021
9541b72
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Nov 12, 2021
426a5fa
address comments
noot Nov 12, 2021
d8445b3
merge w development
noot Nov 15, 2021
3a7a426
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Nov 16, 2021
122c2a4
address comments
noot Nov 16, 2021
b3db52a
lint
noot Nov 16, 2021
7ef53e3
lint
noot Nov 16, 2021
d5e6339
Merge branch 'development' of github.com:ChainSafe/gossamer into noot…
noot Nov 16, 2021
99053f9
address comments
noot Nov 16, 2021
a8e9cff
make license
noot Nov 17, 2021
b0d01a0
address comments
noot Nov 17, 2021
b291a9f
lint
noot Nov 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dot/network/block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestValidateBlockAnnounceHandshake(t *testing.T) {
inboundHandshakeData: new(sync.Map),
}
testPeerID := peer.ID("noot")
nodeA.notificationsProtocols[BlockAnnounceMsgType].inboundHandshakeData.Store(testPeerID, handshakeData{})
nodeA.notificationsProtocols[BlockAnnounceMsgType].inboundHandshakeData.Store(testPeerID, &handshakeData{})

err := nodeA.validateBlockAnnounceHandshake(testPeerID, &BlockAnnounceHandshake{
BestBlockNumber: 100,
Expand Down
39 changes: 13 additions & 26 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"

"github.com/ChainSafe/gossamer/dot/peerset"
Expand All @@ -23,11 +22,9 @@ type ConnManager struct {
sync.Mutex
host *host
min, max int
connectHandler func(peer.ID)
disconnectHandler func(peer.ID)

// closeHandlerMap contains close handler corresponding to a protocol.
closeHandlerMap map[protocol.ID]func(peerID peer.ID)

// protectedPeers contains a list of peers that are protected from pruning
// when we reach the maximum numbers of peers.
protectedPeers *sync.Map // map[peer.ID]struct{}
Expand All @@ -47,7 +44,6 @@ func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) (*ConnManager,
return &ConnManager{
min: min,
max: max,
closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)),
protectedPeers: new(sync.Map),
persistentPeers: new(sync.Map),
peerSetHandler: psh,
Expand All @@ -68,19 +64,19 @@ func (cm *ConnManager) Notifee() network.Notifiee {
return nb
}

// TagPeer peer
// TagPeer is unimplemented
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
func (*ConnManager) TagPeer(peer.ID, string, int) {}

// UntagPeer peer
// UntagPeer is unimplemented
func (*ConnManager) UntagPeer(peer.ID, string) {}

// UpsertTag peer
// UpsertTag is unimplemented
func (*ConnManager) UpsertTag(peer.ID, string, func(int) int) {}

// GetTagInfo peer
// GetTagInfo is unimplemented
func (*ConnManager) GetTagInfo(peer.ID) *connmgr.TagInfo { return &connmgr.TagInfo{} }

// TrimOpenConns peer
// TrimOpenConns is unimplemented
func (*ConnManager) TrimOpenConns(context.Context) {}

// Protect peer will add the given peer to the protectedPeerMap which will
Expand All @@ -97,7 +93,7 @@ func (cm *ConnManager) Unprotect(id peer.ID, _ string) bool {
return wasDeleted
}

// Close peer
// Close is unimplemented
func (*ConnManager) Close() error { return nil }

// IsProtected returns whether the given peer is protected from pruning or not.
Expand Down Expand Up @@ -134,6 +130,7 @@ func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
logger.Tracef(
"Host %s connected to peer %s", n.LocalPeer(), c.RemotePeer())
cm.connectHandler(c.RemotePeer())

cm.Lock()
defer cm.Unlock()
Expand All @@ -143,7 +140,9 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
return
}

// TODO: peer scoring doesn't seem to prevent us from going over the max.
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
// if over the max peer count, disconnect from (total_peers - maximum) peers
// (#2039)
for i := 0; i < over; i++ {
unprotPeers := cm.unprotectedPeers(n.Peers())
if len(unprotPeers) == 0 {
Expand All @@ -170,31 +169,19 @@ func (cm *ConnManager) Disconnected(_ network.Network, c network.Conn) {
logger.Tracef("Host %s disconnected from peer %s", c.LocalPeer(), c.RemotePeer())

cm.Unprotect(c.RemotePeer(), "")
if cm.disconnectHandler != nil {
cm.disconnectHandler(c.RemotePeer())
}
cm.disconnectHandler(c.RemotePeer())
}

// OpenedStream is called when a stream opened
// OpenedStream is called when a stream is opened
func (cm *ConnManager) OpenedStream(_ network.Network, s network.Stream) {
logger.Tracef("Stream opened with peer %s using protocol %s",
s.Conn().RemotePeer(), s.Protocol())
}

func (cm *ConnManager) registerCloseHandler(protocolID protocol.ID, cb func(id peer.ID)) {
cm.closeHandlerMap[protocolID] = cb
}

// ClosedStream is called when a stream closed
// ClosedStream is called when a stream is closed
func (cm *ConnManager) ClosedStream(_ network.Network, s network.Stream) {
logger.Tracef("Stream closed with peer %s using protocol %s",
s.Conn().RemotePeer(), s.Protocol())

cm.Lock()
defer cm.Unlock()
if closeCB, ok := cm.closeHandlerMap[s.Protocol()]; ok {
closeCB(s.Conn().RemotePeer())
}
}

func (cm *ConnManager) isPersistent(p peer.ID) bool {
Expand Down
13 changes: 13 additions & 0 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,21 @@ func (d *discovery) findPeers(ctx context.Context) {

logger.Tracef("found new peer %s via DHT", peer.ID)

// TODO: this isn't working on the devnet (#2026)
// can remove the code block below which directly connects
// once that's fixed
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
d.handler.AddPeer(0, peer.ID)

// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) >= d.maxPeers {
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
return
}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

if err = d.h.Connect(d.ctx, peer); err != nil {
logger.Tracef("failed to connect to discovered peer %s: %s", peer.ID, err)
}
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package network

import (
"errors"
)

var (
errCannotValidateHandshake = errors.New("failed to validate handshake")
errInvalidNotificationsMessage = errors.New("message is not NotificationsMessage")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit maybe change that to errMessageTypeNotValid = errors.New("message type is not valid")

and use it as fmt.Errorf("%w: expected %T but got %T", errMessageTypeNotValid, (NotificationsMessage)(nil), message)? That would be cool since it would adapt to name changes to the types. But that might be me getting excited for no reason haha!

errMessageIsNotHandshake = errors.New("failed to convert message to Handshake")
errMissingHandshakeMutex = errors.New("outboundHandshakeMutex does not exist")
errInvalidHandshakeForPeer = errors.New("peer previously sent invalid handshake")
errHandshakeTimeout = errors.New("handshake timeout reached")
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
)
2 changes: 1 addition & 1 deletion dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func TestStreamCloseMetadataCleanup(t *testing.T) {
info := nodeA.notificationsProtocols[BlockAnnounceMsgType]

// Set handshake data to received
info.inboundHandshakeData.Store(nodeB.host.id(), handshakeData{
info.inboundHandshakeData.Store(nodeB.host.id(), &handshakeData{
received: true,
validated: true,
})
Expand Down
73 changes: 73 additions & 0 deletions dot/network/inbound.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package network

import (
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
)

func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder, handler messageHandler) {
// we NEED to reset the stream if we ever return from this function, as if we return,
// the stream will never again be read by us, so we need to tell the remote side we're
// done with this stream, and they should also forget about it.
defer s.resetInboundStream(stream)
s.streamManager.logNewStream(stream)

peer := stream.Conn().RemotePeer()
msgBytes := s.bufPool.get()
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
defer s.bufPool.put(msgBytes)

for {
n, err := readStream(stream, msgBytes[:])
if err != nil {
logger.Tracef(
"failed to read from stream id %s of peer %s using protocol %s: %s",
stream.ID(), stream.Conn().RemotePeer(), stream.Protocol(), err)
return
}

s.streamManager.logMessageReceived(stream.ID())

// decode message based on message type
msg, err := decoder(msgBytes[:n], peer, isInbound(stream)) // stream should always be inbound if it passes through service.readStream
if err != nil {
logger.Tracef("failed to decode message from stream id %s using protocol %s: %s",
stream.ID(), stream.Protocol(), err)
continue
}

logger.Tracef(
"host %s received message from peer %s: %s",
s.host.id(), peer, msg)

if err = handler(stream, msg); err != nil {
logger.Tracef("failed to handle message %s from stream id %s: %s", msg, stream.ID(), err)
return
}

s.host.bwc.LogRecvMessage(int64(n))
}
}

func (s *Service) resetInboundStream(stream libp2pnetwork.Stream) {
protocolID := stream.Protocol()
peerID := stream.Conn().RemotePeer()

s.notificationsMu.Lock()
defer s.notificationsMu.Unlock()

for _, prtl := range s.notificationsProtocols {
if prtl.protocolID != protocolID {
continue
}

prtl.inboundHandshakeData.Delete(peerID)
break
}

logger.Debugf(
"cleaning up inbound handshake data for protocol=%s, peer=%s",
stream.Protocol(),
peerID,
)

_ = stream.Reset()
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
75 changes: 75 additions & 0 deletions dot/network/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,71 @@ import (
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/pkg/scale"

libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)

// handleLightStream handles streams with the <protocol-id>/light/2 protocol ID
func (s *Service) handleLightStream(stream libp2pnetwork.Stream) {
s.readStream(stream, s.decodeLightMessage, s.handleLightMsg)
}

func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, error) {
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
s.lightRequestMu.RLock()
defer s.lightRequestMu.RUnlock()

// check if we are the requester
if _, ok := s.lightRequest[peer]; ok {
// if we are, decode the bytes as a LightResponse
return newLightResponseFromBytes(in)
}

// otherwise, decode bytes as LightRequest
return newLightRequestFromBytes(in)
}

func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
defer func() {
_ = stream.Close()
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}()

lr, ok := msg.(*LightRequest)
if !ok {
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to double check, it's all good here if the message is not a request? Maybe should calling layers check that and pass it as a *LightRequest instead of a Message here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should never not be a request, just by reasoning of how the substrate networking protocol works (https://docs.rs/sc-network/0.9.0/sc_network/) if it's not a request, then we can ignore it, as it shouldn't be sent over an inbound stream. (in fact we can probably downscore the peer, but there's another issue open for that)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't change it to be *LightRequest as it needs to implement the function type messageHandler = func(stream libp2pnetwork.Stream, msg Message) error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying!!

Copy link
Contributor

@qdm12 qdm12 Nov 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think we could return an error since it's not meant to happen.

Or just remove the type assertion check so it panics, up to you. Panicing is fine too in this case I'd say.

}

resp := NewLightResponse()
switch {
case lr.RemoteCallRequest != nil:
resp.RemoteCallResponse, err = remoteCallResp(lr.RemoteCallRequest)
case lr.RemoteHeaderRequest != nil:
resp.RemoteHeaderResponse, err = remoteHeaderResp(lr.RemoteHeaderRequest)
case lr.RemoteChangesRequest != nil:
resp.RemoteChangesResponse, err = remoteChangeResp(lr.RemoteChangesRequest)
case lr.RemoteReadRequest != nil:
resp.RemoteReadResponse, err = remoteReadResp(lr.RemoteReadRequest)
case lr.RemoteReadChildRequest != nil:
resp.RemoteReadResponse, err = remoteReadChildResp(lr.RemoteReadChildRequest)
default:
logger.Warn("ignoring LightRequest without request data")
return nil
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
return err
}

// TODO(arijit): Remove once we implement the internal APIs. Added to increase code coverage. (#1856)
logger.Debugf("LightResponse message: %s", resp)

err = s.host.writeToStream(stream, resp)
if err != nil {
logger.Warnf("failed to send LightResponse message to peer %s: %s", stream.Conn().RemotePeer(), err)
}
return err
}

// Pair is a pair of arbitrary bytes.
type Pair struct {
First []byte
Expand Down Expand Up @@ -46,6 +109,12 @@ func NewLightRequest() *LightRequest {
}
}

func newLightRequestFromBytes(in []byte) (*LightRequest, error) {
msg := NewLightRequest()
err := msg.Decode(in)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit adding named returned values

Suggested change
func newLightRequestFromBytes(in []byte) (*LightRequest, error) {
msg := NewLightRequest()
err := msg.Decode(in)
func newLightRequestFromBytes(in []byte) (msg *LightRequest, err error) {
msg = NewLightRequest()
err = msg.Decode(in)

return msg, err
}

func newRequest() *request {
return &request{
RemoteCallRequest: *newRemoteCallRequest(),
Expand Down Expand Up @@ -122,6 +191,12 @@ func NewLightResponse() *LightResponse {
}
}

func newLightResponseFromBytes(in []byte) (*LightResponse, error) {
msg := NewLightResponse()
err := msg.Decode(in)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func newLightResponseFromBytes(in []byte) (*LightResponse, error) {
msg := NewLightResponse()
err := msg.Decode(in)
func newLightResponseFromBytes(in []byte) (msg *LightResponse, err error) {
msg = NewLightResponse()
err = msg.Decode(in)

return msg, err
}

func newResponse() *response {
return &response{
RemoteCallResponse: *newRemoteCallResponse(),
Expand Down
Loading