-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(dot/network): fix receiving notifications messages #1517
Changes from 15 commits
e6b4efd
60246a2
a475b73
f976f1e
87b1f17
bf7072f
3915d03
dce28d6
223f1aa
b006e57
9b805ab
b4243ef
5388e0e
14e6c4a
8256bcb
41fe922
4b5d23b
8708374
21f464f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,7 +131,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |
err := handshakeValidator(peer, hs) | ||
if err != nil { | ||
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err) | ||
_ = stream.Conn().Close() | ||
return errCannotValidateHandshake | ||
} | ||
|
||
|
@@ -141,17 +140,17 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |
// once validated, send back a handshake | ||
resp, err := info.getHandshake() | ||
if err != nil { | ||
logger.Debug("failed to get handshake", "protocol", info.protocolID, "error", err) | ||
logger.Warn("failed to get handshake", "protocol", info.protocolID, "error", err) | ||
return err | ||
} | ||
|
||
err = s.host.send(peer, info.protocolID, resp) | ||
err = s.host.writeToStream(stream, resp) | ||
if err != nil { | ||
logger.Trace("failed to send handshake", "protocol", info.protocolID, "peer", peer, "error", err) | ||
_ = stream.Conn().Close() | ||
return err | ||
} | ||
logger.Trace("receiver: sent handshake", "protocol", info.protocolID, "peer", peer) | ||
return nil | ||
} | ||
|
||
// if we are the initiator and haven't received the handshake already, validate it | ||
|
@@ -161,7 +160,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |
if err != nil { | ||
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err) | ||
hsData.validated = false | ||
_ = stream.Conn().Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We no longer close connection stream when there is an error, why? Is connection closing handled somewhere else? In network/service.go line 500, the stream is closed when there is an error. I'm just try to better understand how these streams are handled. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, this was duplicate code, if this function returns an error then
|
||
return errCannotValidateHandshake | ||
} | ||
|
||
|
@@ -175,7 +173,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |
// if we are the initiator, send the message | ||
if hsData, has := info.getHandshakeData(peer); has && hsData.validated && hsData.received && hsData.outboundMsg != nil { | ||
logger.Trace("sender: sending message", "protocol", info.protocolID) | ||
err := s.host.send(peer, info.protocolID, hsData.outboundMsg) | ||
err := s.host.writeToStream(stream, hsData.outboundMsg) | ||
if err != nil { | ||
logger.Debug("failed to send message", "protocol", info.protocolID, "peer", peer, "error", err) | ||
return err | ||
|
@@ -197,11 +195,14 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol, | |
} | ||
|
||
// TODO: improve this by keeping track of who you've received/sent messages from | ||
if !s.noGossip { | ||
seen := s.gossip.hasSeen(msg) | ||
if !seen { | ||
s.broadcastExcluding(info, peer, msg) | ||
} | ||
if s.noGossip { | ||
return nil | ||
} | ||
|
||
seen := s.gossip.hasSeen(msg) | ||
if !seen { | ||
// TODO: update this to write to stream | ||
s.broadcastExcluding(info, peer, msg) | ||
} | ||
|
||
return nil | ||
|
@@ -225,6 +226,9 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer | |
peers := s.host.peers() | ||
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) | ||
|
||
info.mapMu.RLock() | ||
defer info.mapMu.RUnlock() | ||
|
||
for i, peer := range peers { // TODO: check if stream is open, if not, open and send handshake | ||
// TODO: configure this and determine ideal ratio, as well as when to use broadcast vs gossip | ||
if i > len(peers)/3 { | ||
|
@@ -235,9 +239,6 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer | |
continue | ||
} | ||
|
||
info.mapMu.RLock() | ||
defer info.mapMu.RUnlock() | ||
|
||
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { | ||
info.handshakeData.Store(peer, &handshakeData{ | ||
validated: false, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -145,7 +145,7 @@ func NewService(cfg *Config) (*Service, error) { | |
} | ||
|
||
network.syncQueue = newSyncQueue(network) | ||
|
||
network.noGossip = true // TODO: remove once duplicate message sending is merged | ||
return network, err | ||
} | ||
|
||
|
@@ -281,40 +281,8 @@ func (s *Service) logPeerCount() { | |
|
||
func (s *Service) handleConn(conn libp2pnetwork.Conn) { | ||
// give new peers a slight weight | ||
// TODO: do this once handshake is received | ||
s.syncQueue.updatePeerScore(conn.RemotePeer(), 1) | ||
|
||
s.notificationsMu.Lock() | ||
defer s.notificationsMu.Unlock() | ||
|
||
info, has := s.notificationsProtocols[BlockAnnounceMsgType] | ||
if !has { | ||
// this shouldn't happen | ||
logger.Warn("block announce protocol is not yet registered!") | ||
return | ||
} | ||
|
||
// open block announce substream | ||
hs, err := info.getHandshake() | ||
if err != nil { | ||
logger.Warn("failed to get handshake", "protocol", blockAnnounceID, "error", err) | ||
return | ||
} | ||
|
||
info.mapMu.RLock() | ||
defer info.mapMu.RUnlock() | ||
|
||
peer := conn.RemotePeer() | ||
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint | ||
info.handshakeData.Store(peer, &handshakeData{ | ||
validated: false, | ||
}) | ||
|
||
logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs) | ||
err = s.host.send(peer, info.protocolID, hs) | ||
if err != nil { | ||
logger.Trace("failed to send block announce handshake to peer", "peer", peer, "error", err) | ||
} | ||
} | ||
} | ||
|
||
func (s *Service) beginDiscovery() error { | ||
|
@@ -528,7 +496,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder | |
if err == io.EOF { | ||
continue | ||
} else if err != nil { | ||
logger.Trace("failed to read from stream", "protocol", stream.Protocol(), "error", err) | ||
logger.Trace("failed to read from stream", "peer", stream.Conn().RemotePeer(), "protocol", stream.Protocol(), "error", err) | ||
_ = stream.Close() | ||
return | ||
} | ||
|
@@ -541,21 +509,18 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder | |
} | ||
|
||
logger.Trace( | ||
"Received message from peer", | ||
"received message from peer", | ||
"host", s.host.id(), | ||
"peer", peer, | ||
"msg", msg.String(), | ||
) | ||
|
||
go func() { | ||
// handle message based on peer status and message type | ||
err = handler(stream, msg) | ||
if err != nil { | ||
logger.Trace("Failed to handle message from stream", "message", msg, "error", err) | ||
_ = stream.Close() | ||
return | ||
} | ||
}() | ||
err = handler(stream, msg) | ||
if err != nil { | ||
logger.Debug("failed to handle message from stream", "message", msg, "error", err) | ||
_ = stream.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. defer the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why? we only want to close the stream if there's an error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. I assumed we would close the stream after reading once. |
||
return | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import ( | |
"github.com/ChainSafe/gossamer/lib/common/optional" | ||
"github.com/ChainSafe/gossamer/lib/utils" | ||
|
||
"github.com/ChainSafe/chaindb" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
@@ -425,9 +426,10 @@ func TestSyncQueue_SyncAtHead(t *testing.T) { | |
q.stop() | ||
time.Sleep(time.Second) | ||
q.ctx = context.Background() | ||
q.slotDuration = time.Millisecond * 100 | ||
|
||
go q.syncAtHead() | ||
time.Sleep(time.Millisecond * 6100) | ||
time.Sleep(q.slotDuration * 3) | ||
select { | ||
case req := <-q.requestCh: | ||
require.Equal(t, uint64(2), req.req.StartingBlock.Uint64()) | ||
|
@@ -500,7 +502,7 @@ func TestSyncQueue_handleBlockDataFailure_MissingParent(t *testing.T) { | |
q.ctx = context.Background() | ||
|
||
data := testBlockResponseMessage().BlockData | ||
q.handleBlockDataFailure(0, fmt.Errorf("failed to get parent hash: Key not found"), data) | ||
q.handleBlockDataFailure(0, fmt.Errorf("some error: %w", chaindb.ErrKeyNotFound), data) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. q.handleBlockDataFailure(0, chaindb.ErrKeyNotFound, data) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to test the
|
||
select { | ||
case req := <-q.requestCh: | ||
require.True(t, req.req.StartingBlock.IsHash()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably do the reconnection in a background go routine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done