Skip to content

Commit

Permalink
Merge pull request #52 from multiformats/steb/fix-for-rwstreams
Browse files Browse the repository at this point in the history
improve negotiation flushing
  • Loading branch information
Stebalien authored Nov 11, 2020
2 parents ff9dcd5 + 4e08419 commit 548b202
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 10 deletions.
Empty file added cases.md
Empty file.
31 changes: 22 additions & 9 deletions lazyClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,9 @@ import (
"sync"
)

// Multistream represents in essense a ReadWriteCloser, or a single
// communication wire which supports multiple streams on it. Each
// stream is identified by a protocol tag.
type Multistream interface {
io.ReadWriteCloser
}

// NewMSSelect returns a new Multistream which is able to perform
// protocol selection with a MultistreamMuxer.
func NewMSSelect(c io.ReadWriteCloser, proto string) Multistream {
func NewMSSelect(c io.ReadWriteCloser, proto string) LazyConn {
return &lazyClientConn{
protos: []string{ProtocolID, proto},
con: c,
Expand All @@ -25,7 +18,7 @@ func NewMSSelect(c io.ReadWriteCloser, proto string) Multistream {
// NewMultistream returns a multistream for the given protocol. This will not
// perform any protocol selection. If you are using a MultistreamMuxer, use
// NewMSSelect.
func NewMultistream(c io.ReadWriteCloser, proto string) Multistream {
func NewMultistream(c io.ReadWriteCloser, proto string) LazyConn {
return &lazyClientConn{
protos: []string{proto},
con: c,
Expand Down Expand Up @@ -138,6 +131,26 @@ func (l *lazyClientConn) Write(b []byte) (int, error) {
}

// Close closes the underlying io.ReadWriteCloser
//
// This does not flush anything.
func (l *lazyClientConn) Close() error {
// As the client, we flush the handshake on close to cover an
// interesting edge-case where the server only speaks a single protocol
// and responds eagerly with that protocol before waiting for out
// handshake.
//
// Again, we must not read the error because the other end may have
// closed the stream for reading. I mean, we're the initiator so that's
// strange... but it's still allowed
_ = l.Flush()
return l.con.Close()
}

// Flush sends the handshake.
func (l *lazyClientConn) Flush() error {
l.whandshakeOnce.Do(func() {
go l.rhandshakeOnce.Do(l.doReadHandshake)
l.doWriteHandshake()
})
return l.werr
}
14 changes: 14 additions & 0 deletions lazyServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,19 @@ func (l *lazyServerConn) Read(b []byte) (int, error) {
}

func (l *lazyServerConn) Close() error {
// As the server, we MUST flush the handshake on close. Otherwise, if
// the other side is actually waiting for our close (i.e., reading until
// EOF), they may get an error even though we received the request.
//
// However, we MUST NOT return any errors from Flush. The initiator may
// have already closed their side for reading. Basically, _we_ don't
// care about the outcome of this flush, only the other side does.
_ = l.Flush()
return l.con.Close()
}

// Flush sends the handshake.
func (l *lazyServerConn) Flush() error {
l.waitForHandshake.Do(func() { panic("didn't initiate handshake") })
return l.werr
}
8 changes: 8 additions & 0 deletions multistream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"
"bytes"
"errors"

"io"
"sync"

Expand Down Expand Up @@ -51,6 +52,13 @@ func NewMultistreamMuxer() *MultistreamMuxer {
return new(MultistreamMuxer)
}

// LazyConn is the connection type returned by the lazy negotiation functions.
type LazyConn interface {
io.ReadWriteCloser
// Flush flushes the lazy negotiation, if any.
Flush() error
}

func writeUvarint(w io.Writer, i uint64) error {
varintbuf := make([]byte, 16)
n := varint.PutUvarint(varintbuf, i)
Expand Down
2 changes: 1 addition & 1 deletion multistream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestProtocolNegotiationLazy(t *testing.T) {
mux.AddHandler("/b", nil)
mux.AddHandler("/c", nil)

var ac Multistream
var ac io.ReadWriteCloser
done := make(chan struct{})
go func() {
m, selected, _, err := mux.NegotiateLazy(a)
Expand Down

0 comments on commit 548b202

Please sign in to comment.