diff --git a/pkg/sink/sink.go b/pkg/sink/sink.go index 6bbea87d7..6c56e7fa1 100644 --- a/pkg/sink/sink.go +++ b/pkg/sink/sink.go @@ -89,7 +89,8 @@ func (r Sink) HandleEvent(response http.ResponseWriter, request *http.Request) { // Execute each Trigger for _, t := range el.Spec.Triggers { go func(t triggersv1.EventListenerTrigger) { - if err := r.processTrigger(&t, request, event, eventID, eventLog); err != nil { + localRequest := request.Clone(request.Context()) + if err := r.processTrigger(&t, localRequest, event, eventID, eventLog); err != nil { result <- http.StatusAccepted return } diff --git a/pkg/sink/sink_test.go b/pkg/sink/sink_test.go index 7826e13b9..44ac3961b 100644 --- a/pkg/sink/sink_test.go +++ b/pkg/sink/sink_test.go @@ -248,19 +248,7 @@ func TestHandleEvent(t *testing.T) { wantActions = append(wantActions, action) } // Sort actions (we do not know what order they executed in) - gotActions := dynamicClient.Actions() - sort.SliceStable(gotActions, func(i int, j int) bool { - objectI := gotActions[i].(ktesting.CreateAction).GetObject() - objectJ := gotActions[j].(ktesting.CreateAction).GetObject() - unstructuredI, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(objectI) - unstructuredJ, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(objectJ) - nameI := (&unstructured.Unstructured{Object: unstructuredI}).GetName() - nameJ := (&unstructured.Unstructured{Object: unstructuredJ}).GetName() - if nameI == "" || nameJ == "" { - t.Errorf("Error getting resource name from action; names are empty") - } - return nameI < nameJ - }) + gotActions := sortCreateActions(t, dynamicClient.Actions()) if diff := cmp.Diff(wantActions, gotActions); diff != "" { t.Errorf("Actions mismatch (-want +got): %s", diff) } @@ -449,6 +437,201 @@ func TestHandleEventWithInterceptors(t *testing.T) { } } +// nameInterceptor is an HTTP server that reads a "Name" from the header, and +// writes the name in its body as {"name": "VALUE"}. +// It expects a request with the header "Name". +// The response body will always return with {"name": "VALUE"} where VALUE is +// the value of the first element in the header "Name". +type nameInterceptor struct{} + +func (f *nameInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Copy over all headers + for k := range r.Header { + for _, v := range r.Header[k] { + w.Header().Add(k, v) + } + } + // Read the Name header + var name string + if nameValue, ok := r.Header["Name"]; ok { + name = nameValue[0] + } + // Write the name to the body + body := fmt.Sprintf(`{"name": "%s"}`, name) + fmt.Printf("~~~ body: %s\n", body) + w.Write([]byte(body)) +} + +func TestHandleEventWithWebhookInterceptors(t *testing.T) { + namespace := "foo" + eventBody := json.RawMessage(`{}`) + numTriggers := 10 + + resourceTemplate := pipelinev1.PipelineResource{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "tekton.dev/v1alpha1", + Kind: "PipelineResource", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "$(params.name)", + Namespace: namespace, + }, + Spec: pipelinev1.PipelineResourceSpec{ + Type: pipelinev1.PipelineResourceTypeGit, + Params: []pipelinev1.ResourceParam{{ + Name: "url", + Value: "testurl", + }}, + }, + } + resourceTemplateBytes, err := json.Marshal(resourceTemplate) + if err != nil { + t.Fatalf("Error unmarshalling pipelineResource: %s", err) + } + + tt := bldr.TriggerTemplate("tt", namespace, + bldr.TriggerTemplateSpec( + bldr.TriggerTemplateParam("name", "", ""), + bldr.TriggerResourceTemplate(resourceTemplateBytes), + )) + tb := bldr.TriggerBinding("tb", namespace, + bldr.TriggerBindingSpec( + bldr.TriggerBindingParam("name", "$(body.name)"), + )) + + interceptorObjectRef := &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: "foo", + } + var triggers []triggersv1.EventListenerTrigger + for i := 0; i < numTriggers; i++ { + trigger := triggersv1.EventListenerTrigger{ + Bindings: []*triggersv1.EventListenerBinding{{Name: "tb"}}, + Template: triggersv1.EventListenerTemplate{Name: "tt"}, + Interceptors: []*triggersv1.EventInterceptor{{ + Webhook: &triggersv1.WebhookInterceptor{ + ObjectRef: interceptorObjectRef, + Header: []pipelinev1.Param{bldr.Param("Name", fmt.Sprintf("my-resource-%d", i))}, + }, + }}, + } + triggers = append(triggers, trigger) + } + el := &triggersv1.EventListener{ + ObjectMeta: metav1.ObjectMeta{ + Name: "el", + Namespace: namespace, + }, + Spec: triggersv1.EventListenerSpec{ + Triggers: triggers, + }, + } + + kubeClient := fakekubeclientset.NewSimpleClientset() + test.AddTektonResources(kubeClient) + + triggersClient := faketriggersclientset.NewSimpleClientset() + if _, err := triggersClient.TektonV1alpha1().TriggerTemplates(namespace).Create(tt); err != nil { + t.Fatalf("Error creating TriggerTemplate: %s", err) + } + if _, err := triggersClient.TektonV1alpha1().TriggerBindings(namespace).Create(tb); err != nil { + t.Fatalf("Error creating TriggerBinding: %s", err) + } + if _, err = triggersClient.TektonV1alpha1().EventListeners(namespace).Create(el); err != nil { + t.Fatalf("Error creating EventListener: %s", err) + } + + logger, _ := logging.NewLogger("", "") + + dynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme()) + dynamicSet := dynamicclientset.New(tekton.WithClient(dynamicClient)) + + // Redirect all requests to the fake server. + srv := httptest.NewServer(&nameInterceptor{}) + defer srv.Close() + client := srv.Client() + u, _ := url.Parse(srv.URL) + client.Transport = &http.Transport{ + Proxy: http.ProxyURL(u), + } + + r := Sink{ + HTTPClient: srv.Client(), + EventListenerName: el.Name, + EventListenerNamespace: namespace, + DynamicClient: dynamicSet, + DiscoveryClient: kubeClient.Discovery(), + TriggersClient: triggersClient, + KubeClientSet: kubeClient, + Logger: logger, + } + ts := httptest.NewServer(http.HandlerFunc(r.HandleEvent)) + defer ts.Close() + + var wg sync.WaitGroup + wg.Add(numTriggers) + dynamicClient.PrependReactor("*", "*", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + defer wg.Done() + return false, nil, nil + }) + + resp, err := http.Post(ts.URL, "application/json", bytes.NewReader(eventBody)) + if err != nil { + t.Fatalf("Error creating Post request: %s", err) + } + + if resp.StatusCode != http.StatusCreated { + t.Fatalf("Response code doesn't match: %v", resp.Status) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error reading response body: %s", err) + } + + wantBody := Response{ + EventListener: el.Name, + Namespace: el.Namespace, + EventID: eventID, + } + + got := Response{} + if err := json.Unmarshal(body, &got); err != nil { + t.Fatalf("Error unmarshalling response body: %s", err) + } + if diff := cmp.Diff(wantBody, got); diff != "" { + t.Errorf("did not get expected response back -want,+got: %s", diff) + } + + // We expect that the EventListener will be able to immediately handle the event so we + // can use a very short timeout + if waitTimeout(&wg, time.Second) { + t.Fatalf("timed out waiting for reactor to fire") + } + gvr := schema.GroupVersionResource{ + Group: "tekton.dev", + Version: "v1alpha1", + Resource: "pipelineresources", + } + var wantActions []ktesting.Action + for i := 0; i < numTriggers; i++ { + wantResource := resourceTemplate.DeepCopy() + wantResource.ObjectMeta.Name = fmt.Sprintf("my-resource-%d", i) + wantResource.ObjectMeta.Labels = map[string]string{ + resourceLabel: "el", + triggerLabel: "", + eventIDLabel: eventID, + } + action := ktesting.NewCreateAction(gvr, "foo", test.ToUnstructured(t, wantResource)) + wantActions = append(wantActions, action) + } + // Sort actions (we do not know what order they executed in) + gotActions := sortCreateActions(t, dynamicClient.Actions()) + if diff := cmp.Diff(wantActions, gotActions); diff != "" { + t.Errorf("Actions mismatch (-want +got): %s", diff) + } +} + // sequentialInterceptor is a HTTP server that will return sequential responses. // It expects a request of the form `{"i": n}`. // The response body will always return with the next value set, whereas the @@ -604,3 +787,22 @@ func TestExecuteInterceptor_error(t *testing.T) { t.Error("expected sequential interceptor to not be called") } } + +// Sort CreateActions by the name of their resource. +// The Actions must be CreateActions, and they must have an Object that has a +// name. +func sortCreateActions(t *testing.T, actions []ktesting.Action) []ktesting.Action { + sort.SliceStable(actions, func(i int, j int) bool { + objectI := actions[i].(ktesting.CreateAction).GetObject() + objectJ := actions[j].(ktesting.CreateAction).GetObject() + unstructuredI, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(objectI) + unstructuredJ, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(objectJ) + nameI := (&unstructured.Unstructured{Object: unstructuredI}).GetName() + nameJ := (&unstructured.Unstructured{Object: unstructuredJ}).GetName() + if nameI == "" || nameJ == "" { + t.Errorf("Error getting resource name from action; names are empty") + } + return nameI < nameJ + }) + return actions +}