From af34ec81d8ac2f669f27bdd3dbeda739086ff304 Mon Sep 17 00:00:00 2001 From: Daniel Paul Carbone Date: Fri, 21 Sep 2018 15:12:59 -0500 Subject: [PATCH] Adding "AutoRun" to Candidate, and doing some cleanup on and moving Candidate tests into candidate package --- candidate.go | 4 +- candidate/candidate.go | 24 +++- candidate/candidate_test.go | 240 ++++++++++++++++++++++++++++++--- candidate/session/session.go | 2 +- candidate_test.go | 250 ----------------------------------- 5 files changed, 242 insertions(+), 278 deletions(-) delete mode 100644 candidate_test.go diff --git a/candidate.go b/candidate.go index 88d509b..6c5fe84 100644 --- a/candidate.go +++ b/candidate.go @@ -39,13 +39,11 @@ func NewCandidate(client *Client, candidateID, key, ttl string) (*Candidate, err ID: candidateID, SessionTTL: ttl, Client: client.Client, + AutoRun: true, }) if err != nil { return nil, err } - - c.Run() - return &Candidate{Candidate: c}, nil } diff --git a/candidate/candidate.go b/candidate/candidate.go index f844050..addaccc 100644 --- a/candidate/candidate.go +++ b/candidate/candidate.go @@ -74,6 +74,11 @@ type ( // // Consul API client. If not specified, one will be created using api.DefaultConfig() Client *api.Client + + // AutoRun [optional] + // + // If set to true, the Candidate will immediately enter its election pool after successful construction + AutoRun bool } Candidate struct { @@ -99,15 +104,19 @@ type ( ) func New(conf *Config) (*Candidate, error) { - var id, kvKey, sessionTTL string - var client *api.Client - var err error + var ( + id, kvKey, sessionTTL string + client *api.Client + autoRun bool + err error + ) if conf != nil { kvKey = conf.KVKey id = conf.ID sessionTTL = conf.SessionTTL client = conf.Client + autoRun = conf.AutoRun } if kvKey == "" { @@ -153,6 +162,11 @@ func New(conf *Config) (*Candidate, error) { return nil, err } + if autoRun { + c.log.Debug("AutoRun enabled") + c.Run() + } + return c, nil } @@ -300,6 +314,10 @@ func (c *Candidate) UpdateWatchers() { func (c *Candidate) WaitFor(td time.Duration) error { var err error + if !c.Running() { + return fmt.Errorf("candidate %s is not in running", c.ID()) + } + timer := time.NewTimer(td) waitLoop: diff --git a/candidate/candidate_test.go b/candidate/candidate_test.go index e16a14a..3ec2101 100644 --- a/candidate/candidate_test.go +++ b/candidate/candidate_test.go @@ -1,6 +1,7 @@ package candidate_test import ( + "fmt" "github.com/hashicorp/consul/api" cst "github.com/hashicorp/consul/testutil" "github.com/myENA/consultant" @@ -9,12 +10,14 @@ import ( "github.com/myENA/consultant/util" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "sync" "testing" ) const ( testKVKey = "consultant/test/candidate-test" testID = "test-candidate" + lockTTL = "5s" ) func init() { @@ -26,37 +29,30 @@ type CandidateTestSuite struct { server *cst.TestServer client *api.Client - - candidate *candidate.Candidate } func TestCandidate(t *testing.T) { - suite.Run(t, &CandidateTestSuite{}) + suite.Run(t, new(CandidateTestSuite)) } -func (cs *CandidateTestSuite) SetupSuite() { +func (cs *CandidateTestSuite) SetupTest() { server, client := testutil.MakeServerAndClient(cs.T(), nil) cs.server = server cs.client = client.Client } -func (cs *CandidateTestSuite) TearDownSuite() { - if cs.candidate != nil { - cs.candidate.Resign() +func (cs *CandidateTestSuite) TearDownTest() { + if cs.client != nil { + cs.client = nil } - cs.candidate = nil if cs.server != nil { cs.server.Stop() + cs.server = nil } - cs.server = nil - cs.client = nil } -func (cs *CandidateTestSuite) TearDownTest() { - if cs.candidate != nil { - cs.candidate.Resign() - } - cs.candidate = nil +func (cs *CandidateTestSuite) TearDownSuite() { + cs.TearDownTest() } func (cs *CandidateTestSuite) config(conf *candidate.Config) *candidate.Config { @@ -73,21 +69,36 @@ func (cs *CandidateTestSuite) configKeyed(conf *candidate.Config) *candidate.Con return conf } +func (cs *CandidateTestSuite) makeCandidate(num int, conf *candidate.Config) *candidate.Candidate { + lc := new(candidate.Config) + if conf != nil { + *lc = *conf + } + lc.ID = fmt.Sprintf("test-%d", num) + if lc.SessionTTL == "" { + lc.SessionTTL = lockTTL + } + cand, err := candidate.New(cs.configKeyed(lc)) + if err != nil { + cs.T().Fatalf("err: %v", err) + } + + return cand +} + func (cs *CandidateTestSuite) TestNew_EmptyKey() { _, err := candidate.New(cs.config(nil)) require.NotNil(cs.T(), err, "Expected Empty Key error") } func (cs *CandidateTestSuite) TestNew_EmptyID() { - var err error - - cs.candidate, err = candidate.New(cs.configKeyed(nil)) + cand, err := candidate.New(cs.configKeyed(nil)) require.Nil(cs.T(), err, "Error creating candidate: %s", err) - + defer cand.Resign() if myAddr, err := util.MyAddress(); err != nil { - require.NotZero(cs.T(), cs.candidate.ID(), "Expected Candidate ID to not be empty") + require.NotZero(cs.T(), cand.ID(), "Expected Candidate ID to not be empty") } else { - require.Equal(cs.T(), myAddr, cs.candidate.ID(), "Expected Candidate ID to be \"%s\", saw \"%s\"", myAddr, cs.candidate.ID()) + require.Equal(cs.T(), myAddr, cand.ID(), "Expected Candidate ID to be \"%s\", saw \"%s\"", myAddr, cand.ID()) } } @@ -100,3 +111,190 @@ func (cs *CandidateTestSuite) TestNew_InvalidID() { _, err = candidate.New(cs.configKeyed(&candidate.Config{ID: "Große"})) require.Equal(cs.T(), candidate.InvalidCandidateID, err, "Expected \"Große\" to return invalid ID error, saw %+v", err) } + +func (cs *CandidateTestSuite) TestRun_SimpleElectionCycle() { + var candidate1, candidate2, candidate3, leaderCandidate *candidate.Candidate + var leader *api.SessionEntry + var err error + + wg := new(sync.WaitGroup) + + wg.Add(3) + + go func() { + candidate1 = cs.makeCandidate(1, &candidate.Config{AutoRun: true}) + candidate1.Wait() + wg.Done() + }() + go func() { + candidate2 = cs.makeCandidate(2, &candidate.Config{AutoRun: true}) + candidate2.Wait() + wg.Done() + }() + go func() { + candidate3 = cs.makeCandidate(3, &candidate.Config{AutoRun: true}) + candidate3.Wait() + wg.Done() + }() + + wg.Wait() + + leader, err = candidate1.LeaderService() + require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate leader session entry: %v", err)) + + // attempt to locate elected leader + switch leader.ID { + case candidate1.SessionID(): + leaderCandidate = candidate1 + case candidate2.SessionID(): + leaderCandidate = candidate2 + case candidate3.SessionID(): + leaderCandidate = candidate3 + } + + require.NotNil( + cs.T(), + leaderCandidate, + fmt.Sprintf( + "Expected one of \"%+v\", saw \"%s\"", + []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, + leader.ID)) + + leadersFound := 0 + for i, cand := range []*candidate.Candidate{candidate1, candidate2, candidate3} { + if leaderCandidate == cand { + leadersFound = 1 + continue + } + + require.True( + cs.T(), + 0 == leadersFound || 1 == leadersFound, + fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) + + require.False( + cs.T(), + cand.Elected(), + fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) + } + + wg.Add(3) + + go func() { + candidate1.Resign() + wg.Done() + }() + go func() { + candidate2.Resign() + wg.Done() + }() + go func() { + candidate3.Resign() + wg.Done() + }() + + wg.Wait() + + leader, err = candidate1.LeaderService() + require.NotNil(cs.T(), err, "Expected empty key error, got nil") + require.Nil(cs.T(), leader, fmt.Sprintf("Expected nil leader, got %v", leader)) + + // election re-enter attempt + + wg.Add(3) + + go func() { + candidate1 = cs.makeCandidate(1, &candidate.Config{AutoRun: true}) + candidate1.Wait() + wg.Done() + }() + go func() { + candidate2 = cs.makeCandidate(2, &candidate.Config{AutoRun: true}) + candidate2.Wait() + wg.Done() + }() + go func() { + candidate3 = cs.makeCandidate(3, &candidate.Config{AutoRun: true}) + candidate3.Wait() + wg.Done() + }() + + wg.Wait() + + leader, err = candidate1.LeaderService() + require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate re-entered leader session entry: %v", err)) + + // attempt to locate elected leader + switch leader.ID { + case candidate1.SessionID(): + leaderCandidate = candidate1 + case candidate2.SessionID(): + leaderCandidate = candidate2 + case candidate3.SessionID(): + leaderCandidate = candidate3 + default: + leaderCandidate = nil + } + + require.NotNil( + cs.T(), + leaderCandidate, + fmt.Sprintf( + "Expected one of \"%+v\", saw \"%s\"", + []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, + leader.ID)) + + leadersFound = 0 + for i, cand := range []*candidate.Candidate{candidate1, candidate2, candidate3} { + if leaderCandidate == cand { + leadersFound = 1 + continue + } + + require.True( + cs.T(), + 0 == leadersFound || 1 == leadersFound, + fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) + + require.False( + cs.T(), + cand.Elected(), + fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) + } +} + +func (cs *CandidateTestSuite) TestRun_SessionAnarchy() { + cand := cs.makeCandidate(1, &candidate.Config{AutoRun: true}) + + updates := make([]candidate.ElectionUpdate, 0) + updatesMu := sync.Mutex{} + + cand.Watch("", func(update candidate.ElectionUpdate) { + updatesMu.Lock() + cs.T().Logf("Update received: %#v", update) + updates = append(updates, update) + updatesMu.Unlock() + }) + cand.Wait() + + sid := cand.SessionID() + require.NotEmpty(cs.T(), sid, "Expected sid to contain value") + + cs.client.Session().Destroy(sid, nil) + + require.Equal( + cs.T(), + candidate.StateRunning, + cand.State(), + "Expected candidate state to still be %d after session destroyed", + candidate.StateRunning) + + cand.Wait() + + require.NotEmpty(cs.T(), cand.SessionID(), "Expected new session id") + require.NotEqual(cs.T(), sid, cand.SessionID(), "Expected new session id") + + updatesMu.Lock() + require.Len(cs.T(), updates, 3, "Expected to see 3 updates") + updatesMu.Unlock() +} diff --git a/candidate/session/session.go b/candidate/session/session.go index c068b56..bc9e302 100644 --- a/candidate/session/session.go +++ b/candidate/session/session.go @@ -65,7 +65,7 @@ type ( // AutoRun [optional] // - // Whether the session should start immediately on successful creation + // Whether the session should start immediately after successful construction AutoRun bool } diff --git a/candidate_test.go b/candidate_test.go deleted file mode 100644 index ff6bcd3..0000000 --- a/candidate_test.go +++ /dev/null @@ -1,250 +0,0 @@ -package consultant_test - -import ( - "fmt" - "github.com/hashicorp/consul/api" - cst "github.com/hashicorp/consul/testutil" - "github.com/myENA/consultant" - "github.com/myENA/consultant/candidate" - "github.com/myENA/consultant/testutil" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "sync" - "testing" -) - -// DEPRECATED: this test suite is to ensure backwards compatibility until we remove it. - -const ( - candidateLockKey = "consultant/tests/candidate-lock" - candidateLockTTL = "5s" -) - -type CandidateTestSuite struct { - suite.Suite - - // these values are cyclical, and should be re-defined per test method - server *cst.TestServer - client *consultant.Client -} - -func TestCandidate(t *testing.T) { - suite.Run(t, &CandidateTestSuite{}) -} - -// SetupTest is called before each method is run. -func (cs *CandidateTestSuite) SetupTest() { - cs.server, cs.client = testutil.MakeServerAndClient(cs.T(), nil) -} - -// TearDownTest is called after each method has been run. -func (cs *CandidateTestSuite) TearDownTest() { - if cs.client != nil { - cs.client = nil - } - if cs.server != nil { - // TODO: Stop seems to return an error when the process is killed... - cs.server.Stop() - cs.server = nil - } -} - -func (cs *CandidateTestSuite) TearDownSuite() { - cs.TearDownTest() -} - -func (cs *CandidateTestSuite) makeCandidate(num int) *consultant.Candidate { - candidate, err := consultant.NewCandidate(cs.client, fmt.Sprintf("test-%d", num), candidateLockKey, candidateLockTTL) - if err != nil { - cs.T().Fatalf("err: %v", err) - } - - return candidate -} - -func (cs *CandidateTestSuite) TestSimpleElectionCycle() { - var candidate1, candidate2, candidate3, leaderCandidate *consultant.Candidate - var leader *api.SessionEntry - var err error - - wg := new(sync.WaitGroup) - - wg.Add(3) - - go func() { - candidate1 = cs.makeCandidate(1) - candidate1.Wait() - wg.Done() - }() - go func() { - candidate2 = cs.makeCandidate(2) - candidate2.Wait() - wg.Done() - }() - go func() { - candidate3 = cs.makeCandidate(3) - candidate3.Wait() - wg.Done() - }() - - wg.Wait() - - leader, err = candidate1.LeaderService() - require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate leader session entry: %v", err)) - - // attempt to locate elected leader - switch leader.ID { - case candidate1.SessionID(): - leaderCandidate = candidate1 - case candidate2.SessionID(): - leaderCandidate = candidate2 - case candidate3.SessionID(): - leaderCandidate = candidate3 - } - - require.NotNil( - cs.T(), - leaderCandidate, - fmt.Sprintf( - "Expected one of \"%+v\", saw \"%s\"", - []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, - leader.ID)) - - leadersFound := 0 - for i, candidate := range []*consultant.Candidate{candidate1, candidate2, candidate3} { - if leaderCandidate == candidate { - leadersFound = 1 - continue - } - - require.True( - cs.T(), - 0 == leadersFound || 1 == leadersFound, - fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) - - require.False( - cs.T(), - candidate.Elected(), - fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) - } - - wg.Add(3) - - go func() { - candidate1.Resign() - wg.Done() - }() - go func() { - candidate2.Resign() - wg.Done() - }() - go func() { - candidate3.Resign() - wg.Done() - }() - - wg.Wait() - - leader, err = candidate1.LeaderService() - require.NotNil(cs.T(), err, "Expected empty key error, got nil") - require.Nil(cs.T(), leader, fmt.Sprintf("Expected nil leader, got %v", leader)) - - // election re-enter attempt - - wg.Add(3) - - go func() { - candidate1 = cs.makeCandidate(1) - candidate1.Wait() - wg.Done() - }() - go func() { - candidate2 = cs.makeCandidate(2) - candidate2.Wait() - wg.Done() - }() - go func() { - candidate3 = cs.makeCandidate(3) - candidate3.Wait() - wg.Done() - }() - - wg.Wait() - - leader, err = candidate1.LeaderService() - require.Nil(cs.T(), err, fmt.Sprintf("Unable to locate re-entered leader session entry: %v", err)) - - // attempt to locate elected leader - switch leader.ID { - case candidate1.SessionID(): - leaderCandidate = candidate1 - case candidate2.SessionID(): - leaderCandidate = candidate2 - case candidate3.SessionID(): - leaderCandidate = candidate3 - default: - leaderCandidate = nil - } - - require.NotNil( - cs.T(), - leaderCandidate, - fmt.Sprintf( - "Expected one of \"%+v\", saw \"%s\"", - []string{candidate1.SessionID(), candidate2.SessionID(), candidate3.SessionID()}, - leader.ID)) - - leadersFound = 0 - for i, cand := range []*consultant.Candidate{candidate1, candidate2, candidate3} { - if leaderCandidate == cand { - leadersFound = 1 - continue - } - - require.True( - cs.T(), - 0 == leadersFound || 1 == leadersFound, - fmt.Sprintf("leaderCandidate matched to more than 1 candidate. Iteration \"%d\"", i)) - - require.False( - cs.T(), - cand.Elected(), - fmt.Sprintf("Candidate \"%d\" is not elected but says that it is...", i)) - } -} - -func (cs *CandidateTestSuite) TestSessionAnarchy() { - cand := cs.makeCandidate(1) - - updates := make([]candidate.ElectionUpdate, 0) - updatesMu := sync.Mutex{} - - cand.Watch("", func(update candidate.ElectionUpdate) { - updatesMu.Lock() - cs.T().Logf("Update received: %#v", update) - updates = append(updates, update) - updatesMu.Unlock() - }) - cand.Wait() - - sid := cand.SessionID() - require.NotEmpty(cs.T(), sid, "Expected sid to contain value") - - cs.client.Session().Destroy(sid, nil) - - require.Equal( - cs.T(), - candidate.StateRunning, - cand.State(), - "Expected candidate state to still be %d after session destroyed", - candidate.StateRunning) - - cand.Wait() - - require.NotEmpty(cs.T(), cand.SessionID(), "Expected new session id") - require.NotEqual(cs.T(), sid, cand.SessionID(), "Expected new session id") - - updatesMu.Lock() - require.Len(cs.T(), updates, 3, "Expected to see 3 updates") - updatesMu.Unlock() -}