Skip to content

Commit

Permalink
Merge pull request #22 from jumpeiMano/feature/add-error-messages
Browse files Browse the repository at this point in the history
Add logger and set timeout in newConn()
  • Loading branch information
jumpeiMano authored Feb 15, 2019
2 parents 0c72fab + 424e953 commit 30bd0a7
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 19 deletions.
3 changes: 1 addition & 2 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,7 @@ func (cp *ConnectionPool) getOrGat(command string, exp int64, keys []string) ([]
}
select {
case <-ctx.Done():
err = ErrCanceldByContext
return results, err
return results, ErrCanceldByContext
case err = <-ec:
if err != nil {
return results, err
Expand Down
20 changes: 14 additions & 6 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package memcached

import (
"bufio"
"context"
"io"
"net"
"strings"
Expand Down Expand Up @@ -39,6 +40,9 @@ var (
)

func newConn(cp *ConnectionPool) (*conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), cp.connectTimeout*2)
defer cancel()

ls := len(cp.servers)
now := time.Now()
c := &conn{
Expand Down Expand Up @@ -66,6 +70,7 @@ func newConn(cp *ConnectionPool) (*conn, error) {
ec <- err1
return
}
cp.logf("Failed connect to %s", node)
c.removeNode(node)
mu.Lock()
c.ncs[node] = &nc{
Expand All @@ -76,13 +81,15 @@ func newConn(cp *ConnectionPool) (*conn, error) {
}(s)
}
for range cp.servers {
if err1 := <-ec; err1 != nil {
err = err1
select {
case <-ctx.Done():
return c, ErrCanceldByContext
case err = <-ec:
if err != nil {
return c, err
}
}
}
if err != nil {
return c, err
}

var existsAlive bool
for _, nc := range c.ncs {
Expand Down Expand Up @@ -173,8 +180,8 @@ func (c *conn) tryReconnect() {
}
defer func() {
c.Lock()
defer c.Unlock()
c.nextTryReconnectAt = now.Add(c.cp.tryReconnectPeriod)
c.Unlock()
}()
notAliveNodes := make([]string, 0, len(c.ncs))
for node, nc := range c.ncs {
Expand All @@ -190,6 +197,7 @@ func (c *conn) tryReconnect() {
if _s == nil {
continue
}
c.cp.logf("Trying reconnect to %s", n)
go func(s *Server, node string) {
nc, err := c.newNC(s)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package memcached

import (
"log"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -13,6 +15,7 @@ func TestNewConn(t *testing.T) {
}
_cp := New(_ss, "")
defer _cp.Close()
_cp.SetLogger(log.Printf)
_, err := newConn(_cp)
assert.NotNil(t, err)

Expand All @@ -21,3 +24,27 @@ func TestNewConn(t *testing.T) {
assert.Nil(t, err)
c2.close()
}

func TestTryReconnect(t *testing.T) {
_ss := []Server{
{Host: "127.0.0.1", Port: 11211, Alias: "1"},
{Host: "127.0.0.1", Port: 11212, Alias: "2"},
}
_cp := New(_ss, "")
defer _cp.Close()
c, err := newConn(_cp)
if err != nil {
t.Fatalf("Failed newConn:%v", err)
}
_cp.SetFailover(true)
_cp.SetLogger(log.Printf)
c.removeNode("2")
c.ncs["2"] = &nc{
isAlive: false,
}
c.tryReconnect()
time.Sleep(10 * time.Millisecond)
c.RLock()
defer c.RUnlock()
assert.Equal(t, true, c.ncs["2"].isAlive)
}
29 changes: 18 additions & 11 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ const (
defaultPollTimeout = 1 * time.Second
defaultTryReconnectPeriod = 60 * time.Second
defaultKeepAlivePeriod = 60 * time.Second

contextDeadlineExceededErrorString = "context deadline exceeded"
)

// ConnectionPool struct
Expand All @@ -43,6 +41,8 @@ type ConnectionPool struct {
maxOpen int // maximum amount of connection num. maxOpen <= 0 means unlimited.
cleanerCh chan struct{}
closed bool

logf func(format string, params ...interface{})
}

// Servers are slice of Server.
Expand Down Expand Up @@ -104,6 +104,7 @@ func New(servers Servers, prefix string) (cp *ConnectionPool) {
cp.cancelTimeout = defaultPollTimeout + (3 * time.Second)
cp.tryReconnectPeriod = defaultTryReconnectPeriod
cp.keepAlivePeriod = defaultKeepAlivePeriod
cp.logf = log.Printf

go cp.opener()

Expand Down Expand Up @@ -222,7 +223,7 @@ func (cp *ConnectionPool) _conn(ctx context.Context, useFreeConn bool) (*conn, e
default:
case <-ctx.Done():
cp.mu.Unlock()
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "the context is expired")
}
lifetime := cp.maxLifetime

Expand All @@ -239,7 +240,7 @@ func (cp *ConnectionPool) _conn(ctx context.Context, useFreeConn bool) (*conn, e
}
c.tryReconnect()
err := c.setDeadline()
return c, err
return c, errors.Wrap(err, "Failed setDeadline")
}

if cp.maxOpen > 0 && cp.maxOpen <= cp.numOpen {
Expand All @@ -264,17 +265,17 @@ func (cp *ConnectionPool) _conn(ctx context.Context, useFreeConn bool) (*conn, e
}
default:
}
return nil, ctx.Err()
return nil, errors.Wrap(ctx.Err(), "Deadline of connRequests exceeded")
case ret, ok := <-req:
if !ok {
return nil, ErrMemcachedClosed
}
if ret.err != nil {
return ret.conn, ret.err
return ret.conn, errors.Wrap(ret.err, "Response has an error")
}
ret.conn.tryReconnect()
err := ret.conn.setDeadline()
return ret.conn, err
return ret.conn, errors.Wrap(err, "Failed setDeadline in response")
}
}

Expand All @@ -286,10 +287,10 @@ func (cp *ConnectionPool) _conn(ctx context.Context, useFreeConn bool) (*conn, e
defer cp.mu.Unlock()
cp.numOpen--
cp.maybeOpenNewConnections()
return nil, err
return nil, errors.Wrap(err, "Failed newConn")
}
err = newCn.setDeadline()
return newCn, err
return newCn, errors.Wrap(err, "Failed setDeadline of new conn")
}

// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
Expand Down Expand Up @@ -357,6 +358,13 @@ func (cp *ConnectionPool) SetFailover(failover bool) {
cp.failover = failover
}

// SetLogger is used to set logger
func (cp *ConnectionPool) SetLogger(logf func(format string, params ...interface{})) {
cp.mu.Lock()
defer cp.mu.Unlock()
cp.logf = logf
}

func (cp *ConnectionPool) needStartCleaner() bool {
return cp.maxLifetime > 0 &&
cp.numOpen > 0 &&
Expand Down Expand Up @@ -478,6 +486,5 @@ func needCloseConn(err error) bool {
errcouse == ErrServer ||
errcouse == ErrCanceldByContext ||
errcouse == context.DeadlineExceeded ||
errcouse == context.Canceled ||
errcouse.Error() == contextDeadlineExceededErrorString
errcouse == context.Canceled
}

0 comments on commit 30bd0a7

Please sign in to comment.