Skip to content

Commit

Permalink
improve negotiation flushing
Browse files Browse the repository at this point in the history
First, this change exposes a Flush function so users can flush a handshake without writing or
reading. We need this in libp2p to flush before closing for writing.

Second, this change flushes on Close. We can drop the read half of the handshake, but we need to
send the write half. Otherwise, we could end up with the following situation:

1. A: Send data.
2. B: Receive data.
3. B: Close the stream. (no flush)
4. A: Wait for an EOF from the stream (ensure receipt of data).
5. B: ERROR: the stream was closed but no multistream header was sent.

This is _slightly_ unfortunate as Close should ideally never block. But close
_is_ allowed to flush and the alternative is to spawn a goroutine.
  • Loading branch information
Stebalien committed Oct 24, 2020
1 parent ff9dcd5 commit a49f9df
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
Empty file added cases.md
Empty file.
32 changes: 22 additions & 10 deletions lazyClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,9 @@ import (
"sync"
)

// Multistream represents in essense a ReadWriteCloser, or a single
// communication wire which supports multiple streams on it. Each
// stream is identified by a protocol tag.
type Multistream interface {
io.ReadWriteCloser
}

// NewMSSelect returns a new Multistream which is able to perform
// protocol selection with a MultistreamMuxer.
func NewMSSelect(c io.ReadWriteCloser, proto string) Multistream {
func NewMSSelect(c io.ReadWriteCloser, proto string) LazyConn {
return &lazyClientConn{
protos: []string{ProtocolID, proto},
con: c,
Expand All @@ -25,7 +18,7 @@ func NewMSSelect(c io.ReadWriteCloser, proto string) Multistream {
// NewMultistream returns a multistream for the given protocol. This will not
// perform any protocol selection. If you are using a MultistreamMuxer, use
// NewMSSelect.
func NewMultistream(c io.ReadWriteCloser, proto string) Multistream {
func NewMultistream(c io.ReadWriteCloser, proto string) LazyConn {
return &lazyClientConn{
protos: []string{proto},
con: c,
Expand Down Expand Up @@ -139,5 +132,24 @@ func (l *lazyClientConn) Write(b []byte) (int, error) {

// Close closes the underlying io.ReadWriteCloser
func (l *lazyClientConn) Close() error {
return l.con.Close()
// We must flush the handshake on a "nice" close.
// Otherwise, if the other side is actually waiting for our close (i.e.,
// reading until EOF), they may get an error even though we received the
// request.
flushErr := l.Flush()
// But we close anyways because close should always close.
closeErr := l.con.Close()
if flushErr != nil {
return flushErr
}
return closeErr
}

// Flush sends the handshake.
func (l *lazyClientConn) Flush() error {
l.whandshakeOnce.Do(func() {
go l.rhandshakeOnce.Do(l.doReadHandshake)
l.doWriteHandshake()
})
return l.werr
}
14 changes: 13 additions & 1 deletion lazyServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,17 @@ func (l *lazyServerConn) Read(b []byte) (int, error) {
}

func (l *lazyServerConn) Close() error {
return l.con.Close()
// We must flush the handshake on a "nice" close.
flushErr := l.Flush()
closeErr := l.con.Close()
if flushErr != nil {
return flushErr
}
return closeErr
}

// Flush sends the handshake.
func (l *lazyServerConn) Flush() error {
l.waitForHandshake.Do(func() { panic("didn't initiate handshake") })
return l.werr
}
10 changes: 9 additions & 1 deletion multistream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"
"bytes"
"errors"

"io"
"sync"

Expand Down Expand Up @@ -51,6 +52,13 @@ func NewMultistreamMuxer() *MultistreamMuxer {
return new(MultistreamMuxer)
}

// LazyConn is the connection type returned by the lazy negotiation functions.
type LazyConn interface {
io.ReadWriteCloser
// Flush flushes the lazy negotiation, if any.
Flush() error
}

func writeUvarint(w io.Writer, i uint64) error {
varintbuf := make([]byte, 16)
n := varint.PutUvarint(varintbuf, i)
Expand Down Expand Up @@ -201,7 +209,7 @@ func (msm *MultistreamMuxer) findHandler(proto string) *Handler {
// a multistream, the protocol used, the handler and an error. It is lazy
// because the write-handshake is performed on a subroutine, allowing this
// to return before that handshake is completed.
func (msm *MultistreamMuxer) NegotiateLazy(rwc io.ReadWriteCloser) (io.ReadWriteCloser, string, HandlerFunc, error) {
func (msm *MultistreamMuxer) NegotiateLazy(rwc io.ReadWriteCloser) (LazyConn, string, HandlerFunc, error) {
pval := make(chan string, 1)
writeErr := make(chan error, 1)
defer close(pval)
Expand Down
2 changes: 1 addition & 1 deletion multistream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestProtocolNegotiationLazy(t *testing.T) {
mux.AddHandler("/b", nil)
mux.AddHandler("/c", nil)

var ac Multistream
var ac LazyConn
done := make(chan struct{})
go func() {
m, selected, _, err := mux.NegotiateLazy(a)
Expand Down

0 comments on commit a49f9df

Please sign in to comment.