Skip to content

Commit

Permalink
Unittests for await logic (Delete) (#3012)
Browse files Browse the repository at this point in the history
<!--Thanks for your contribution. See [CONTRIBUTING](CONTRIBUTING.md)
    for Pulumi's contribution guidelines.

    Help us merge your changes more quickly by adding more details such
    as labels, milestones, and reviewers.-->

### Proposed changes

<!--Give us a brief description of what you've done and what it solves.
-->

### Related issues (optional)

<!--Refer to related PRs or issues: #1234, or 'Fixes #1234' or 'Closes
#1234'.
Or link to full URLs to issues or pull requests in other GitHub
repositories. -->

Closes #2800
  • Loading branch information
EronWright authored May 20, 2024
1 parent 04fb15c commit f404883
Showing 1 changed file with 353 additions and 3 deletions.
356 changes: 353 additions & 3 deletions provider/pkg/await/await_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
clienttesting "k8s.io/client-go/testing"
kubetesting "k8s.io/client-go/testing"
)

var (
Expand All @@ -42,7 +42,8 @@ var (
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]any{
"name": "foo",
"name": "foo",
"namespace": "default",
},
"spec": map[string]any{
"containers": []any{
Expand Down Expand Up @@ -351,6 +352,344 @@ func Test_Creation(t *testing.T) {
}
}

func Test_Deletion(t *testing.T) {
type testCtx struct {
ctx context.Context
cancel context.CancelFunc
host *fakehost.HostClient
config *DeleteConfig
disco *fake.SimpleDiscovery
mapper *fake.SimpleRESTMapper
client *fake.SimpleDynamicClient
}
type args struct {
serverSideApply bool
resType tokens.Type
objects []runtime.Object
inputs *unstructured.Unstructured
outputs *unstructured.Unstructured
name string
}
type client struct {
RESTMapperF func(mapper meta.ResettableRESTMapper) meta.ResettableRESTMapper
GenericClientF func(client dynamic.Interface) dynamic.Interface
}

type reactionF func(t *testing.T, ctx testCtx, action kubetesting.Action) (handled bool, ret runtime.Object, err error)

type watchReactionF func(t *testing.T, ctx testCtx, action kubetesting.Action) (handled bool, ret watch.Interface, err error)

type expectF func(t *testing.T, ctx testCtx, err error)

// reactions

suppressDeletion := func(t *testing.T, ctx testCtx, action kubetesting.Action) (bool, runtime.Object, error) {
return true, nil, nil
}

cancelAwait := func(t *testing.T, ctx testCtx, action kubetesting.Action) (bool, runtime.Object, error) {
ctx.cancel()
return false, nil, nil
}

// awaiters

awaitNoop := func(t *testing.T, ctx testCtx) deletionAwaiter {
return func(dac deleteAwaitConfig) error {
return nil
}
}

awaitError := func(t *testing.T, ctx testCtx) deletionAwaiter {
return func(dac deleteAwaitConfig) error {
return serviceUnavailableErr
}
}

awaitUnexpected := func(t *testing.T, ctx testCtx) deletionAwaiter {
return func(dac deleteAwaitConfig) error {
require.Fail(t, "Unexpected call to awaiter")
return nil
}
}

// expectations

failed := func(target error) expectF {
return func(t *testing.T, ctx testCtx, err error) {
require.ErrorAs(t, err, &target)
}
}

succeeded := func() expectF {
return func(t *testing.T, ctx testCtx, err error) {
require.NoError(t, err)
}
}
deleted := func(ns, name string) expectF {
return func(t *testing.T, ctx testCtx, err error) {
gvr, err := clients.GVRForGVK(ctx.mapper, ctx.config.Inputs.GroupVersionKind())
require.NoError(t, err)
_, err = ctx.client.Tracker().Get(gvr, ns, name)
require.Truef(t, apierrors.IsNotFound(err), "expected notfound, got an object")
}
}

tests := []struct {
name string
client client
args args
expect []expectF
reaction []reactionF
watcher *watch.RaceFreeFakeWatcher
awaiter func(t *testing.T, ctx testCtx) deletionAwaiter
}{
{
name: "ServiceUnavailable",
client: client{
RESTMapperF: func(mapper meta.ResettableRESTMapper) meta.ResettableRESTMapper {
return FailedRESTMapper(mapper, serviceUnavailableErr)
},
},
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
expect: []expectF{failed(serviceUnavailableErr)},
},
{
name: "Namespaced",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
awaiter: awaitNoop,
expect: []expectF{succeeded(), deleted("default", "foo")},
},
{
name: "NonNamespaced",
args: args{
resType: tokens.Type("kubernetes:rbac.authorization.k8s.io/v1:ClusterRole"),
name: "foo",
objects: []runtime.Object{validClusterRoleUnstructured},
inputs: validClusterRoleUnstructured,
outputs: validClusterRoleUnstructured,
},
awaiter: awaitNoop,
expect: []expectF{succeeded(), deleted("default", "foo")},
},
{
name: "Gone",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{ /* empty */ },
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
awaiter: awaitUnexpected,
expect: []expectF{succeeded()},
},
{
name: "SkipAwait",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: withSkipAwait(validPodUnstructured),
outputs: validPodUnstructured,
},
reaction: []reactionF{suppressDeletion}, // suppress deletion to safeguard that the built-in watcher is not used.
awaiter: awaitUnexpected,
expect: []expectF{succeeded()},
},
{
name: "AwaitError",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
awaiter: awaitError,
expect: []expectF{failed(serviceUnavailableErr)},
},
{
name: "Deleted",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
awaiter: nil,
expect: []expectF{succeeded(), deleted("default", "foo")},
},
{
name: "WatchTimeout",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
reaction: []reactionF{suppressDeletion},
awaiter: nil,
watcher: withWatchClosed(watch.NewRaceFreeFake()),
expect: []expectF{failed(&timeoutError{})},
},
{
name: "WatchTimeoutWithRecovery",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
reaction: nil,
awaiter: nil,
watcher: withWatchClosed(watch.NewRaceFreeFake()),
expect: []expectF{succeeded()},
},
{
name: "WatchError",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
reaction: []reactionF{suppressDeletion},
awaiter: nil,
watcher: withWatchError(watch.NewRaceFreeFake(), apierrors.NewTimeoutError("test", 30)),
expect: []expectF{failed(&initializationError{})},
},
{
name: "WatchErrorWithRecovery",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
reaction: nil,
awaiter: nil,
watcher: withWatchError(watch.NewRaceFreeFake(), apierrors.NewTimeoutError("test", 30)),
expect: []expectF{succeeded()},
},
{
name: "Cancel",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
reaction: []reactionF{cancelAwait, suppressDeletion},
awaiter: nil,
expect: []expectF{failed(&cancellationError{})},
},
{
name: "CancelWithRecovery",
args: args{
resType: tokens.Type("kubernetes:core/v1:Pod"),
name: "foo",
objects: []runtime.Object{validPodUnstructured},
inputs: validPodUnstructured,
outputs: validPodUnstructured,
},
reaction: []reactionF{cancelAwait},
awaiter: nil,
expect: []expectF{succeeded()},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt := tt
require.NotEmpty(t, tt.name, "Test case must have a name")

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

host := &fakehost.HostClient{}
client, disco, mapper, clientset := fake.NewSimpleDynamicClient(fake.WithObjects(tt.args.objects...))
resources, err := openapi.GetResourceSchemasForClient(disco)
require.NoError(t, err)

if tt.client.GenericClientF != nil {
client.GenericClient = tt.client.GenericClientF(client.GenericClient)
}
if tt.client.RESTMapperF != nil {
client.RESTMapper = tt.client.RESTMapperF(client.RESTMapper)
}

urn := resource.NewURN(tokens.QName("teststack"), tokens.PackageName("testproj"), tokens.Type(""), tt.args.resType, "testresource")
config := DeleteConfig{
ProviderConfig: ProviderConfig{
Context: ctx,
Host: host,
URN: urn,
InitialAPIVersion: corev1.SchemeGroupVersion.String(),
FieldManager: "test",
ClusterVersion: testServerVersion,
ClientSet: client,
DedupLogger: logging.NewLogger(context.Background(), host, urn),
Resources: resources,
ServerSideApply: tt.args.serverSideApply,
awaiters: map[string]awaitSpec{},
},
Inputs: tt.args.inputs,
Outputs: tt.args.outputs,
Name: tt.args.name,
}
testCtx := testCtx{
ctx: ctx,
cancel: cancel,
host: host,
config: &config,
disco: disco,
mapper: mapper,
client: clientset,
}
for i := len(tt.reaction) - 1; i >= 0; i-- {
reaction := tt.reaction[i]
clientset.PrependReactor("*", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
return reaction(t, testCtx, action)
})
}
if tt.watcher != nil {
clientset.PrependWatchReactor("*", func(action kubetesting.Action) (handled bool, ret watch.Interface, err error) {
return true, tt.watcher, nil
})
}
if tt.awaiter != nil {
id := fmt.Sprintf("%s/%s", tt.args.inputs.GetAPIVersion(), tt.args.inputs.GetKind())
config.awaiters[id] = awaitSpec{
awaitDeletion: tt.awaiter(t, testCtx),
}
}
err = Deletion(config)
for _, e := range tt.expect {
e(t, testCtx, err)
}
})
}
}

func TestAwaitSSAConflict(t *testing.T) {
client, _, _, clientset := fake.NewSimpleDynamicClient()

Expand All @@ -372,7 +711,7 @@ func TestAwaitSSAConflict(t *testing.T) {
}

// Intercept the SSA and respond with a conflict error.
clientset.PrependReactor("patch", "pods", func(_ clienttesting.Action) (bool, runtime.Object, error) {
clientset.PrependReactor("patch", "pods", func(_ kubetesting.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewApplyConflict(nil, "conflict")
})

Expand Down Expand Up @@ -439,6 +778,17 @@ func withGenerateName(obj *unstructured.Unstructured) *unstructured.Unstructured
return copy
}

func withWatchError(watcher *watch.RaceFreeFakeWatcher, err *apierrors.StatusError) *watch.RaceFreeFakeWatcher {
obj := err.Status()
watcher.Error(&obj)
return watcher
}

func withWatchClosed(watcher *watch.RaceFreeFakeWatcher) *watch.RaceFreeFakeWatcher {
watcher.Stop()
return watcher
}

// --------------------------------------------------------------------------

// Mock implementations of Kubernetes client stuff.
Expand Down

0 comments on commit f404883

Please sign in to comment.