Skip to content

Commit

Permalink
fix(dot/network): Return on EOF error while reading stream. (ChainSaf…
Browse files Browse the repository at this point in the history
…e#1733)

* Return on EOF error while reading stream.

* Add unit test to verify exit on EOF.
  • Loading branch information
arijitAD authored and timwu20 committed Dec 6, 2021
1 parent 10ed3de commit a9d32fb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
48 changes: 48 additions & 0 deletions dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,51 @@ func Test_RemoveReservedPeers(t *testing.T) {
err = nodeA.host.removeReservedPeers("failing peer ID")
require.Error(t, err)
}

func TestStreamCloseEOF(t *testing.T) {
basePathA := utils.NewTestBasePath(t, "nodeA")
configA := &Config{
BasePath: basePathA,
Port: 7001,
NoBootstrap: true,
NoMDNS: true,
}

nodeA := createTestService(t, configA)
nodeA.noGossip = true

basePathB := utils.NewTestBasePath(t, "nodeB")

configB := &Config{
BasePath: basePathB,
Port: 7002,
NoBootstrap: true,
NoMDNS: true,
}

nodeB := createTestService(t, configB)
nodeB.noGossip = true
handler := newTestStreamHandler(testBlockRequestMessageDecoder)
nodeB.host.registerStreamHandler("", handler.handleStream)
require.False(t, handler.exit)

addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(addrInfoB)
}
require.NoError(t, err)

stream, err := nodeA.host.send(addrInfoB.ID, nodeB.host.protocolID, testBlockRequestMessage)
require.NoError(t, err)
require.False(t, handler.exit)

err = stream.Close()
require.NoError(t, err)

time.Sleep(TestBackoffTimeout)

require.True(t, handler.exit)
}
4 changes: 2 additions & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder

for {
tot, err := readStream(stream, msgBytes[:])
if err == io.EOF {
continue
if errors.Is(err, io.EOF) {
return
} else if err != nil {
logger.Trace("failed to read from stream", "peer", stream.Conn().RemotePeer(), "protocol", stream.Protocol(), "error", err)
_ = stream.Close()
Expand Down
10 changes: 8 additions & 2 deletions dot/network/test_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"errors"
"io"
"math/big"

Expand Down Expand Up @@ -89,6 +90,7 @@ func testBlockResponseMessage() *BlockResponseMessage {
type testStreamHandler struct {
messages map[peer.ID][]Message
decoder messageDecoder
exit bool
}

func newTestStreamHandler(decoder messageDecoder) *testStreamHandler {
Expand Down Expand Up @@ -135,10 +137,14 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream, peer peer.ID
msgBytes = make([]byte, maxMessageSize)
)

defer func() {
s.exit = true
}()

for {
tot, err := readStream(stream, msgBytes)
if err == io.EOF {
continue
if errors.Is(err, io.EOF) {
return
} else if err != nil {
logger.Debug("failed to read from stream", "protocol", stream.Protocol(), "error", err)
_ = stream.Close()
Expand Down

0 comments on commit a9d32fb

Please sign in to comment.