Skip to content

Commit

Permalink
Merge pull request #20 from smira/19-wrong-address
Browse files Browse the repository at this point in the history
Fix sendLoop not exiting when connection can't be established
  • Loading branch information
smira authored May 15, 2019
2 parents 512a37d + 4b89458 commit 448e76c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
7 changes: 7 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func setupListener(t *testing.T) (*net.UDPConn, chan []byte) {
return inSocket, received
}

func TestWrongAddress(t *testing.T) {
client := NewClient("BOOM:BOOM")
if err := client.Close(); err != nil {
t.Errorf("error from close: %v", err)
}
}

func TestCommands(t *testing.T) {
inSocket, received := setupListener(t)

Expand Down
33 changes: 29 additions & 4 deletions loops.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ SOFTWARE.
*/

import (
"context"
"net"
"sync/atomic"
"time"
Expand Down Expand Up @@ -69,6 +70,8 @@ func (t *transport) sendLoop(addr string, reconnectInterval, retryTimeout time.D
reconnectC <-chan time.Time
)

defer t.shutdownWg.Done()

if reconnectInterval > 0 {
reconnectTicker := time.NewTicker(reconnectInterval)
defer reconnectTicker.Stop()
Expand All @@ -77,7 +80,23 @@ func (t *transport) sendLoop(addr string, reconnectInterval, retryTimeout time.D

RECONNECT:
// Attempt to connect
sock, err = net.Dial("udp", addr)
sock, err = func() (net.Conn, error) {
// Dial with context which is aborted when client is shut down
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

go func() {
select {
case <-t.shutdown:
ctxCancel()
case <-ctx.Done():
}
}()

var d net.Dialer
return d.DialContext(ctx, "udp", addr)
}()

if err != nil {
log.Printf("[STATSD] Error connecting to server: %s", err)
goto WAIT
Expand All @@ -89,7 +108,6 @@ RECONNECT:
// Get a buffer from the queue
if !ok {
_ = sock.Close() // nolint: gosec
t.shutdownWg.Done()
return
}

Expand Down Expand Up @@ -117,8 +135,15 @@ RECONNECT:

WAIT:
// Wait for a while
time.Sleep(retryTimeout)
goto RECONNECT
select {
case <-time.After(retryTimeout):
goto RECONNECT
case <-t.shutdown:
}

// drain send queue waiting for flush loops to terminate
for range t.sendQueue {
}
}

// reportLoop reports periodically number of packets lost
Expand Down

0 comments on commit 448e76c

Please sign in to comment.