Skip to content

Commit

Permalink
Merge pull request ipfs#91 from libp2p/feat/update-stream-muxer
Browse files Browse the repository at this point in the history
update stream muxer
  • Loading branch information
Stebalien authored Sep 14, 2017
2 parents 228ce2e + 5bc13bf commit d0ca2e1
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 58 deletions.
117 changes: 68 additions & 49 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// receive msg
pmes := new(pb.Message)
if err := r.ReadMsg(pmes); err != nil {
s.Reset()
log.Debugf("Error unmarshaling data: %s", err)
return
}
Expand All @@ -45,13 +46,15 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
s.Reset()
log.Debug("got back nil handler from handlerForMsgType")
return
}

// dispatch handler.
rpmes, err := handler(ctx, mPeer, pmes)
if err != nil {
s.Reset()
log.Debugf("handle message error: %s", err)
return
}
Expand All @@ -64,6 +67,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {

// send out response msg
if err := w.WriteMsg(rpmes); err != nil {
s.Reset()
log.Debugf("send response error: %s", err)
return
}
Expand Down Expand Up @@ -161,73 +165,88 @@ const streamReuseTries = 3
func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
ms.lk.Lock()
defer ms.lk.Unlock()
if err := ms.prep(); err != nil {
return err
}

if err := ms.writeMessage(pmes); err != nil {
return err
}

if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
}

return nil
}

func (ms *messageSender) writeMessage(pmes *pb.Message) error {
err := ms.w.WriteMsg(pmes)
if err != nil {
// If the other side isnt expecting us to be reusing streams, we're gonna
// end up erroring here. To make sure things work seamlessly, lets retry once
// before continuing

log.Infof("error writing message: ", err)
ms.s.Close()
ms.s = nil
retry := false
for {
if err := ms.prep(); err != nil {
return err
}

if err := ms.w.WriteMsg(pmes); err != nil {
return err
ms.s.Reset()
ms.s = nil

if retry {
log.Info("error writing message, bailing: ", err)
return err
} else {
log.Info("error writing message, trying again: ", err)
retry = true
continue
}
}

log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
} else if retry {
ms.singleMes++
}

// keep track of this happening. If it happens a few times, its
// likely we can assume the otherside will never support stream reuse
ms.singleMes++
return nil
}
return nil
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
if err := ms.prep(); err != nil {
return nil, err
}
retry := false
for {
if err := ms.prep(); err != nil {
return nil, err
}

if err := ms.writeMessage(pmes); err != nil {
return nil, err
}
if err := ms.w.WriteMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil

if retry {
log.Info("error writing message, bailing: ", err)
return nil, err
} else {
log.Info("error writing message, trying again: ", err)
retry = true
continue
}
}

log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
ms.s = nil

if retry {
log.Info("error reading message, bailing: ", err)
return nil, err
} else {
log.Info("error reading message, trying again: ", err)
retry = true
continue
}
}

mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Close()
ms.s = nil
return nil, err
}
log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
}
if ms.singleMes > streamReuseTries {
ms.s.Close()
ms.s = nil
} else if retry {
ms.singleMes++
}

return mes, nil
return mes, nil
}
}

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
Expand Down
66 changes: 65 additions & 1 deletion ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func TestNotFound(t *testing.T) {
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}

default:
panic("Shouldnt recieve this.")
}
Expand Down Expand Up @@ -288,3 +287,68 @@ func TestLessThanKResponses(t *testing.T) {
}
t.Fatal("Expected to recieve an error.")
}

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)

d.Update(ctx, hosts[1].ID())

// It would be nice to be able to just get a value and succeed but then
// we'd need to deal with selectors and validators...
hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
defer s.Close()

pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)

pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}

switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
}

if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})

// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
for i := 0; i < 10; i++ {
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
continue
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
}
16 changes: 8 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,21 @@
},
{
"author": "whyrusleeping",
"hash": "QmUwW8jMQDxXhLD2j4EfWqLEMX3MsvyWcWGvJPVDh1aTmu",
"hash": "QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6",
"name": "go-libp2p-host",
"version": "1.3.19"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmXZyBQMkqSYigxhJResC6fLWDGFhbphK67eZoqMDUvBmK",
"hash": "QmZ3ma9g2NTg7GNF1ntWNRa1GW9jVzGq8UE9cKCwRKv6dS",
"name": "go-libp2p",
"version": "4.5.5"
"version": "5.0.1"
},
{
"author": "whyrusleeping",
"hash": "QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF",
"hash": "QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1",
"name": "go-libp2p-net",
"version": "1.6.12"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
Expand All @@ -156,9 +156,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmQ1bJEsmdEiGfTQRoj6CsshWmAKduAEDEbwzbvk5QT5Ui",
"hash": "QmP4cEjmvf8tC6ykxKXrvmYLo8vqtGsgduMatjbAKnBzv8",
"name": "go-libp2p-netutil",
"version": "0.2.25"
"version": "0.3.1"
},
{
"author": "multiformats",
Expand Down

0 comments on commit d0ca2e1

Please sign in to comment.