Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix: Increment stats.MessagesSent in msgToStream() function #441

Merged
merged 8 commits into from
Sep 10, 2020
3 changes: 2 additions & 1 deletion network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}

atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warnf("error resetting deadline: %s", err)
}
Expand Down Expand Up @@ -320,7 +322,6 @@ func (bsnet *impl) SendMessage(
_ = s.Reset()
return err
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
//nolint
Expand Down
236 changes: 128 additions & 108 deletions network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,34 +254,31 @@ func TestMessageSendAndReceive(t *testing.T) {
}
}

func TestMessageResendAfterError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) {
// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that we can force to start returning errors
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
r2 := newReceiver()
eh1 := &ErrHost{Host: h1}
routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1)
bsnet1.SetDelegate(r1)

h2, err := mn.AddPeer(p2.PrivateKey(), p2.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that we can force to start returning errors
eh2 := &ErrHost{Host: h2}
routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore())
bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2)
bsnet2.SetDelegate(r2)

err = mn.LinkAll()
Expand All @@ -307,6 +304,20 @@ func TestMessageResendAfterError(t *testing.T) {
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)

return eh1, bsnet1, eh2, bsnet2, msg
}

func TestMessageResendAfterError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

testSendErrorBackoff := 100 * time.Millisecond
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
Expand Down Expand Up @@ -345,54 +356,12 @@ func TestMessageSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that we can force to start timing out
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2)

err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}

err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}

blockGenerator := blocksutil.NewBlockGenerator()
block1 := blockGenerator.Next()
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
Expand All @@ -416,7 +385,7 @@ func TestMessageSendTimeout(t *testing.T) {
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
case isConnected := <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
Expand All @@ -427,57 +396,15 @@ func TestMessageSendNotSupportedResponse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// create network
mn := mocknet.New(ctx)
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
}
p1 := tnet.RandIdentityOrFatal(t)
p2 := tnet.RandIdentityOrFatal(t)

h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}

// Create a special host that responds with ErrNotSupported
eh := &ErrHost{Host: h1}
routing := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh, routing)

bsnet2 := streamNet.Adapter(p2)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()
bsnet1.SetDelegate(r1)
bsnet2.SetDelegate(r2)

err = mn.LinkAll()
if err != nil {
t.Fatal(err)
}
err = bsnet1.ConnectTo(ctx, p2.ID())
if err != nil {
t.Fatal(err)
}
isConnected := <-r1.connectionEvent
if !isConnected {
t.Fatal("Expected connect event")
}

err = bsnet2.ConnectTo(ctx, p1.ID())
if err != nil {
t.Fatal(err)
}

blockGenerator := blocksutil.NewBlockGenerator()
block1 := blockGenerator.Next()
msg := bsmsg.New(false)
msg.AddEntry(block1.Cid(), 1, pb.Message_Wantlist_Block, true)
eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2)

eh.setError(multistream.ErrNotSupported)
_, err = bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
_, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
MaxRetries: 3,
SendTimeout: 100 * time.Millisecond,
SendErrorBackoff: 100 * time.Millisecond,
Expand All @@ -489,7 +416,7 @@ func TestMessageSendNotSupportedResponse(t *testing.T) {
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("Did not receive disconnect event")
case isConnected = <-r1.connectionEvent:
case isConnected := <-r1.connectionEvent:
if isConnected {
t.Fatal("Expected disconnect event (got connect event)")
}
Expand Down Expand Up @@ -541,3 +468,96 @@ func TestSupportsHave(t *testing.T) {
}
}
}

func testNetworkCounters(t *testing.T, n1 int, n2 int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p1 := tnet.RandIdentityOrFatal(t)
r1 := newReceiver()
p2 := tnet.RandIdentityOrFatal(t)
r2 := newReceiver()

_, bsnet1, _, bsnet2, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)

for n := 0; n < n1; n++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
err := bsnet1.SendMessage(ctx, p2.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p2 did not receive message sent")
case <-r2.messageReceived:
for j := 0; j < 2; j++ {
err := bsnet2.SendMessage(ctx, p1.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p1 did not receive message sent")
case <-r1.messageReceived:
}
}
}
cancel()
}

if n2 > 0 {
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
if err != nil {
t.Fatal(err)
}
for n := 0; n < n2; n++ {
ctx, cancel := context.WithTimeout(ctx, time.Second)
err = ms.SendMsg(ctx, msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p2 did not receive message sent")
case <-r2.messageReceived:
for j := 0; j < 2; j++ {
err := bsnet2.SendMessage(ctx, p1.ID(), msg)
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
t.Fatal("p1 did not receive message sent")
case <-r1.messageReceived:
}
}
}
cancel()
}
}

// Wait a little for MessagesRecvd counters to be updated
// after receiver.ReceiveMessage() returns. FIXME: Can we
// use some network event instead of a timer?
ctxwait, cancelwait := context.WithTimeout(ctx, 100*time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be possible to use bsnet.host.Network().Notify(listener) to listen for ClosedStream(n network.Network, v network.Stream).
Could you give that a shot? If it's more than half an hour of work no worries I think this is good enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was interesting enough. Now the receiver has listener network.Notifiee field that is used by prepareNetwork() to set up the listener before the network becomes active (that's important). Also I've found that every MessageSender needs to be closed ;) and added .Close() calls to all tests in the file.

defer cancelwait()
<-ctxwait.Done()

if bsnet1.Stats().MessagesSent != uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d sent messages, got %d", n1+n2, bsnet1.Stats().MessagesSent))
}

if bsnet2.Stats().MessagesRecvd != uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d received messages, got %d", n1+n2, bsnet2.Stats().MessagesRecvd))
}

if bsnet1.Stats().MessagesRecvd != 2*uint64(n1+n2) {
t.Fatal(fmt.Errorf("expected %d received reply messages, got %d", 2*(n1+n2), bsnet1.Stats().MessagesRecvd))
}
}

func TestNetworkCounters(t *testing.T) {
for n := 0; n < 11; n++ {
testNetworkCounters(t, 10-n, n)
}
}