Skip to content

Commit

Permalink
test: add mono vertex e2e tests (#1945)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
Co-authored-by: Keran Yang <[email protected]>
  • Loading branch information
yhl25 and KeranYang authored Aug 16, 2024
1 parent c4b5d05 commit 7b85e89
Show file tree
Hide file tree
Showing 52 changed files with 562 additions and 192 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ jobs:
fail-fast: false
matrix:
driver: [jetstream]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, jetstream-e2e, sdks-e2e, reduce-one-e2e, reduce-two-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, jetstream-e2e, sdks-e2e, reduce-one-e2e, reduce-two-e2e, udsource-e2e, api-e2e, sideinputs-e2e, idle-source-e2e, monovertex-e2e]
include:
- driver: redis
case: e2e
Expand Down
20 changes: 10 additions & 10 deletions pkg/reconciler/monovertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,21 @@ func (mr *monoVertexReconciler) reconcilePods(ctx context.Context, monoVtx *dfv1
}
}
if needToCreate {
labels := map[string]string{}
podLabels := map[string]string{}
annotations := map[string]string{}
if x := monoVtx.Spec.Metadata; x != nil {
for k, v := range x.Annotations {
annotations[k] = v
}
for k, v := range x.Labels {
labels[k] = v
podLabels[k] = v
}
}
labels[dfv1.KeyPartOf] = dfv1.Project
labels[dfv1.KeyManagedBy] = dfv1.ControllerMonoVertex
labels[dfv1.KeyComponent] = dfv1.ComponentMonoVertex
labels[dfv1.KeyAppName] = monoVtx.Name
labels[dfv1.KeyMonoVertexName] = monoVtx.Name
podLabels[dfv1.KeyPartOf] = dfv1.Project
podLabels[dfv1.KeyManagedBy] = dfv1.ControllerMonoVertex
podLabels[dfv1.KeyComponent] = dfv1.ComponentMonoVertex
podLabels[dfv1.KeyAppName] = monoVtx.Name
podLabels[dfv1.KeyMonoVertexName] = monoVtx.Name
annotations[dfv1.KeyHash] = hash
annotations[dfv1.KeyReplica] = strconv.Itoa(replica)
// Defaults to udf
Expand All @@ -209,7 +209,7 @@ func (mr *monoVertexReconciler) reconcilePods(ctx context.Context, monoVtx *dfv1
ObjectMeta: metav1.ObjectMeta{
Namespace: monoVtx.Namespace,
Name: podNamePrefix + sharedutil.RandomLowerCaseString(5),
Labels: labels,
Labels: podLabels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(monoVtx.GetObjectMeta(), dfv1.MonoVertexGroupVersionKind)},
},
Expand Down Expand Up @@ -392,10 +392,10 @@ func (mr *monoVertexReconciler) createOrUpdateDaemonDeployment(ctx context.Conte
return nil
}

func (r *monoVertexReconciler) findExistingPods(ctx context.Context, monoVtx *dfv1.MonoVertex) (map[string]corev1.Pod, error) {
func (mr *monoVertexReconciler) findExistingPods(ctx context.Context, monoVtx *dfv1.MonoVertex) (map[string]corev1.Pod, error) {
pods := &corev1.PodList{}
selector, _ := labels.Parse(dfv1.KeyComponent + "=" + dfv1.ComponentMonoVertex + "," + dfv1.KeyMonoVertexName + "=" + monoVtx.Name)
if err := r.client.List(ctx, pods, &client.ListOptions{Namespace: monoVtx.Namespace, LabelSelector: selector}); err != nil {
if err := mr.client.List(ctx, pods, &client.ListOptions{Namespace: monoVtx.Namespace, LabelSelector: selector}); err != nil {
return nil, fmt.Errorf("failed to list mono vertex pods: %w", err)
}
result := make(map[string]corev1.Pod)
Expand Down
20 changes: 10 additions & 10 deletions test/diamond-e2e/diamond_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *DiamondSuite) TestJoinOnReducePipeline() {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
w := s.Given().Pipeline("@testdata/join-on-reduce-pipeline.yaml").
w := s.Given().Pipeline("@testdata/join-on-reduce.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
Expand Down Expand Up @@ -74,13 +74,13 @@ func (s *DiamondSuite) TestJoinOnReducePipeline() {
}()
// todo: this only tests for one occurrence: ideally should verify all
w.Expect().
SinkContains("sink", "40"). // per 10-second window: (10 * 2) * 2 atoi vertices
SinkContains("sink", "80") // per 10-second window: 10 * (1 + 3) * 2 atoi vertices
RedisSinkContains("join-on-reduce-sink", "40"). // per 10-second window: (10 * 2) * 2 atoi vertices
RedisSinkContains("join-on-reduce-sink", "80") // per 10-second window: 10 * (1 + 3) * 2 atoi vertices
done <- struct{}{}
}

func (s *DiamondSuite) TestJoinOnMapPipeline() {
w := s.Given().Pipeline("@testdata/join-on-map-pipeline.yaml").
w := s.Given().Pipeline("@testdata/join-on-map.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
Expand All @@ -93,8 +93,8 @@ func (s *DiamondSuite) TestJoinOnMapPipeline() {
w.SendMessageTo(pipelineName, "in-1", NewHttpPostRequest().WithBody([]byte("2")))

w.Expect().
SinkContains("sink", "1").
SinkContains("sink", "2")
RedisSinkContains("join-on-map-sink", "1").
RedisSinkContains("join-on-map-sink", "2")
}

func (s *DiamondSuite) TestJoinOnSinkVertex() {
Expand All @@ -110,8 +110,8 @@ func (s *DiamondSuite) TestJoinOnSinkVertex() {
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888888"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888889")))

w.Expect().SinkContains("out", "888888")
w.Expect().SinkContains("out", "888889")
w.Expect().RedisSinkContains("join-on-sink-out", "888888")
w.Expect().RedisSinkContains("join-on-sink-out", "888889")
}

func (s *DiamondSuite) TestCycleToSelf() {
Expand All @@ -136,7 +136,7 @@ func (s *DiamondSuite) TestCycleToSelf() {
}
}
for i := 0; i < 10; i++ {
w.Expect().SinkContains("out", msgs[i])
w.Expect().RedisSinkContains("cycle-to-self-out", msgs[i])
}

}
Expand All @@ -162,7 +162,7 @@ func (s *DiamondSuite) TestCycleBackward() {
}
}
for i := 0; i < 10; i++ {
w.Expect().SinkContains("out", msgs[i])
w.Expect().RedisSinkContains("cycle-backward-out", msgs[i])
}
}

Expand Down
4 changes: 4 additions & 0 deletions test/diamond-e2e/testdata/cycle-backward.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "cycle-backward-out"
edges:
- from: in
to: cat
Expand Down
4 changes: 4 additions & 0 deletions test/diamond-e2e/testdata/cycle-to-self.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "cycle-to-self-out"
edges:
- from: in
to: retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "join-on-map-sink"
edges:
- from: in-0
to: cat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "join-on-reduce-sink"
edges:
- from: in
to: atoi-0
Expand Down
5 changes: 4 additions & 1 deletion test/diamond-e2e/testdata/join-on-sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always

env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "join-on-sink-out"
edges:
- from: in
to: even-or-odd
Expand Down
6 changes: 2 additions & 4 deletions test/e2e-api/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package main

import (
"context"
"fmt"
"log"
"net/http"
"net/url"
Expand Down Expand Up @@ -62,16 +61,15 @@ func (h *RedisController) GetMsgCountContains(w http.ResponseWriter, r *http.Req

redisClient := h.getRedisClient()

pipelineName := r.URL.Query().Get("pipelineName")
sinkName := r.URL.Query().Get("sinkName")
keyName := r.URL.Query().Get("keyName")
targetStr, err := url.QueryUnescape(r.URL.Query().Get("targetStr"))
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

count, err := redisClient.HGet(context.Background(), fmt.Sprintf("%s:%s", pipelineName, sinkName), targetStr).Result()
count, err := redisClient.HGet(context.Background(), keyName, targetStr).Result()

if err != nil {
log.Println(err)
Expand Down
31 changes: 15 additions & 16 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,14 @@ func (s *FunctionalSuite) TestUDFFiltering() {
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(expect3))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(expect4)))

w.Expect().SinkContains("out", expect3)
w.Expect().SinkContains("out", expect4)
w.Expect().SinkNotContains("out", expect0)
w.Expect().SinkNotContains("out", expect1)
w.Expect().SinkNotContains("out", expect2)
w.Expect().RedisSinkContains("udf-filtering-out", expect3)
w.Expect().RedisSinkContains("udf-filtering-out", expect4)
w.Expect().RedisSinkNotContains("udf-filtering-out", expect0)
w.Expect().RedisSinkNotContains("udf-filtering-out", expect1)
w.Expect().RedisSinkNotContains("udf-filtering-out", expect2)
}

func (s *FunctionalSuite) TestConditionalForwarding() {

// FIXME: flaky when redis is used as isb
if strings.ToUpper(os.Getenv("ISBSVC")) == "REDIS" {
s.T().SkipNow()
Expand All @@ -196,17 +195,17 @@ func (s *FunctionalSuite) TestConditionalForwarding() {
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("888889"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("not an integer")))

w.Expect().SinkContains("even-sink", "888888")
w.Expect().SinkNotContains("even-sink", "888889")
w.Expect().SinkNotContains("even-sink", "not an integer")
w.Expect().RedisSinkContains("even-odd-even-sink", "888888")
w.Expect().RedisSinkNotContains("even-odd-even-sink", "888889")
w.Expect().RedisSinkNotContains("even-odd-even-sink", "not an integer")

w.Expect().SinkContains("odd-sink", "888889")
w.Expect().SinkNotContains("odd-sink", "888888")
w.Expect().SinkNotContains("odd-sink", "not an integer")
w.Expect().RedisSinkContains("even-odd-odd-sink", "888889")
w.Expect().RedisSinkNotContains("even-odd-odd-sink", "888888")
w.Expect().RedisSinkNotContains("even-odd-odd-sink", "not an integer")

w.Expect().SinkContains("number-sink", "888888")
w.Expect().SinkContains("number-sink", "888889")
w.Expect().SinkNotContains("number-sink", "not an integer")
w.Expect().RedisSinkContains("even-odd-number-sink", "888888")
w.Expect().RedisSinkContains("even-odd-number-sink", "888889")
w.Expect().RedisSinkNotContains("even-odd-number-sink", "not an integer")
}

func (s *FunctionalSuite) TestDropOnFull() {
Expand Down Expand Up @@ -354,7 +353,7 @@ func (s *FunctionalSuite) TestFallbackSink() {
// wait for all the pods to come up
w.Expect().VertexPodsRunning()

w.Expect().SinkContains("output", "fallback-message")
w.Expect().RedisSinkContains("simple-fallback-output", "fallback-message")
}

func TestFunctionalSuite(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions test/e2e/testdata/even-odd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,32 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "even-odd-even-sink"
- name: odd-sink
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "even-odd-odd-sink"
- name: number-sink
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "even-odd-number-sink"
edges:
- from: in
to: even-or-odd
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/testdata/simple-fallback.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "simple-fallback-output"
edges:
- from: in
to: cat
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/testdata/udf-filtering.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ spec:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis-sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "udf-filtering-out"
edges:
- from: in
to: p1
Expand Down
27 changes: 15 additions & 12 deletions test/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ spec:

type E2ESuite struct {
suite.Suite
restConfig *rest.Config
isbSvcClient flowpkg.InterStepBufferServiceInterface
pipelineClient flowpkg.PipelineInterface
vertexClient flowpkg.VertexInterface
kubeClient kubernetes.Interface
stopch chan struct{}
restConfig *rest.Config
isbSvcClient flowpkg.InterStepBufferServiceInterface
pipelineClient flowpkg.PipelineInterface
vertexClient flowpkg.VertexInterface
monoVertexClient flowpkg.MonoVertexInterface
kubeClient kubernetes.Interface
stopch chan struct{}
}

func (s *E2ESuite) SetupSuite() {
Expand All @@ -98,6 +99,7 @@ func (s *E2ESuite) SetupSuite() {
s.isbSvcClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().InterStepBufferServices(Namespace)
s.pipelineClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().Pipelines(Namespace)
s.vertexClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().Vertices(Namespace)
s.monoVertexClient = flowversiond.NewForConfigOrDie(s.restConfig).NumaflowV1alpha1().MonoVertices(Namespace)

// Clean up resources if any
s.deleteResources([]schema.GroupVersionResource{
Expand Down Expand Up @@ -182,12 +184,13 @@ func (s *E2ESuite) deleteResources(resources []schema.GroupVersionResource) {

func (s *E2ESuite) Given() *Given {
return &Given{
t: s.T(),
isbSvcClient: s.isbSvcClient,
pipelineClient: s.pipelineClient,
vertexClient: s.vertexClient,
restConfig: s.restConfig,
kubeClient: s.kubeClient,
t: s.T(),
isbSvcClient: s.isbSvcClient,
pipelineClient: s.pipelineClient,
vertexClient: s.vertexClient,
monoVertexClient: s.monoVertexClient,
restConfig: s.restConfig,
kubeClient: s.kubeClient,
}
}

Expand Down
Loading

0 comments on commit 7b85e89

Please sign in to comment.