From dd3fcf03d7410330a1e9f270bc7938757a279c4c Mon Sep 17 00:00:00 2001 From: Daniel Paul Carbone Date: Thu, 20 Sep 2018 12:11:25 -0500 Subject: [PATCH] Cleanup and bugfixes: - Eliminated a few race conditions made present in previous updates - Added "AutoRun" concept to both Session and Candidate - Candidate.WaitX() methods will now return an error if called on a candidate that is not actively in the running - Moved the old Candidate tests to the candidate package and updated them appropriately - Added some more logging at key parts of the session and candidate keepalive loops - Severely reducing the complexity of the Candidate keepalive loop - Session will now attempt to actively destroy the upstream session if the state goes out of whack (generally due to timeout) - Normal session "update" calls no longer block (may want to add unecessary complexity to this later...) - Session.Stop() now returns an error, which if not nil can probably be ignored most of the time --- candidate.go | 4 +- candidate/candidate.go | 239 +++++++++++++++------------- candidate/candidate_test.go | 240 +++++++++++++++++++++++++--- candidate/session/session.go | 161 +++++++++++++------ candidate/session/session_test.go | 57 +++++++ candidate/session/util.go | 8 + candidate_test.go | 250 ------------------------------ 7 files changed, 527 insertions(+), 432 deletions(-) delete mode 100644 candidate_test.go diff --git a/candidate.go b/candidate.go index 88d509b..6c5fe84 100644 --- a/candidate.go +++ b/candidate.go @@ -39,13 +39,11 @@ func NewCandidate(client *Client, candidateID, key, ttl string) (*Candidate, err ID: candidateID, SessionTTL: ttl, Client: client.Client, + AutoRun: true, }) if err != nil { return nil, err } - - c.Run() - return &Candidate{Candidate: c}, nil } diff --git a/candidate/candidate.go b/candidate/candidate.go index bbe3513..4572939 100644 --- a/candidate/candidate.go +++ b/candidate/candidate.go @@ -74,6 +74,11 @@ type ( // // Consul API client. If not specified, one will be created using api.DefaultConfig() Client *api.Client + + // AutoRun [optional] + // + // If set to true, the Candidate will immediately enter its election pool after successful construction + AutoRun bool } Candidate struct { @@ -99,15 +104,19 @@ type ( ) func New(conf *Config) (*Candidate, error) { - var id, kvKey, sessionTTL string - var client *api.Client - var err error + var ( + id, kvKey, sessionTTL string + client *api.Client + autoRun bool + err error + ) if conf != nil { kvKey = conf.KVKey id = conf.ID sessionTTL = conf.SessionTTL client = conf.Client + autoRun = conf.AutoRun } if kvKey == "" { @@ -148,10 +157,16 @@ func New(conf *Config) (*Candidate, error) { sessionTTL: sessionTTL, } + // attempt to create persistent session manager... if c.session, err = c.createSession(); err != nil { return nil, err } + if autoRun { + c.log.Debug("AutoRun enabled") + c.Run() + } + return c, nil } @@ -299,6 +314,10 @@ func (c *Candidate) UpdateWatchers() { func (c *Candidate) WaitFor(td time.Duration) error { var err error + if !c.Running() { + return fmt.Errorf("candidate %s is not in running", c.ID()) + } + timer := time.NewTimer(td) waitLoop: @@ -335,8 +354,8 @@ func (c *Candidate) WaitUntil(t time.Time) error { } // Wait will block until a leader has been elected, regardless of candidate. -func (c *Candidate) Wait() { - c.WaitFor(1<<63 - 1) +func (c *Candidate) Wait() error { + return c.WaitFor(1<<63 - 1) } func (c *Candidate) State() State { @@ -350,22 +369,28 @@ func (c *Candidate) Running() bool { return c.State() == StateRunning } +// sessionUpdate is the receiver for the session update callback func (c *Candidate) sessionUpdate(update session.Update) { - if c.session != nil && c.session.ID() == update.ID { - if c.Running() { - c.sessionUpdateChan <- update - } else { - c.log.Printf("We are no longer in the running, cannot process session update: %#v", update) + c.mu.Lock() + if c.session.ID() == update.ID { + c.mu.Unlock() + select { + case c.sessionUpdateChan <- update: + default: + c.log.Printf("Unable to push session update onto channel. Update: %#v") } } else { - c.log.Printf("Received update from rogue session: %s", update.ID) + c.mu.Unlock() + c.log.Printf("Received update from session %q but our local session is %q...", update.ID, c.session.ID()) } } // acquire will attempt to do just that. Caller must hold lock! func (c *Candidate) acquire(sid string) (bool, error) { - var err error - var elected bool + var ( + elected bool + err error + ) kvpValue := &LeaderKVValue{ LeaderAddress: c.id, @@ -398,14 +423,15 @@ func (c *Candidate) createSession() (*session.Session, error) { } func (c *Candidate) lockRunner() { - var sid string - var elected bool - var resigned chan struct{} - var updated bool - var sessionUpdate session.Update - var err error - - sessionErrorsSeen, sessionStoppedUpdatesSeen := 0, 0 + // TODO: this could stand for some further cleanup... + var ( + sid string + elected, updated bool + resigned chan struct{} + sessionUpdate session.Update + consecutiveSessionErrorCount int + err error + ) // run initial session c.session.Run() @@ -425,100 +451,103 @@ acquisition: select { case <-acquireTicker.C: c.mu.Lock() - - if c.session == nil { - // it is possible for the session to be nil if the sessionUpdate case is unable to recreate upon error - // threshold being met - - // set elected state to false, will be updated later. + if c.session.Running() { + // if our session manager is still running + if sid = c.session.ID(); sid == "" { + // this should only ever happen very early on in the election process + elected = false + updated = c.elected != nil && *c.elected != elected + c.log.Debugf("Acquire tick: Session does not exist, will try locking again in %d seconds...", int64(interval.Seconds())) + } else if elected, err = c.acquire(sid); err != nil { + // most likely hit due to transport error. + updated = c.elected != nil && *c.elected != elected + c.log.Printf("Acquire tick: Error attempting to acquire lock: %s", err) + } else { + // if c.elected is nil, indicating this is the initial election loop, or if the election state + // changed mark update as true + updated = c.elected == nil || *c.elected != elected + } + } else { + // if we are below the threshold, just try to restart existing session + c.log.Printf("Acquire tick: Session is in stopped state, attempting to restart...") elected = false - - // only send an update if a previous election attempt was successful and our state changed updated = c.elected != nil && *c.elected != elected + c.session.Run() + } - c.log.Print("Acquire tick: No session, will try to create...") - if c.session, err = c.createSession(); err != nil { - c.log.Printf("Acquire tick: Error creating session, will try again in %d seconds. Err: %s", int64(interval.Seconds()), err) + // if updated + if updated { + if elected { + c.log.Debug("We have won the election") } else { - c.session.Run() - c.log.Printf("Acquire tick: Session created successfully") + c.log.Debug("We have lost the election") } - } else if sid = c.session.ID(); sid == "" { - // this should only ever happen very early on in the election process - elected = false - updated = c.elected != nil && *c.elected != elected - c.log.Debugf("Acquire tick: Session does not exist, will try locking again in %d seconds...", int64(interval.Seconds())) - } else if elected, err = c.acquire(sid); err != nil { - // most likely hit due to transport error. - elected = false - updated = c.elected != nil && *c.elected != elected - c.log.Printf("Acquire tick: Error attempting to acquire lock: %s", err) + + // update internal state + *c.elected = elected + + // send notifications + up.Elected = elected + c.mu.Unlock() + + c.watchers.notify(*up) } else { - updated = c.elected == nil || *c.elected != elected + c.mu.Unlock() } - c.mu.Unlock() - case sessionUpdate = <-c.sessionUpdateChan: c.mu.Lock() - // if there was an update either creating or renewing our session if sessionUpdate.Error != nil { - sessionErrorsSeen++ - sessionStoppedUpdatesSeen = 0 - - c.log.Printf("Session Update: Error (%d in a row): %s", sessionErrorsSeen, sessionUpdate.Error) - - // if we breach this threshold, stop our current session and attempt to make a new one next pass - if sessionErrorsSeen >= 2 { - c.log.Print("Session Update: 2 successive errors seen, will construct new session") - c.session.Stop() + // if there was an update either creating or renewing our session + consecutiveSessionErrorCount++ + c.log.Printf("Session Update: Error (%d in a row): %s", consecutiveSessionErrorCount, sessionUpdate.Error) + if sessionUpdate.State == session.StateRunning && consecutiveSessionErrorCount > 2 { + // if the session is still running but we've seen more than 2 errors, attempt a stop -> start cycle + c.log.Print("Session Update: 2 successive errors seen, stopping session") + if err = c.session.Stop(); err != nil { + c.log.Printf("Session update: Error stopping session: %s", err) + } elected = false - c.session = nil + updated = c.elected != nil && *c.elected != elected } + // do not modify elected state here unless we've breached the threshold. could just be a temporary + // issue } else if sessionUpdate.State == session.StateStopped { - sessionStoppedUpdatesSeen++ - sessionErrorsSeen = 0 - + // if somehow the session state became stopped (this should basically never happen...), do not attempt + // to kickstart session here. test if we need to update candidate state and notify watchers, then move + // on. next acquire tick will attempt to restart session. + consecutiveSessionErrorCount = 0 elected = false - - c.log.Printf("Session Update: Stopped state seen (%d in row): %#v", sessionStoppedUpdatesSeen, sessionUpdate) - - if sessionStoppedUpdatesSeen >= 2 { - c.log.Print("Session Update: Stopped state seen 2 successive times, will construct new session") - if c.session, err = c.createSession(); err != nil { - c.log.Printf("Unable to recreate will try again in %d seconds. Err: %s.", int64(interval.Seconds()), err) - } else { - c.log.Print("Session created successfully") - c.session.Run() - } - } + updated = c.elected != nil && *c.elected != elected + c.log.Printf("Session Update: Stopped state seen: %#v", sessionUpdate) } else { + // if we got a non-error / non-stopped update, there is nothing to do. + consecutiveSessionErrorCount = 0 c.log.Debugf("Session Update: Received %#v", sessionUpdate) } - updated = c.elected != nil && *c.elected != elected - - c.mu.Unlock() - - case resigned = <-c.resign: - break acquisition - } - - // if updated - if updated { - if elected { - c.log.Debug("We have won the election") + // if updated + if updated { + // this should only ever hit if we breach the error threshold or our session stopped running + if elected { + c.log.Debug("We have won the election") + } else { + c.log.Debug("We have lost the election") + } + // update internal state + *c.elected = elected + // modify update payload + up.Elected = elected + c.mu.Unlock() + // send notifications after unlocking + c.watchers.notify(*up) } else { - c.log.Debug("We have lost the election") + c.mu.Unlock() } - // update internal state - *c.elected = elected - - // send notifications - up.Elected = elected - c.watchers.notify(*up) + case resigned = <-c.resign: + break acquisition } } @@ -528,40 +557,30 @@ acquisition: c.mu.Lock() - // modify state + // modify internal state *c.elected = false c.state = StateResigned // send notifications up.Elected = false up.State = StateResigned - c.watchers.notify(*up) - - done := make(chan struct{}) - // test for session nil, if so spin up new routine that waits for session to be destroyed. - // TODO: do something more clever with the session's update system instead of this chan? if c.session != nil { - go func() { - // stop session, this might block for a bit - c.session.Stop() - done <- struct{}{} - }() - } else { - done <- struct{}{} + if err = c.session.Stop(); err != nil { + c.log.Printf("Error stopping session: %s", err) + } } // release lock before the final steps so the object is usable c.mu.Unlock() - // if need be, wait for session to term - <-done - close(done) - - // notify our caller that we've finished with resignation + // just in case.... if resigned != nil { + // notify caller that we've stopped resigned <- struct{}{} } + c.watchers.notify(*up) + c.log.Print("Resigned") } diff --git a/candidate/candidate_test.go b/candidate/candidate_test.go index e16a14a..3ec2101 100644 --- a/candidate/candidate_test.go +++ b/candidate/candidate_test.go @@ -1,6 +1,7 @@ package candidate_test import ( + "fmt" "github.com/hashicorp/consul/api" cst "github.com/hashicorp/consul/testutil" "github.com/myENA/consultant" @@ -9,12 +10,14 @@ import ( "github.com/myENA/consultant/util" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "sync" "testing" ) const ( testKVKey = "consultant/test/candidate-test" testID = "test-candidate" + lockTTL = "5s" ) func init() { @@ -26,37 +29,30 @@ type CandidateTestSuite struct { server *cst.TestServer client *api.Client - - candidate *candidate.Candidate } func TestCandidate(t *testing.T) { - suite.Run(t, &CandidateTestSuite{}) + suite.Run(t, new(CandidateTestSuite)) } -func (cs *CandidateTestSuite) SetupSuite() { +func (cs *CandidateTestSuite) SetupTest() { server, client := testutil.MakeServerAndClient(cs.T(), nil) cs.server = server cs.client = client.Client } -func (cs *CandidateTestSuite) TearDownSuite() { - if cs.candidate != nil { - cs.candidate.Resign() +func (cs *CandidateTestSuite) TearDownTest() { + if cs.client != nil { + cs.client = nil } - cs.candidate = nil if cs.server != nil { cs.server.Stop() + cs.server = nil } - cs.server = nil - cs.client = nil } -func (cs *CandidateTestSuite) TearDownTest() { - if cs.candidate != nil { - cs.candidate.Resign() - } - cs.candidate = nil +func (cs *CandidateTestSuite) TearDownSuite() { + cs.TearDownTest() } func (cs *CandidateTestSuite) config(conf *candidate.Config) *candidate.Config { @@ -73,21 +69,36 @@ func (cs *CandidateTestSuite) configKeyed(conf *candidate.Config) *candidate.Con return conf } +func (cs *CandidateTestSuite) makeCandidate(num int, conf *candidate.Config) *candidate.Candidate { + lc := new(candidate.Config) + if conf != nil { + *lc = *conf + } + lc.ID = fmt.Sprintf("test-%d", num) + if lc.SessionTTL == "" { + lc.SessionTTL = lockTTL + } + cand, err := candidate.New(cs.configKeyed(lc)) + if err != nil { + cs.T().Fatalf("err: %v", err) + } + + return cand +} + func (cs *CandidateTestSuite) TestNew_EmptyKey() { _, err := candidate.New(cs.config(nil)) require.NotNil(cs.T(), err, "Expected Empty Key error") } func (cs *CandidateTestSuite) TestNew_EmptyID() { - var err error - - cs.candidate, err = candidate.New(cs.configKeyed(nil)) + cand, err := candidate.New(cs.configKeyed(nil)) require.Nil(cs.T(), err, "Error creating candidate: %s", err) - + defer cand.Resign() if myAddr, err := util.MyAddress(); err != nil { - require.NotZero(cs.T(), cs.candidate.ID(), "Expected Candidate ID to not be empty") + require.NotZero(cs.T(), cand.ID(), "Expected Candidate ID to not be empty") } else { - require.Equal(cs.T(), myAddr, cs.candidate.ID(), "Expected Candidate ID to be \"%s\", saw \"%s\"", myAddr, cs.candidate.ID()) + require.Equal(cs.T(), myAddr, cand.ID(), "Expected Candidate ID to be \"%s\", saw \"%s\"", myAddr, cand.ID()) } } @@ -100,3 +111,190 @@ func (cs *CandidateTestSuite) TestNew_InvalidID() { _, err = candidate.New(cs.configKeyed(&candidate.Config{ID: "Große"})) require.Equal(cs.T(), candidate.InvalidCandidateID, err, "Expected \"Große\" to return invalid ID error, saw %+v", err) } + +func (cs *CandidateTestSuite) TestRun_SimpleElectionCycle() { + var candidate1, candidate2, candidate3, leaderCandidate *candidate.Candidate + var leader *api.SessionEntry + var err error + + wg := new(sync.WaitGroup) + + wg.Add(3) + + go func() { + candidate1 = cs.makeCandidate(1, &candidate.Config{AutoRun: true}) + candidate1.Wait() + wg.Done() + }() + go func() { + candidate2 = cs.makeCandidate(2, &candidate.Config{AutoRun: true}) + candidate2.Wait() + wg.Done() + }() + go func() { + candidate3 = cs.makeCandidate(3, &candidate.Config{AutoRun: true}) + candidate3.Wait() + wg.Done() + }() + + wg.Wait() + + leader, err = candidate1.LeaderService() + require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate leader session entry: %v", err)) + + // attempt to locate elected leader + switch leader.ID { + case candidate1.SessionID(): + leaderCandidate = candidate1 + case candidate2.SessionID(): + leaderCandidate = candidate2 + case candidate3.SessionID(): + leaderCandidate = candidate3 + } + + require.NotNil( + cs.T(), + leaderCandidate, + fmt.Sprintf( + "Expected one of \"%+v\", saw \"%s\"", + []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, + leader.ID)) + + leadersFound := 0 + for i, cand := range []*candidate.Candidate{candidate1, candidate2, candidate3} { + if leaderCandidate == cand { + leadersFound = 1 + continue + } + + require.True( + cs.T(), + 0 == leadersFound || 1 == leadersFound, + fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) + + require.False( + cs.T(), + cand.Elected(), + fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) + } + + wg.Add(3) + + go func() { + candidate1.Resign() + wg.Done() + }() + go func() { + candidate2.Resign() + wg.Done() + }() + go func() { + candidate3.Resign() + wg.Done() + }() + + wg.Wait() + + leader, err = candidate1.LeaderService() + require.NotNil(cs.T(), err, "Expected empty key error, got nil") + require.Nil(cs.T(), leader, fmt.Sprintf("Expected nil leader, got %v", leader)) + + // election re-enter attempt + + wg.Add(3) + + go func() { + candidate1 = cs.makeCandidate(1, &candidate.Config{AutoRun: true}) + candidate1.Wait() + wg.Done() + }() + go func() { + candidate2 = cs.makeCandidate(2, &candidate.Config{AutoRun: true}) + candidate2.Wait() + wg.Done() + }() + go func() { + candidate3 = cs.makeCandidate(3, &candidate.Config{AutoRun: true}) + candidate3.Wait() + wg.Done() + }() + + wg.Wait() + + leader, err = candidate1.LeaderService() + require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate re-entered leader session entry: %v", err)) + + // attempt to locate elected leader + switch leader.ID { + case candidate1.SessionID(): + leaderCandidate = candidate1 + case candidate2.SessionID(): + leaderCandidate = candidate2 + case candidate3.SessionID(): + leaderCandidate = candidate3 + default: + leaderCandidate = nil + } + + require.NotNil( + cs.T(), + leaderCandidate, + fmt.Sprintf( + "Expected one of \"%+v\", saw \"%s\"", + []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, + leader.ID)) + + leadersFound = 0 + for i, cand := range []*candidate.Candidate{candidate1, candidate2, candidate3} { + if leaderCandidate == cand { + leadersFound = 1 + continue + } + + require.True( + cs.T(), + 0 == leadersFound || 1 == leadersFound, + fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) + + require.False( + cs.T(), + cand.Elected(), + fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) + } +} + +func (cs *CandidateTestSuite) TestRun_SessionAnarchy() { + cand := cs.makeCandidate(1, &candidate.Config{AutoRun: true}) + + updates := make([]candidate.ElectionUpdate, 0) + updatesMu := sync.Mutex{} + + cand.Watch("", func(update candidate.ElectionUpdate) { + updatesMu.Lock() + cs.T().Logf("Update received: %#v", update) + updates = append(updates, update) + updatesMu.Unlock() + }) + cand.Wait() + + sid := cand.SessionID() + require.NotEmpty(cs.T(), sid, "Expected sid to contain value") + + cs.client.Session().Destroy(sid, nil) + + require.Equal( + cs.T(), + candidate.StateRunning, + cand.State(), + "Expected candidate state to still be %d after session destroyed", + candidate.StateRunning) + + cand.Wait() + + require.NotEmpty(cs.T(), cand.SessionID(), "Expected new session id") + require.NotEqual(cs.T(), sid, cand.SessionID(), "Expected new session id") + + updatesMu.Lock() + require.Len(cs.T(), updates, 3, "Expected to see 3 updates") + updatesMu.Unlock() +} diff --git a/candidate/session/session.go b/candidate/session/session.go index 2c0f6fc..bc9e302 100644 --- a/candidate/session/session.go +++ b/candidate/session/session.go @@ -62,6 +62,11 @@ type ( // // Callback to be executed after session state change UpdateFunc UpdateFunc + + // AutoRun [optional] + // + // Whether the session should start immediately after successful construction + AutoRun bool } Session struct { @@ -80,7 +85,7 @@ type ( interval time.Duration lastRenewed time.Time - stop chan chan struct{} + stop chan chan error state State updateFunc UpdateFunc @@ -88,11 +93,14 @@ type ( ) func New(conf *Config) (*Session, error) { - var key, ttl, behavior string - var client *api.Client - var l log.DebugLogger - var err error - var updateFunc UpdateFunc + var ( + key, ttl, behavior string + client *api.Client + l log.DebugLogger + updateFunc UpdateFunc + autoRun bool + err error + ) if conf != nil { key = conf.Key @@ -101,6 +109,7 @@ func New(conf *Config) (*Session, error) { l = conf.Log client = conf.Client updateFunc = conf.UpdateFunc + autoRun = conf.AutoRun } if behavior == "" { @@ -150,8 +159,8 @@ func New(conf *Config) (*Session, error) { key: key, ttl: ttlTD, behavior: behavior, - interval: time.Duration(ttlTD.Nanoseconds() / 2), - stop: make(chan chan struct{}, 1), + interval: time.Duration(int64(ttlTD) / 2), + stop: make(chan chan error, 1), updateFunc: updateFunc, } @@ -162,6 +171,11 @@ func New(conf *Config) (*Session, error) { l.Debugf("Lock interval: %d seconds", int64(ttlTD.Seconds())) l.Debugf("Session renew interval: %d seconds", int64(cs.interval.Seconds())) + if autoRun { + l.Debug("AutoRun enabled") + cs.Run() + } + return cs, nil } @@ -212,10 +226,13 @@ func (cs *Session) Running() bool { func (cs *Session) Run() { cs.mu.Lock() if cs.state == StateRunning { + // if our state is already running, just continue to do so. + cs.log.Debug("Run() called but I'm already running") cs.mu.Unlock() return } + // modify state cs.state = StateRunning // try to create session immediately @@ -226,24 +243,27 @@ func (cs *Session) Run() { err) } + // release lock before beginning maintenance loop cs.mu.Unlock() go cs.maintain() } -func (cs *Session) Stop() { +func (cs *Session) Stop() error { cs.mu.Lock() if cs.state == StateStopped { + cs.log.Debug("Stop() called but I'm already stopped") cs.mu.Unlock() - return + return nil } cs.state = StateStopped cs.mu.Unlock() - stopped := make(chan struct{}, 1) + stopped := make(chan error, 1) cs.stop <- stopped - <-stopped + err := <-stopped close(stopped) + return err } func (cs *Session) State() State { @@ -253,15 +273,6 @@ func (cs *Session) State() State { return s } -// sendUpdate will attempt to notify whoever cares that the session has been created / refreshed / errored -// Caller MUST hold lock! -func (cs *Session) sendUpdate(up Update) { - if cs.updateFunc == nil { - return - } - cs.updateFunc(up) -} - // create will attempt to do just that. Caller MUST hold lock! func (cs *Session) create() error { var name string @@ -320,10 +331,26 @@ func (cs *Session) renew() error { return err } +// destroy will attempt to destroy the upstream session and removes internal references to it. +// caller MUST hold lock! +func (cs *Session) destroy() error { + sid := cs.id + cs.id = "" + cs.name = "" + cs.lastRenewed = time.Time{} + _, err := cs.client.Session().Destroy(sid, nil) + return err +} + +// TODO: improve updates to include the action taken this loop, and whether it is the last action to be taken this loop +// i.e., destroy / renew can happen in the same loop as create. func (cs *Session) maintain() { - var stopped chan struct{} - var up Update - var err error + var ( + sid, name string + stopped chan error + up Update + err error + ) intervalTicker := time.NewTicker(cs.interval) @@ -332,17 +359,46 @@ maintaining: select { case <-intervalTicker.C: cs.mu.Lock() - if !cs.lastRenewed.IsZero() && time.Now().Sub(cs.lastRenewed) > cs.ttl { - cs.id = "" - cs.name = "" - cs.log.Printf("Last renewed time (%s) is >= ttl (%s), creating new session...", cs.lastRenewed.Format(time.RFC822), cs.ttl) + if cs.id != "" { + // if we were previously able to create an upstream session... + sid, name = cs.id, cs.name + if !cs.lastRenewed.IsZero() && time.Now().Sub(cs.lastRenewed) > cs.ttl { + // if we have a session but the last time we were able to successfully renew it was beyond the TTL, + // attempt to destroy and allow re-creation down below + cs.log.Printf( + "Last renewed time (%s) is > ttl (%s), expiring upstream session %q (%q)...", + cs.lastRenewed.Format(time.RFC822), + cs.ttl, + cs.name, + cs.id, + ) + if err = cs.destroy(); err != nil { + cs.log.Debugf( + "Error destroying expired upstream session %q (%q). This can probably be ignored: %s", + name, + sid, + err, + ) + } + } else if err = cs.renew(); err != nil { + // if error during renewal + cs.log.Printf("Unable to renew Consul Session: %s", err) + // TODO: possibly attempt to destroy the session at this point? the above timeout test statement + // should eventually be hit if this continues to fail... + } else { + // session should be in a happy state. + cs.log.Debugf("Upstream session %q (%q) renewed", cs.name, cs.id) + } } + if cs.id == "" { + // if this is the first iteration of the loop or if an error occurred above, test and try to create + // a new session if err = cs.create(); err != nil { - cs.log.Printf("Unable to create Consul Session: %s", err) + cs.log.Printf("Unable to create upstream session: %s", err) + } else { + cs.log.Debugf("New upstream session %q (%q) created.", cs.name, cs.id) } - } else if err = cs.renew(); err != nil { - cs.log.Printf("Unable to renew Consul Session: %s", err) } // create update behind lock @@ -357,7 +413,8 @@ maintaining: cs.mu.Unlock() //send update after unlock - cs.sendUpdate(up) + // TODO: should this block? + go sendUpdate(cs.updateFunc, up) case stopped = <-cs.stop: break maintaining @@ -370,33 +427,41 @@ maintaining: cs.mu.Lock() - // store most recent session id - sid := cs.id + // localize most recent upstream session info + sid = cs.id + name = cs.name + lastRenewed := cs.lastRenewed + + if cs.id != "" { + // if we have a reference to an upstream session id, attempt to destroy it. + if derr := cs.destroy(); derr != nil { + msg := fmt.Sprintf("Error destroying upstream session %q (%q) during shutdown: %s", name, sid, derr) + log.Print(msg) + if err != nil { + // if there was an existing error, append this error to it to be sent along the Stop() resp chan + err = fmt.Errorf("%s; %s", err, msg) + } + } else { + log.Printf("Upstream session %q (%q) destroyed", name, sid) + } + } - // modify state - cs.id = "" - cs.name = "" + // set our state to stopped, preventing further interaction. cs.state = StateStopped // prepare final update - up = Update{LastRenewed: cs.lastRenewed, State: StateStopped} + up = Update{LastRenewed: lastRenewed, State: StateStopped} cs.mu.Unlock() - // if there was a session id, attempt to destroy it. - if sid != "" { - if _, err = cs.client.Session().Destroy(sid, nil); err != nil { - cs.log.Printf("Unable to destroy session \"%s\": %s", sid, err) - } - } - // just in case... if stopped != nil { - stopped <- struct{}{} + // send along the last seen error, whatever it was. + stopped <- err } - // send final update asynchronously, don't much care if they're around to receive it. - go cs.sendUpdate(up) + // send final update + go sendUpdate(cs.updateFunc, up) cs.log.Print("Session stopped") } diff --git a/candidate/session/session_test.go b/candidate/session/session_test.go index 1f7ca56..7a1b45e 100644 --- a/candidate/session/session_test.go +++ b/candidate/session/session_test.go @@ -161,3 +161,60 @@ func (ss *SessionTestSuite) TestSession_Run() { require.Nil(ss.T(), up.Error, "Expected Error to be nil, saw: %s", up.Error) } } + +func (ss *SessionTestSuite) TestSession_AutoRun() { + var err error + + ss.session, err = session.New(ss.config(&session.Config{TTL: testTTL, AutoRun: true})) + require.Nil(ss.T(), err, "Error constructing session: %s", err) + + require.True(ss.T(), ss.session.Running(), "AutoRun session not automatically started") +} + +func (ss *SessionTestSuite) TestSession_SessionKilled() { + var ( + initialID string + err error + ) + + upChan := make(chan session.Update, 1) + updateFunc := func(up session.Update) { + upChan <- up + } + + ss.session, err = session.New(ss.config(&session.Config{TTL: testTTL, UpdateFunc: updateFunc})) + require.Nil(ss.T(), err, "Error constructing session: %s", err) + + ss.session.Run() + +TestLoop: + for i := 0; ; i++ { + select { + case up := <-upChan: + if i == 0 { + if up.ID == "" { + ss.FailNowf("Expected to have session on first pass", "Session create failed: %#v", up) + break TestLoop + } + initialID = up.ID + // take a nap... + time.Sleep(time.Second) + if _, err := ss.client.Session().Destroy(up.ID, nil); err != nil { + ss.FailNowf("Failed to arbitrarily destroy session", "Error: %s", err) + break TestLoop + } + } else if i == 1 { + if up.ID == "" { + ss.FailNowf("Expected to have new session on 2nd pass", "Session create failed: %#v", up) + break TestLoop + } + if up.ID == initialID { + ss.FailNowf("Expected different upstream session id", "Initial: %q; New: %q", initialID, up.ID) + break TestLoop + } + // if we got a new id, great! + break TestLoop + } + } + } +} diff --git a/candidate/session/util.go b/candidate/session/util.go index c094d28..24c318a 100644 --- a/candidate/session/util.go +++ b/candidate/session/util.go @@ -32,3 +32,11 @@ func ParseName(name string) (*NameParts, error) { return nil, fmt.Errorf("expected 2 or 3 parts in session name \"%s\", saw only \"%d\"", name, len(split)) } } + +// sendUpdate will attempt to notify whoever cares that the session state has changed +func sendUpdate(fn UpdateFunc, up Update) { + if fn == nil { + return + } + fn(up) +} diff --git a/candidate_test.go b/candidate_test.go deleted file mode 100644 index ff6bcd3..0000000 --- a/candidate_test.go +++ /dev/null @@ -1,250 +0,0 @@ -package consultant_test - -import ( - "fmt" - "github.com/hashicorp/consul/api" - cst "github.com/hashicorp/consul/testutil" - "github.com/myENA/consultant" - "github.com/myENA/consultant/candidate" - "github.com/myENA/consultant/testutil" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "sync" - "testing" -) - -// DEPRECATED: this test suite is to ensure backwards compatibility until we remove it. - -const ( - candidateLockKey = "consultant/tests/candidate-lock" - candidateLockTTL = "5s" -) - -type CandidateTestSuite struct { - suite.Suite - - // these values are cyclical, and should be re-defined per test method - server *cst.TestServer - client *consultant.Client -} - -func TestCandidate(t *testing.T) { - suite.Run(t, &CandidateTestSuite{}) -} - -// SetupTest is called before each method is run. -func (cs *CandidateTestSuite) SetupTest() { - cs.server, cs.client = testutil.MakeServerAndClient(cs.T(), nil) -} - -// TearDownTest is called after each method has been run. -func (cs *CandidateTestSuite) TearDownTest() { - if cs.client != nil { - cs.client = nil - } - if cs.server != nil { - // TODO: Stop seems to return an error when the process is killed... - cs.server.Stop() - cs.server = nil - } -} - -func (cs *CandidateTestSuite) TearDownSuite() { - cs.TearDownTest() -} - -func (cs *CandidateTestSuite) makeCandidate(num int) *consultant.Candidate { - candidate, err := consultant.NewCandidate(cs.client, fmt.Sprintf("test-%d", num), candidateLockKey, candidateLockTTL) - if err != nil { - cs.T().Fatalf("err: %v", err) - } - - return candidate -} - -func (cs *CandidateTestSuite) TestSimpleElectionCycle() { - var candidate1, candidate2, candidate3, leaderCandidate *consultant.Candidate - var leader *api.SessionEntry - var err error - - wg := new(sync.WaitGroup) - - wg.Add(3) - - go func() { - candidate1 = cs.makeCandidate(1) - candidate1.Wait() - wg.Done() - }() - go func() { - candidate2 = cs.makeCandidate(2) - candidate2.Wait() - wg.Done() - }() - go func() { - candidate3 = cs.makeCandidate(3) - candidate3.Wait() - wg.Done() - }() - - wg.Wait() - - leader, err = candidate1.LeaderService() - require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate leader session entry: %v", err)) - - // attempt to locate elected leader - switch leader.ID { - case candidate1.SessionID(): - leaderCandidate = candidate1 - case candidate2.SessionID(): - leaderCandidate = candidate2 - case candidate3.SessionID(): - leaderCandidate = candidate3 - } - - require.NotNil( - cs.T(), - leaderCandidate, - fmt.Sprintf( - "Expected one of \"%+v\", saw \"%s\"", - []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, - leader.ID)) - - leadersFound := 0 - for i, candidate := range []*consultant.Candidate{candidate1, candidate2, candidate3} { - if leaderCandidate == candidate { - leadersFound = 1 - continue - } - - require.True( - cs.T(), - 0 == leadersFound || 1 == leadersFound, - fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) - - require.False( - cs.T(), - candidate.Elected(), - fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) - } - - wg.Add(3) - - go func() { - candidate1.Resign() - wg.Done() - }() - go func() { - candidate2.Resign() - wg.Done() - }() - go func() { - candidate3.Resign() - wg.Done() - }() - - wg.Wait() - - leader, err = candidate1.LeaderService() - require.NotNil(cs.T(), err, "Expected empty key error, got nil") - require.Nil(cs.T(), leader, fmt.Sprintf("Expected nil leader, got %v", leader)) - - // election re-enter attempt - - wg.Add(3) - - go func() { - candidate1 = cs.makeCandidate(1) - candidate1.Wait() - wg.Done() - }() - go func() { - candidate2 = cs.makeCandidate(2) - candidate2.Wait() - wg.Done() - }() - go func() { - candidate3 = cs.makeCandidate(3) - candidate3.Wait() - wg.Done() - }() - - wg.Wait() - - leader, err = candidate1.LeaderService() - require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate re-entered leader session entry: %v", err)) - - // attempt to locate elected leader - switch leader.ID { - case candidate1.SessionID(): - leaderCandidate = candidate1 - case candidate2.SessionID(): - leaderCandidate = candidate2 - case candidate3.SessionID(): - leaderCandidate = candidate3 - default: - leaderCandidate = nil - } - - require.NotNil( - cs.T(), - leaderCandidate, - fmt.Sprintf( - "Expected one of \"%+v\", saw \"%s\"", - []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, - leader.ID)) - - leadersFound = 0 - for i, cand := range []*consultant.Candidate{candidate1, candidate2, candidate3} { - if leaderCandidate == cand { - leadersFound = 1 - continue - } - - require.True( - cs.T(), - 0 == leadersFound || 1 == leadersFound, - fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) - - require.False( - cs.T(), - cand.Elected(), - fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) - } -} - -func (cs *CandidateTestSuite) TestSessionAnarchy() { - cand := cs.makeCandidate(1) - - updates := make([]candidate.ElectionUpdate, 0) - updatesMu := sync.Mutex{} - - cand.Watch("", func(update candidate.ElectionUpdate) { - updatesMu.Lock() - cs.T().Logf("Update received: %#v", update) - updates = append(updates, update) - updatesMu.Unlock() - }) - cand.Wait() - - sid := cand.SessionID() - require.NotEmpty(cs.T(), sid, "Expected sid to contain value") - - cs.client.Session().Destroy(sid, nil) - - require.Equal( - cs.T(), - candidate.StateRunning, - cand.State(), - "Expected candidate state to still be %d after session destroyed", - candidate.StateRunning) - - cand.Wait() - - require.NotEmpty(cs.T(), cand.SessionID(), "Expected new session id") - require.NotEqual(cs.T(), sid, cand.SessionID(), "Expected new session id") - - updatesMu.Lock() - require.Len(cs.T(), updates, 3, "Expected to see 3 updates") - updatesMu.Unlock() -}