Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session and Candidate interaction cleanup #14

Merged
merged 1 commit into from
Sep 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ func NewCandidate(client *Client, candidateID, key, ttl string) (*Candidate, err
ID: candidateID,
SessionTTL: ttl,
Client: client.Client,
AutoRun: true,
})
if err != nil {
return nil, err
}

c.Run()

return &Candidate{Candidate: c}, nil
}

Expand Down
239 changes: 129 additions & 110 deletions candidate/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ type (
//
// Consul API client. If not specified, one will be created using api.DefaultConfig()
Client *api.Client

// AutoRun [optional]
//
// If set to true, the Candidate will immediately enter its election pool after successful construction
AutoRun bool
}

Candidate struct {
Expand All @@ -99,15 +104,19 @@ type (
)

func New(conf *Config) (*Candidate, error) {
var id, kvKey, sessionTTL string
var client *api.Client
var err error
var (
id, kvKey, sessionTTL string
client *api.Client
autoRun bool
err error
)

if conf != nil {
kvKey = conf.KVKey
id = conf.ID
sessionTTL = conf.SessionTTL
client = conf.Client
autoRun = conf.AutoRun
}

if kvKey == "" {
Expand Down Expand Up @@ -148,10 +157,16 @@ func New(conf *Config) (*Candidate, error) {
sessionTTL: sessionTTL,
}

// attempt to create persistent session manager...
if c.session, err = c.createSession(); err != nil {
return nil, err
}

if autoRun {
c.log.Debug("AutoRun enabled")
c.Run()
}

return c, nil
}

Expand Down Expand Up @@ -299,6 +314,10 @@ func (c *Candidate) UpdateWatchers() {
func (c *Candidate) WaitFor(td time.Duration) error {
var err error

if !c.Running() {
return fmt.Errorf("candidate %s is not in running", c.ID())
}

timer := time.NewTimer(td)

waitLoop:
Expand Down Expand Up @@ -335,8 +354,8 @@ func (c *Candidate) WaitUntil(t time.Time) error {
}

// Wait will block until a leader has been elected, regardless of which candidate wins.
func (c *Candidate) Wait() {
c.WaitFor(1<<63 - 1)
func (c *Candidate) Wait() error {
return c.WaitFor(1<<63 - 1)
}

func (c *Candidate) State() State {
Expand All @@ -350,22 +369,28 @@ func (c *Candidate) Running() bool {
return c.State() == StateRunning
}

// sessionUpdate is the receiver for the session update callback
func (c *Candidate) sessionUpdate(update session.Update) {
if c.session != nil && c.session.ID() == update.ID {
if c.Running() {
c.sessionUpdateChan <- update
} else {
c.log.Printf("We are no longer in the running, cannot process session update: %#v", update)
c.mu.Lock()
if c.session.ID() == update.ID {
c.mu.Unlock()
select {
case c.sessionUpdateChan <- update:
default:
c.log.Printf("Unable to push session update onto channel. Update: %#v")
}
} else {
c.log.Printf("Received update from rogue session: %s", update.ID)
c.mu.Unlock()
c.log.Printf("Received update from session %q but our local session is %q...", update.ID, c.session.ID())
}
}

// acquire will attempt to do just that. Caller must hold lock!
func (c *Candidate) acquire(sid string) (bool, error) {
var err error
var elected bool
var (
elected bool
err error
)

kvpValue := &LeaderKVValue{
LeaderAddress: c.id,
Expand Down Expand Up @@ -398,14 +423,15 @@ func (c *Candidate) createSession() (*session.Session, error) {
}

func (c *Candidate) lockRunner() {
var sid string
var elected bool
var resigned chan struct{}
var updated bool
var sessionUpdate session.Update
var err error

sessionErrorsSeen, sessionStoppedUpdatesSeen := 0, 0
// TODO: this could stand for some further cleanup...
var (
sid string
elected, updated bool
resigned chan struct{}
sessionUpdate session.Update
consecutiveSessionErrorCount int
err error
)

// run initial session
c.session.Run()
Expand All @@ -425,100 +451,103 @@ acquisition:
select {
case <-acquireTicker.C:
c.mu.Lock()

if c.session == nil {
// it is possible for the session to be nil if the sessionUpdate case is unable to recreate upon error
// threshold being met

// set elected state to false, will be updated later.
if c.session.Running() {
// if our session manager is still running
if sid = c.session.ID(); sid == "" {
// this should only ever happen very early on in the election process
elected = false
updated = c.elected != nil && *c.elected != elected
c.log.Debugf("Acquire tick: Session does not exist, will try locking again in %d seconds...", int64(interval.Seconds()))
} else if elected, err = c.acquire(sid); err != nil {
// most likely hit due to transport error.
updated = c.elected != nil && *c.elected != elected
c.log.Printf("Acquire tick: Error attempting to acquire lock: %s", err)
} else {
// if c.elected is nil, indicating this is the initial election loop, or if the election state
// changed mark update as true
updated = c.elected == nil || *c.elected != elected
}
} else {
// if we are below the threshold, just try to restart existing session
c.log.Printf("Acquire tick: Session is in stopped state, attempting to restart...")
elected = false

// only send an update if a previous election attempt was successful and our state changed
updated = c.elected != nil && *c.elected != elected
c.session.Run()
}

c.log.Print("Acquire tick: No session, will try to create...")
if c.session, err = c.createSession(); err != nil {
c.log.Printf("Acquire tick: Error creating session, will try again in %d seconds. Err: %s", int64(interval.Seconds()), err)
// if updated
if updated {
if elected {
c.log.Debug("We have won the election")
} else {
c.session.Run()
c.log.Printf("Acquire tick: Session created successfully")
c.log.Debug("We have lost the election")
}
} else if sid = c.session.ID(); sid == "" {
// this should only ever happen very early on in the election process
elected = false
updated = c.elected != nil && *c.elected != elected
c.log.Debugf("Acquire tick: Session does not exist, will try locking again in %d seconds...", int64(interval.Seconds()))
} else if elected, err = c.acquire(sid); err != nil {
// most likely hit due to transport error.
elected = false
updated = c.elected != nil && *c.elected != elected
c.log.Printf("Acquire tick: Error attempting to acquire lock: %s", err)

// update internal state
*c.elected = elected

// send notifications
up.Elected = elected
c.mu.Unlock()

c.watchers.notify(*up)
} else {
updated = c.elected == nil || *c.elected != elected
c.mu.Unlock()
}

c.mu.Unlock()

case sessionUpdate = <-c.sessionUpdateChan:
c.mu.Lock()

// if there was an update either creating or renewing our session
if sessionUpdate.Error != nil {
sessionErrorsSeen++
sessionStoppedUpdatesSeen = 0

c.log.Printf("Session Update: Error (%d in a row): %s", sessionErrorsSeen, sessionUpdate.Error)

// if we breach this threshold, stop our current session and attempt to make a new one next pass
if sessionErrorsSeen >= 2 {
c.log.Print("Session Update: 2 successive errors seen, will construct new session")
c.session.Stop()
// if there was an update either creating or renewing our session
consecutiveSessionErrorCount++
c.log.Printf("Session Update: Error (%d in a row): %s", consecutiveSessionErrorCount, sessionUpdate.Error)
if sessionUpdate.State == session.StateRunning && consecutiveSessionErrorCount > 2 {
// if the session is still running but we've seen more than 2 errors, attempt a stop -> start cycle
c.log.Print("Session Update: 2 successive errors seen, stopping session")
if err = c.session.Stop(); err != nil {
c.log.Printf("Session update: Error stopping session: %s", err)
}
elected = false
c.session = nil
updated = c.elected != nil && *c.elected != elected
}
// do not modify elected state here unless we've breached the threshold. could just be a temporary
// issue
} else if sessionUpdate.State == session.StateStopped {
sessionStoppedUpdatesSeen++
sessionErrorsSeen = 0

// if somehow the session state became stopped (this should basically never happen...), do not attempt
// to kickstart session here. test if we need to update candidate state and notify watchers, then move
// on. next acquire tick will attempt to restart session.
consecutiveSessionErrorCount = 0
elected = false

c.log.Printf("Session Update: Stopped state seen (%d in row): %#v", sessionStoppedUpdatesSeen, sessionUpdate)

if sessionStoppedUpdatesSeen >= 2 {
c.log.Print("Session Update: Stopped state seen 2 successive times, will construct new session")
if c.session, err = c.createSession(); err != nil {
c.log.Printf("Unable to recreate will try again in %d seconds. Err: %s.", int64(interval.Seconds()), err)
} else {
c.log.Print("Session created successfully")
c.session.Run()
}
}
updated = c.elected != nil && *c.elected != elected
c.log.Printf("Session Update: Stopped state seen: %#v", sessionUpdate)
} else {
// if we got a non-error / non-stopped update, there is nothing to do.
consecutiveSessionErrorCount = 0
c.log.Debugf("Session Update: Received %#v", sessionUpdate)
}

updated = c.elected != nil && *c.elected != elected

c.mu.Unlock()

case resigned = <-c.resign:
break acquisition
}

// if updated
if updated {
if elected {
c.log.Debug("We have won the election")
// if updated
if updated {
// this should only ever hit if we breach the error threshold or our session stopped running
if elected {
c.log.Debug("We have won the election")
} else {
c.log.Debug("We have lost the election")
}
// update internal state
*c.elected = elected
// modify update payload
up.Elected = elected
c.mu.Unlock()
// send notifications after unlocking
c.watchers.notify(*up)
} else {
c.log.Debug("We have lost the election")
c.mu.Unlock()
}

// update internal state
*c.elected = elected

// send notifications
up.Elected = elected
c.watchers.notify(*up)
case resigned = <-c.resign:
break acquisition
}
}

Expand All @@ -528,40 +557,30 @@ acquisition:

c.mu.Lock()

// modify state
// modify internal state
*c.elected = false
c.state = StateResigned

// send notifications
up.Elected = false
up.State = StateResigned
c.watchers.notify(*up)

done := make(chan struct{})

// test for a non-nil session; if one exists, spin up a new routine that stops it and signals once it has been destroyed.
// TODO: do something more clever with the session's update system instead of this chan?
if c.session != nil {
go func() {
// stop session, this might block for a bit
c.session.Stop()
done <- struct{}{}
}()
} else {
done <- struct{}{}
if err = c.session.Stop(); err != nil {
c.log.Printf("Error stopping session: %s", err)
}
}

// release lock before the final steps so the object is usable
c.mu.Unlock()

// if need be, wait for session to term
<-done
close(done)

// notify our caller that we've finished with resignation
// just in case....
if resigned != nil {
// notify caller that we've stopped
resigned <- struct{}{}
}

c.watchers.notify(*up)

c.log.Print("Resigned")
}
Loading