Skip to content

Commit

Permalink
Add await logic for DaemonSets (#2953)
Browse files Browse the repository at this point in the history
This adds await logic for DaemonSets with RollingUpdate or OnDelete
update strategies.

The implementation is largely based on the existing StatefulSet logic
with two high-level simplifications:

1. We use
[kstatus](https://pkg.go.dev/sigs.k8s.io/cli-utils/pkg/kstatus/status)
to decide when a DaemonSet is ready.
2. We use a `PodAggregator` to handle reporting pod statuses.

Importantly, unlike StatefulSet this means we do not currently inspect
pods to decide readiness -- we only use them for informational purposes.
I _think_ this is sufficient but I could easily be missing something. I
haven't been able to simulate situations where this logic doesn't fully
capture readiness and we would need to inspect pod statuses.

A failing e2e test was added in YAML under the awkwardly name 
`tests/sdk/java` path.

Unit tests were added around the public `Creation`, `Update`, etc.
methods in order to more fully exercise timeouts and retries. To that
end I introduced a mock clock package which might be controversial. IMO
Go doesn't have a great OSS mock clock but something like this can be
very helpful for testing.

I'm still somewhat confused by the role of `await.Read` since it doesn't
actually await anything, but it's implemented similar to StatefulSet as
a one-shot read + readiness check.

Fixes #609
Refs #2800
Refs #2799
Refs #2798
  • Loading branch information
blampe authored May 17, 2024
1 parent 9d759ca commit 04fb15c
Show file tree
Hide file tree
Showing 20 changed files with 1,897 additions and 28 deletions.
21 changes: 15 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
## Unreleased

- Fix Release behavior to deep merge `valueYamlFiles` to match Helm. (https://github.com/pulumi/pulumi-kubernetes/pull/2963)
- Fix Chart previews when the cluster is unreachable. (https://github.com/pulumi/pulumi-kubernetes/pull/2992)
- Fix a panic that could occur when a missing field became `null`. (https://github.com/pulumi/pulumi-kubernetes/issues/1970)
- Add field manager's name to server-side apply conflict errors. (https://github.com/pulumi/pulumi-kubernetes/pull/2983)
- Helm Chart V4 (https://github.com/pulumi/pulumi-kubernetes/pull/2947)
- New annotation: deletionPropagationPolicy (https://github.com/pulumi/pulumi-kubernetes/pull/3011)
### Added

- Added a new Helm Chart v4 resource. (https://github.com/pulumi/pulumi-kubernetes/pull/2947)
- Added a new `deletionPropagationPolicy` annotation. (https://github.com/pulumi/pulumi-kubernetes/pull/3011)
- Server-side apply conflict errors now include the original field manager's name. (https://github.com/pulumi/pulumi-kubernetes/pull/2983)

### Changed

- Pulumi will now wait for DaemonSets to become ready. (https://github.com/pulumi/pulumi-kubernetes/pull/2953)
- The Release resource's merge behavior for `valueYamlFiles` now more closely matches Helm's behavior. (https://github.com/pulumi/pulumi-kubernetes/pull/2963)

### Fixed

- Chart previews no longer fail when the cluster is unreachable. (https://github.com/pulumi/pulumi-kubernetes/pull/2992)
- Fixed a panic that could occur when a missing field became `null`. (https://github.com/pulumi/pulumi-kubernetes/issues/1970)

## 4.11.0 (April 17, 2024)

Expand Down
1 change: 1 addition & 0 deletions provider/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/gnostic-models v0.6.8
github.com/imdario/mergo v0.3.16
github.com/jonboulle/clockwork v0.4.0
github.com/mitchellh/mapstructure v1.5.0
github.com/onsi/ginkgo/v2 v2.15.0
github.com/onsi/gomega v1.31.0
Expand Down
2 changes: 2 additions & 0 deletions provider/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,8 @@ github.com/joefitzgerald/rainbow-reporter v0.1.0/go.mod h1:481CNgqmVHQZzdIbN52Cu
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down
20 changes: 17 additions & 3 deletions provider/pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

fluxssa "github.com/fluxcd/pkg/ssa"
"github.com/jonboulle/clockwork"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/host"
Expand Down Expand Up @@ -77,6 +78,8 @@ type ProviderConfig struct {

// explicit awaiters (for testing purposes)
awaiters map[string]awaitSpec

clock clockwork.Clock
}

type CreateConfig struct {
Expand Down Expand Up @@ -169,7 +172,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {

var outputs *unstructured.Unstructured
var client dynamic.ResourceInterface
err := retry.SleepingRetry(
retrier := retry.SleepingRetry(
func(i uint) error {
// Recreate the client for resource, in case the client's cache of the server API was
// invalidated. For example, when a CRD is created, it will invalidate the client cache;
Expand Down Expand Up @@ -235,8 +238,13 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
return nil
}).
WithMaxRetries(5).
WithBackoffFactor(2).
Do(apierrors.IsNotFound, meta.IsNoMatchError)
WithBackoffFactor(2)

if c.clock != nil {
retrier = retrier.WithSleep(c.clock.Sleep)
}

err := retrier.Do(apierrors.IsNotFound, meta.IsNoMatchError)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -271,6 +279,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
logger: c.DedupLogger,
timeout: timeout,
clusterVersion: c.ClusterVersion,
clock: c.clock,
}
waitErr := awaiter.awaitCreation(conf)
if waitErr != nil {
Expand All @@ -287,6 +296,8 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
// If the client fails to get the live object for some reason, DO NOT return the error. This
// will leak the fact that the object was successfully created. Instead, fall back to the
// last-seen live object.

// TODO: We should be able to use the last-seen object from the await's watch.
live, err := client.Get(c.Context, outputs.GetName(), metav1.GetOptions{})
if err != nil {
return outputs, nil
Expand Down Expand Up @@ -332,6 +343,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
currentOutputs: outputs,
logger: c.DedupLogger,
clusterVersion: c.ClusterVersion,
clock: c.clock,
}
waitErr := awaiter.awaitRead(conf)
if waitErr != nil {
Expand Down Expand Up @@ -417,6 +429,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
logger: c.DedupLogger,
timeout: timeout,
clusterVersion: c.ClusterVersion,
clock: c.clock,
},
lastOutputs: liveOldObj,
}
Expand Down Expand Up @@ -824,6 +837,7 @@ func Deletion(c DeleteConfig) error {
logger: c.DedupLogger,
timeout: timeout,
clusterVersion: c.ClusterVersion,
clock: c.clock,
},
clientForResource: client,
})
Expand Down
50 changes: 42 additions & 8 deletions provider/pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"time"

"github.com/jonboulle/clockwork"
checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster"
Expand Down Expand Up @@ -51,6 +52,7 @@ type createAwaitConfig struct {
currentOutputs *unstructured.Unstructured
timeout *time.Duration
clusterVersion *cluster.ServerVersion
clock clockwork.Clock
}

func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) {
Expand All @@ -61,6 +63,14 @@ func (cac *createAwaitConfig) logMessage(message checkerlog.Message) {
cac.logger.LogMessage(message)
}

// Clock returns a real or mock clock for the config as appropriate.
func (cac *createAwaitConfig) Clock() clockwork.Clock {
if cac.clock != nil {
return cac.clock
}
return clockwork.NewRealClock()
}

// updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated",
// i.e., the spec of the API object has changed and the controllers have reached a steady state. For
// example, we might consider a `Deployment` "fully updated" only when the previous generation of
Expand All @@ -77,10 +87,12 @@ type deleteAwaitConfig struct {
clientForResource dynamic.ResourceInterface
}

type createAwaiter func(createAwaitConfig) error
type updateAwaiter func(updateAwaitConfig) error
type readAwaiter func(createAwaitConfig) error
type deletionAwaiter func(deleteAwaitConfig) error
type (
createAwaiter func(createAwaitConfig) error
updateAwaiter func(updateAwaitConfig) error
readAwaiter func(createAwaitConfig) error
deletionAwaiter func(deleteAwaitConfig) error
)

func (cac *createAwaitConfig) getTimeout(defaultSeconds int) time.Duration {
if cac.timeout != nil {
Expand All @@ -105,6 +117,9 @@ const (
appsV1StatefulSet = "apps/v1/StatefulSet"
appsV1Beta1StatefulSet = "apps/v1beta1/StatefulSet"
appsV1Beta2StatefulSet = "apps/v1beta2/StatefulSet"
appsV1DaemonSet = "apps/v1/DaemonSet"
appsV1Beta1DaemonSet = "apps/v1beta1/DaemonSet"
appsv1Beta2DaemonSet = "apps/v1beta2/DaemonSet"
autoscalingV1HorizontalPodAutoscaler = "autoscaling/v1/HorizontalPodAutoscaler"
batchV1Job = "batch/v1/Job"
coreV1ConfigMap = "v1/ConfigMap"
Expand All @@ -120,6 +135,7 @@ const (
coreV1ServiceAccount = "v1/ServiceAccount"
extensionsV1Beta1Deployment = "extensions/v1beta1/Deployment"
extensionsV1Beta1Ingress = "extensions/v1beta1/Ingress"
extensionsV1Beta1DaemonSet = "extensions/v1beta1/DaemonSet"
networkingV1Ingress = "networking.k8s.io/v1/Ingress"
networkingV1Beta1Ingress = "networking.k8s.io/v1beta1/Ingress"
rbacAuthorizationV1ClusterRole = "rbac.authorization.k8s.io/v1/ClusterRole"
Expand Down Expand Up @@ -189,10 +205,28 @@ var statefulsetAwaiter = awaitSpec{
awaitDeletion: untilAppsStatefulSetDeleted,
}

var daemonsetAwaiter = awaitSpec{
awaitCreation: func(c createAwaitConfig) error {
return newDaemonSetAwaiter(updateAwaitConfig{createAwaitConfig: c}).Await()
},
awaitUpdate: func(u updateAwaitConfig) error {
return newDaemonSetAwaiter(u).Await()
},
awaitRead: func(c createAwaitConfig) error {
return newDaemonSetAwaiter(updateAwaitConfig{createAwaitConfig: c}).Read()
},
awaitDeletion: func(c deleteAwaitConfig) error {
return newDaemonSetAwaiter(updateAwaitConfig{createAwaitConfig: c.createAwaitConfig}).Delete()
},
}

// NOTE: Some GVKs below are blank so that we can distinguish between resource types that we know
// about, but don't require await logic, vs. resource types that we don't know about.

var awaiters = map[string]awaitSpec{
appsV1DaemonSet: daemonsetAwaiter,
appsV1Beta1DaemonSet: daemonsetAwaiter,
appsv1Beta2DaemonSet: daemonsetAwaiter,
appsV1Deployment: deploymentAwaiter,
appsV1Beta1Deployment: deploymentAwaiter,
appsV1Beta2Deployment: deploymentAwaiter,
Expand Down Expand Up @@ -238,11 +272,11 @@ var awaiters = map[string]awaitSpec{
coreV1ServiceAccount: {
awaitCreation: untilCoreV1ServiceAccountInitialized,
},
extensionsV1Beta1DaemonSet: daemonsetAwaiter,
extensionsV1Beta1Deployment: deploymentAwaiter,

extensionsV1Beta1Ingress: ingressAwaiter,
networkingV1Beta1Ingress: ingressAwaiter,
networkingV1Ingress: ingressAwaiter,
extensionsV1Beta1Ingress: ingressAwaiter,
networkingV1Beta1Ingress: ingressAwaiter,
networkingV1Ingress: ingressAwaiter,

rbacAuthorizationV1ClusterRole: { /* NONE */ },
rbacAuthorizationV1ClusterRoleBinding: { /* NONE */ },
Expand Down
Loading

0 comments on commit 04fb15c

Please sign in to comment.