Skip to content

Commit

Permalink
Use kube API URL in join token to validate worker connectivity
Browse files Browse the repository at this point in the history
Signed-off-by: Kimmo Lehto <[email protected]>
  • Loading branch information
kke committed Dec 4, 2024
1 parent 388ec1e commit ccaea92
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 123 deletions.
16 changes: 14 additions & 2 deletions phase/initialize_k0s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
Expand Down Expand Up @@ -115,11 +116,22 @@ func (p *InitializeK0s) Run() error {
return err
}

log.Infof("%s: waiting for kubernetes api to respond", h)
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config)); err != nil {
log.Infof("%s: wait for kubernetes to reach ready state", h)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := retry.Context(ctx, func(_ context.Context) error {
out, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "get --raw='/readyz'"), exec.Sudo(h))
if out != "ok" {
return fmt.Errorf("kubernetes api /readyz responded with %q", out)
}
return err
})
if err != nil {
return fmt.Errorf("kubernetes not ready: %w", err)
}

h.Metadata.Ready = true

return nil
})
if err != nil {
Expand Down
100 changes: 54 additions & 46 deletions phase/install_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func (p *InstallControllers) Prepare(config *v1beta1.Cluster) error {
p.hosts = p.Config.Spec.Hosts.Controllers().Filter(func(h *cluster.Host) bool {
return !h.Reset && !h.Metadata.NeedsUpgrade && (h != p.leader && h.Metadata.K0sRunningVersion == nil)
})
log.Debug("hosts selected for phase:")
for _, h := range p.hosts {
log.Debugf(" - %s", h)
}
log.Debug("leader:")
log.Debugf(" - %s", p.leader)

return nil
}
Expand Down Expand Up @@ -65,13 +71,13 @@ func (p *InstallControllers) CleanUp() {

func (p *InstallControllers) After() error {
for i, h := range p.hosts {
if h.Metadata.K0sJoinTokenID == "" {
h.Metadata.K0sTokenData.Token = ""
if h.Metadata.K0sTokenData.Token == "" {
continue
}
h.Metadata.K0sJoinToken = ""
err := p.Wet(p.leader, fmt.Sprintf("invalidate k0s join token for controller %s", h), func() error {
log.Debugf("%s: invalidating join token for controller %d", p.leader, i+1)
return p.leader.Exec(p.leader.Configurer.K0sCmdf("token invalidate --data-dir=%s %s", p.leader.K0sDataDir(), h.Metadata.K0sJoinTokenID), exec.Sudo(p.leader))
return p.leader.Exec(p.leader.Configurer.K0sCmdf("token invalidate --data-dir=%s %s", p.leader.K0sDataDir(), h.Metadata.K0sTokenData.ID), exec.Sudo(p.leader))
})
if err != nil {
log.Warnf("%s: failed to invalidate worker join token: %v", p.leader, err)
Expand All @@ -88,54 +94,51 @@ func (p *InstallControllers) After() error {

// Run the phase
func (p *InstallControllers) Run() error {
url := p.Config.Spec.InternalKubeAPIURL()
healthz := fmt.Sprintf("%s/healthz", url)

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
if p.IsWet() || !p.leader.Metadata.DryRunFakeLeader {
log.Infof("%s: validating api connection to %s", h, url)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := retry.Context(ctx, node.HTTPStatusFunc(h, healthz, 200, 401)); err != nil {
return fmt.Errorf("failed to connect from controller to kubernetes api at %s - check networking", url)
}
} else {
log.Warnf("%s: dry-run: skipping api connection validation to %s because cluster is not running", h, url)
}
return nil
})
if err != nil {
return err
}

for _, h := range p.hosts {
var token string
var tokenID string

if p.IsWet() {
log.Infof("%s: generating token", p.leader)
token, err = p.Config.Spec.K0s.GenerateToken(
log.Infof("%s: generate join token for %s", p.leader, h)
token, err := p.Config.Spec.K0s.GenerateToken(
p.leader,
"controller",
time.Duration(10)*time.Minute,
)
if err != nil {
return err
}
h.Metadata.K0sJoinToken = token
tokenID, err = cluster.TokenID(token)
tokenData, err := cluster.ParseToken(token)
if err != nil {
return err
}
log.Debugf("%s: join token ID: %s", p.leader, tokenID)
h.Metadata.K0sJoinTokenID = tokenID
h.Metadata.K0sTokenData = tokenData
} else {
p.DryMsgf(p.leader, "generate a k0s join token for controller %s", h)
h.Metadata.K0sJoinTokenID = "dry-run"
h.Metadata.K0sTokenData.ID = "dry-run"
h.Metadata.K0sTokenData.URL = p.Config.Spec.KubeAPIURL()
}

log.Infof("%s: writing join token", h)
if err := h.Configurer.WriteFile(h, h.K0sJoinTokenPath(), h.Metadata.K0sJoinToken, "0640"); err != nil {
}
err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
if p.IsWet() || !p.leader.Metadata.DryRunFakeLeader {
log.Infof("%s: validating api connection to %s using join token", h, h.Metadata.K0sTokenData.URL)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := retry.Context(ctx, node.HTTPStatusFunc(h, h.Metadata.K0sTokenData.URL, 200, 401, 404)); err != nil {
return fmt.Errorf("failed to connect from controller to kubernetes api - check networking: %w", err)
}
} else {
log.Warnf("%s: dry-run: skipping api connection validation to because cluster is not actually running", h)
}
return nil
})
if err != nil {
return err
}
return p.parallelDo(p.hosts, func(h *cluster.Host) error {
tokenPath := h.K0sJoinTokenPath()
log.Infof("%s: writing join token to %s", h, tokenPath)
err := p.Wet(h, fmt.Sprintf("write k0s join token to %s", tokenPath), func() error {
return h.Configurer.WriteFile(h, tokenPath, h.Metadata.K0sTokenData.Token, "0600")
})
if err != nil {
return err
}

Expand Down Expand Up @@ -180,17 +183,22 @@ func (p *InstallControllers) Run() error {
return err
}

if err := p.waitJoined(h); err != nil {
return err
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := retry.Context(ctx, func(_ context.Context) error {
out, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "get --raw='/readyz?verbose=true'"), exec.Sudo(h))
if err != nil {
return fmt.Errorf("readiness endpoint reports %q: %w", out, err)
}
return nil
})
if err != nil {
return fmt.Errorf("controller did not reach ready state: %w", err)
}
}
h.Metadata.Ready = true
}

return nil
}
h.Metadata.Ready = true
}

func (p *InstallControllers) waitJoined(h *cluster.Host) error {
log.Infof("%s: waiting for kubernetes api to respond", h)
return retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config))
return nil
})
}
75 changes: 39 additions & 36 deletions phase/install_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,27 +66,26 @@ func (p *InstallWorkers) CleanUp() {
func (p *InstallWorkers) After() error {
if NoWait {
for _, h := range p.hosts {
if h.Metadata.K0sJoinToken != "" {
if h.Metadata.K0sTokenData.Token != "" {
log.Warnf("%s: --no-wait given, created join tokens will remain valid for 10 minutes", p.leader)
break
}
}
return nil
}
for i, h := range p.hosts {
if h.Metadata.K0sJoinTokenID == "" {
h.Metadata.K0sTokenData.Token = ""
if h.Metadata.K0sTokenData.ID == "" {
continue
}
h.Metadata.K0sJoinToken = ""
err := p.Wet(p.leader, fmt.Sprintf("invalidate k0s join token for worker %s", h), func() error {
log.Debugf("%s: invalidating join token for worker %d", p.leader, i+1)
return p.leader.Exec(p.leader.Configurer.K0sCmdf("token invalidate --data-dir=%s %s", p.leader.K0sDataDir(), h.Metadata.K0sJoinTokenID), exec.Sudo(p.leader))
return p.leader.Exec(p.leader.Configurer.K0sCmdf("token invalidate --data-dir=%s %s", p.leader.K0sDataDir(), h.Metadata.K0sTokenData.ID), exec.Sudo(p.leader))
})
if err != nil {
log.Warnf("%s: failed to invalidate worker join token: %v", p.leader, err)
}
_ = p.Wet(h, "overwrite k0s join token file", func() error {

if err := h.Configurer.WriteFile(h, h.K0sJoinTokenPath(), "# overwritten by k0sctl after join\n", "0600"); err != nil {
log.Warnf("%s: failed to overwrite the join token file at %s", h, h.K0sJoinTokenPath())
}
Expand All @@ -98,30 +97,9 @@ func (p *InstallWorkers) After() error {

// Run the phase
func (p *InstallWorkers) Run() error {
url := p.Config.Spec.InternalKubeAPIURL()
healthz := fmt.Sprintf("%s/healthz", url)

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
if p.IsWet() || !p.leader.Metadata.DryRunFakeLeader {
log.Infof("%s: validating api connection to %s", h, url)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := retry.Context(ctx, node.HTTPStatusFunc(h, healthz, 200, 401)); err != nil {
return fmt.Errorf("failed to connect from worker to kubernetes api at %s - check networking", url)
}
} else {
log.Warnf("%s: dry-run: skipping api connection validation to %s because cluster is not running", h, url)
}
return nil
})

if err != nil {
return err
}

for i, h := range p.hosts {
log.Infof("%s: generating a join token for worker %d", p.leader, i+1)
err = p.Wet(p.leader, fmt.Sprintf("generate a k0s join token for worker %s", h), func() error {
err := p.Wet(p.leader, fmt.Sprintf("generate a k0s join token for worker %s", h), func() error {
t, err := p.Config.Spec.K0s.GenerateToken(
p.leader,
"worker",
Expand All @@ -130,29 +108,54 @@ func (p *InstallWorkers) Run() error {
if err != nil {
return err
}
h.Metadata.K0sJoinToken = t

ti, err := cluster.TokenID(t)
td, err := cluster.ParseToken(t)
if err != nil {
return err
return fmt.Errorf("parse k0s token: %w", err)
}
h.Metadata.K0sJoinTokenID = ti

log.Debugf("%s: join token ID: %s", h, ti)
h.Metadata.K0sTokenData = td

return nil
}, func() error {
h.Metadata.K0sJoinTokenID = "dry-run"
h.Metadata.K0sTokenData.ID = "dry-run"
h.Metadata.K0sTokenData.URL = p.Config.Spec.KubeAPIURL()
return nil
})
if err != nil {
return err
}
}

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
if p.IsWet() || !p.leader.Metadata.DryRunFakeLeader {
log.Infof("%s: validating api connection to %s using join token", h, h.Metadata.K0sTokenData.URL)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := retry.Context(ctx, func(_ context.Context) error {
err := h.Exec(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "get --raw='/version' --kubeconfig=/dev/stdin"), exec.Sudo(h), exec.Stdin(string(h.Metadata.K0sTokenData.Kubeconfig)))
if err != nil {
return fmt.Errorf("failed to connect to kubernetes api using the join token - check networking: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("connectivity check failed: %w", err)
}
} else {
log.Warnf("%s: dry-run: skipping api connection validation because cluster is not actually running", h)
}
return nil
})
if err != nil {
return err
}

return p.parallelDo(p.hosts, func(h *cluster.Host) error {
err := p.Wet(h, fmt.Sprintf("write k0s join token to %s", h.K0sJoinTokenPath()), func() error {
log.Infof("%s: writing join token", h)
return h.Configurer.WriteFile(h, h.K0sJoinTokenPath(), h.Metadata.K0sJoinToken, "0640")
tokenPath := h.K0sJoinTokenPath()
err := p.Wet(h, fmt.Sprintf("write k0s join token to %s", tokenPath), func() error {
log.Infof("%s: writing join token to %s", h, tokenPath)
return h.Configurer.WriteFile(h, tokenPath, h.Metadata.K0sTokenData.Token, "0600")
})
if err != nil {
return err
Expand Down
4 changes: 0 additions & 4 deletions phase/reinstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,5 @@ func (p *Reinstall) reinstall(h *cluster.Host) error {
return fmt.Errorf("restart after reinstall: %w", err)
}

if h != p.Config.Spec.K0sLeader() {
return nil
}

return nil
}
21 changes: 11 additions & 10 deletions phase/upgrade_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,17 @@ func (p *UpgradeControllers) Run() error {
}

if p.IsWet() {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config)); err != nil {
return fmt.Errorf("kube api did not become ready: %w", err)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := retry.Context(ctx, func(_ context.Context) error {
out, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "get --raw='/readyz?verbose=true'"), exec.Sudo(h))
if err != nil {
return fmt.Errorf("readiness endpoint reports %q: %w", out, err)
}
return nil
})
if err != nil {
return fmt.Errorf("controller did not reach ready state: %w", err)
}
}

Expand All @@ -147,13 +156,5 @@ func (p *UpgradeControllers) Run() error {
return nil
}

log.Infof("%s: waiting for the scheduler to become ready", leader)
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.ScheduledEventsAfterFunc(leader, time.Now())); err != nil {
if !Force {
return fmt.Errorf("failed to observe scheduling events after api start-up, you can ignore this check by using --force: %w", err)
}
log.Warnf("%s: failed to observe scheduling events after api start-up: %s", leader, err)
}

return nil
}
3 changes: 1 addition & 2 deletions pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ type HostMetadata struct {
K0sInstalled bool
K0sExistingConfig string
K0sNewConfig string
K0sJoinToken string
K0sJoinTokenID string
K0sTokenData TokenData
K0sStatusArgs Flags
Arch string
IsK0sLeader bool
Expand Down
Loading

0 comments on commit ccaea92

Please sign in to comment.