Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix: Increment stats.MessagesSent in msgToStream() function #441

Merged
merged 8 commits into from
Sep 10, 2020
3 changes: 2 additions & 1 deletion network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}

atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warnf("error resetting deadline: %s", err)
}
Expand Down Expand Up @@ -320,7 +322,6 @@ func (bsnet *impl) SendMessage(
_ = s.Reset()
return err
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
//nolint
Expand Down
279 changes: 169 additions & 110 deletions network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type receiver struct {
connectionEvent chan bool
lastMessage bsmsg.BitSwapMessage
lastSender peer.ID
listener network.Notifiee
}

func newReceiver() *receiver {
Expand Down Expand Up @@ -254,36 +255,38 @@ func TestMessageSendAndReceive(t *testing.T) {
}
}

func TestMessageResendAfterError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) {
// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

// Host 1
h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that we can force to start returning errors
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
r2 := newReceiver()
eh1 := &ErrHost{Host: h1}
routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1)
bsnet1.SetDelegate(r1)
if r1.listener != nil {
eh1.Network().Notify(r1.listener)
}

// Host 2
h2, err := mn.AddPeer(p2.PrivateKey(), p2.Address())
if err != nil {
t.Fatal(err)
}
eh2 := &ErrHost{Host: h2}
routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore())
bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2)
bsnet2.SetDelegate(r2)
if r2.listener != nil {
eh2.Network().Notify(r2.listener)
}

// Networking
err = mn.LinkAll()
if err != nil {
t.Fatal(err)
Expand All @@ -307,6 +310,20 @@ func TestMessageResendAfterError(t *testing.T) {
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)

return eh1, bsnet1, eh2, bsnet2, msg
}

func TestMessageResendAfterError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

testSendErrorBackoff := 100 * time.Millisecond
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
Expand All @@ -316,6 +333,7 @@ func TestMessageResendAfterError(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer ms.Close()

// Return an error from the networking layer the next time we try to send
// a message
Expand Down Expand Up @@ -345,54 +363,12 @@ func TestMessageSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that we can force to start timing out
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2)

err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}

err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}

blockGenerator := blocksutil.NewBlockGenerator()
block1 := blockGenerator.Next()
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
Expand All @@ -402,6 +378,7 @@ func TestMessageSendTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer ms.Close()

// Return a DeadlineExceeded error from the networking layer the next time we try to
// send a message
Expand All @@ -416,7 +393,7 @@ func TestMessageSendTimeout(t *testing.T) {
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
case isConnected := <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
Expand All @@ -427,69 +404,28 @@ func TestMessageSendNotSupportedResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that responds with ErrNotSupported
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2)

err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}

err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}

blockGenerator := blocksutil.NewBlockGenerator()
block1 := blockGenerator.Next()
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2)

eh.setError(multistream.ErrNotSupported)
_, err = bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: 100 * time.Millisecond,
})
if err == nil {
ms.Close()
t.Fatal("Expected ErrNotSupported")
}

select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
case isConnected := <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
Expand Down Expand Up @@ -535,9 +471,132 @@ func TestSupportsHave(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer senderCurrent.Close()

if senderCurrent.SupportsHave() != tc.expSupportsHave {
t.Fatal("Expected sender HAVE message support", tc.proto, tc.expSupportsHave)
}
}
}

func testNetworkCounters(t *testing.T, n1 int, n2 int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

var wg1, wg2 sync.WaitGroup
r1.listener = &network.NotifyBundle{
OpenedStreamF: func(n network.Network, s network.Stream) {
wg1.Add(1)
},
ClosedStreamF: func(n network.Network, s network.Stream) {
wg1.Done()
},
}
r2.listener = &network.NotifyBundle{
OpenedStreamF: func(n network.Network, s network.Stream) {
wg2.Add(1)
},
ClosedStreamF: func(n network.Network, s network.Stream) {
wg2.Done()
},
}
_, bsnet1, _, bsnet2, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

for n := 0; n < n1; n++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
err := bsnet1.SendMessage(ctx, p2.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p2 did not receive message sent")
case <-r2.messageReceived:
for j := 0; j < 2; j++ {
err := bsnet2.SendMessage(ctx, p1.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p1 did not receive message sent")
case <-r1.messageReceived:
}
}
}
cancel()
}

if n2 > 0 {
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
if err != nil {
t.Fatal(err)
}
defer ms.Close()
for n := 0; n < n2; n++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
err = ms.SendMsg(ctx, msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p2 did not receive message sent")
case <-r2.messageReceived:
for j := 0; j < 2; j++ {
err := bsnet2.SendMessage(ctx, p1.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p1 did not receive message sent")
case <-r1.messageReceived:
}
}
}
cancel()
}
ms.Close()
}

// Wait until all streams are closed and MessagesRecvd counters
// updated.
ctxto, cancelto := context.WithTimeout(ctx, 5*time.Second)
defer cancelto()
ctxwait, cancelwait := context.WithCancel(ctx)
defer cancelwait()
go func() {
wg1.Wait()
wg2.Wait()
cancelwait()
}()
select {
case <-ctxto.Done():
t.Fatal("network streams closing timed out")
case <-ctxwait.Done():
}

if bsnet1.Stats().MessagesSent != uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d sent messages, got %d", n1+n2, bsnet1.Stats().MessagesSent))
}

if bsnet2.Stats().MessagesRecvd != uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d received messages, got %d", n1+n2, bsnet2.Stats().MessagesRecvd))
}

if bsnet1.Stats().MessagesRecvd != 2*uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d received reply messages, got %d", 2*(n1+n2), bsnet1.Stats().MessagesRecvd))
}
}

func TestNetworkCounters(t *testing.T) {
for n := 0; n < 11; n++ {
testNetworkCounters(t, 10-n, n)
}
}