Skip to content

Commit

Permalink
added api to delete mono-veretx and segregated pipeline/monovertex me…
Browse files Browse the repository at this point in the history
…tric tests

Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Dec 13, 2024
1 parent a77881b commit fe4a157
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 99 deletions.
21 changes: 21 additions & 0 deletions server/apis/v1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,27 @@ func (h *handler) GetMonoVertex(c *gin.Context) {
c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, monoVertexResp))
}

// DeleteMonoVertex is used to delete a mono vertex
func (h *handler) DeleteMonoVertex(c *gin.Context) {
ns, monoVertex := c.Param("namespace"), c.Param("mono-vertex")

// Check if the mono vertex exists
_, err := h.numaflowClient.MonoVertices(ns).Get(c, monoVertex, metav1.GetOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to fetch mono vertex %q in namespace %q, %s", monoVertex, ns, err.Error()))
return
}

// Delete the mono vertex
err = h.numaflowClient.MonoVertices(ns).Delete(c, monoVertex, metav1.DeleteOptions{})
if err != nil {
h.respondWithError(c, fmt.Sprintf("Failed to delete mono vertex %q in namespace %q, %s", monoVertex, ns, err.Error()))
return
}

c.JSON(http.StatusOK, NewNumaflowAPIResponse(nil, nil))
}

// CreateMonoVertex is used to create a mono vertex
func (h *handler) CreateMonoVertex(c *gin.Context) {
if h.opts.readonly {
Expand Down
2 changes: 2 additions & 0 deletions server/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func v1Routes(ctx context.Context, r gin.IRouter, dexObj *v1.DexObject, localUse
r.GET("/namespaces/:namespace/mono-vertices", handler.ListMonoVertices)
// Get the mono vertex information.
r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex", handler.GetMonoVertex)
// Delete a mono-vertex.
r.DELETE("/namespaces/:namespace/mono-vertices/:mono-vertex", handler.DeleteMonoVertex)
// Get all the pods of a mono vertex.
r.GET("/namespaces/:namespace/mono-vertices/:mono-vertex/pods", handler.ListMonoVertexPods)
// Create a mono vertex.
Expand Down
139 changes: 42 additions & 97 deletions test/api-e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type APISuite struct {
E2ESuite
}

func TestAPISuite(t *testing.T) {
suite.Run(t, new(APISuite))
}

func (s *APISuite) TestGetSysInfo() {
defer s.Given().When().UXServerPodPortForward(8043, 8443).TerminateAllPodPortForwards()

Expand Down Expand Up @@ -209,9 +213,17 @@ func (s *APISuite) TestAPIsForIsbAndPipelineAndMonoVertex() {
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), listMonoVertexBody, testMonoVertex1Name)

// deletes a mono-vertex
deleteMonoVertex := HTTPExpect(s.T(), "https://localhost:8145").DELETE(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s", Namespace, testMonoVertex1Name)).
Expect().
Status(200).Body().Raw()
var deleteMonoVertexSuccessExpect = `"data":null`
assert.Contains(s.T(), deleteMonoVertex, deleteMonoVertexSuccessExpect)

}

func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPods() {
func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPodsForPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

Expand Down Expand Up @@ -275,73 +287,18 @@ func (s *APISuite) TestAPIsForMetricsAndWatermarkAndPods() {
Expect().
Status(200).Body().Raw()
assert.Contains(s.T(), getVerticesPodsBody, `simple-pipeline-input-0`)
}

func TestAPISuite(t *testing.T) {
suite.Run(t, new(APISuite))
}

func (s *APISuite) TestDiscoverMetricsForPipeline() {
defer s.Given().When().UXServerPodPortForward(8147, 8443).TerminateAllPodPortForwards()
namespaceBody := HTTPExpect(s.T(), "https://localhost:8147").GET("/api/v1/namespaces").
Expect().
Status(200).Body().Raw()
var namespaceExpect = `numaflow-system`
assert.Contains(s.T(), namespaceBody, namespaceExpect)

var pl1 v1alpha1.Pipeline
err := json.Unmarshal(testPipeline1, &pl1)
assert.NoError(s.T(), err)
createPipeline1 := HTTPExpect(s.T(), "https://localhost:8147").POST(fmt.Sprintf("/api/v1/namespaces/%s/pipelines", Namespace)).WithJSON(pl1).
Expect().
Status(200).Body().Raw()

var createPipelineSuccessExpect = `"data":null`
assert.Contains(s.T(), createPipeline1, createPipelineSuccessExpect)

// Call the DiscoverMetrics API for the vertex object
discoverMetricsBodyForVertex := HTTPExpect(s.T(), "https://localhost:8147").GET("/api/v1/metrics-discovery/object/vertex").
discoverMetricsBodyForVertex := HTTPExpect(s.T(), "https://localhost:8146").GET("/api/v1/metrics-discovery/object/vertex").
Expect().
Status(200).Body().Raw()

// Check that the response contains expected metrics for vertex object
assert.Contains(s.T(), discoverMetricsBodyForVertex, "forwarder_data_read_total")

// Call the DiscoverMetrics API for mono-vertex (ideally following url will be hit for a mono-vertex but should return response for a running pipeline as well)
discoverMetricsBodyForMonoVertex := HTTPExpect(s.T(), "https://localhost:8147").GET("/api/v1/metrics-discovery/object/mono-vertex").
Expect().
Status(200).Body().Raw()

// Check that the response contains expected metrics for mono-vertex
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_processing_time_bucket")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_sink_time_bucket")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_read_total")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_pending")
}

func (s *APISuite) TestGetVertexPodsInfo() {

defer s.Given().When().UXServerPodPortForward(8148, 8443).TerminateAllPodPortForwards()

namespaceBody := HTTPExpect(s.T(), "https://localhost:8148").GET("/api/v1/namespaces").
Expect().
Status(200).Body().Raw()
var namespaceExpect = `numaflow-system`
assert.Contains(s.T(), namespaceBody, namespaceExpect)

var pl1 v1alpha1.Pipeline
err := json.Unmarshal(testPipeline1, &pl1)
assert.NoError(s.T(), err)
createPipeline1 := HTTPExpect(s.T(), "https://localhost:8148").POST(fmt.Sprintf("/api/v1/namespaces/%s/pipelines", Namespace)).WithJSON(pl1).
Expect().
Status(200).Body().Raw()

var createPipelineSuccessExpect = `"data":null`
assert.Contains(s.T(), createPipeline1, createPipelineSuccessExpect)

// Call the API to get vertex pods info
getVertexPodsInfoBody := HTTPExpect(s.T(), "https://localhost:8148").
GET(fmt.Sprintf("/api/v1/namespaces/%s/pipelines/%s/vertices/%s/pods-info", Namespace, pl1.Name, pl1.Spec.Vertices[0].Name)).
// Call the API to get input vertex pods info
getVertexPodsInfoBody := HTTPExpect(s.T(), "https://localhost:8146").
GET(fmt.Sprintf("/api/v1/namespaces/%s/pipelines/%s/vertices/%s/pods-info", Namespace, pipelineName, "input")).
Expect().
Status(200).Body().Raw()

Expand All @@ -353,50 +310,27 @@ func (s *APISuite) TestGetVertexPodsInfo() {
assert.Contains(s.T(), getVertexPodsInfoBody, `"containerDetailsMap":`) // Check for pod's containers
}

func (s *APISuite) TestGetMonoVertexPodsInfo() {
func (s *APISuite) TestMetricsAPIsForMonoVertex() {
_, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

defer s.Given().When().UXServerPodPortForward(8149, 8443).TerminateAllPodPortForwards()
w := s.Given().MonoVertex("@testdata/mono-vertex.yaml").
When().
CreateMonoVertexAndWait()
defer w.DeleteMonoVertexAndWait()

namespaceBody := HTTPExpect(s.T(), "https://localhost:8149").GET("/api/v1/namespaces").
Expect().
Status(200).Body().Raw()
var namespaceExpect = `numaflow-system`
assert.Contains(s.T(), namespaceBody, namespaceExpect)
monoVertexName := "mono-vertex"

// Create a mono vertex
var mv1 v1alpha1.MonoVertex
err := json.Unmarshal(testMonoVertex1, &mv1)
assert.NoError(s.T(), err)
createMonoVertex := HTTPExpect(s.T(), "https://localhost:8149").POST(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices", Namespace)).WithJSON(mv1).
Expect().
Status(200).Body().Raw()
var createMonoVertexSuccessExpect = `"data":null`
assert.Contains(s.T(), createMonoVertex, createMonoVertexSuccessExpect)
defer w.UXServerPodPortForward(8149, 8443).TerminateAllPodPortForwards()

// Wait for the mono vertex to be healthy
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
getMonoVertexBody := HTTPExpect(s.T(), "https://localhost:8149").GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s", Namespace, mv1.Name)).
Expect().
Status(200).Body().Raw()
for !strings.Contains(getMonoVertexBody, `"status":"healthy"`) {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
s.T().Fatalf("failed to get namespaces/mono-vertices: %v", ctx.Err())
}
default:
time.Sleep(100 * time.Millisecond)
getMonoVertexBody = HTTPExpect(s.T(), "https://localhost:8149").GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s", Namespace, mv1.Name)).
Expect().
Status(200).Body().Raw()
}
}
assert.Contains(s.T(), getMonoVertexBody, fmt.Sprintf(`"name":"%s"`, mv1.Name))
w.Expect().MonoVertexPodsRunning()
// Expect the messages to reach the sink.
w.Expect().RedisSinkContains("mono-vertex", "199")
w.Expect().RedisSinkContains("mono-vertex", "200")

// Call the API to get mono vertex pods info
getMonoVertexPodsInfoBody := HTTPExpect(s.T(), "https://localhost:8149").
GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s/pods-info", Namespace, mv1.Name)).
GET(fmt.Sprintf("/api/v1/namespaces/%s/mono-vertices/%s/pods-info", Namespace, monoVertexName)).
Expect().
Status(200).Body().Raw()

Expand All @@ -406,4 +340,15 @@ func (s *APISuite) TestGetMonoVertexPodsInfo() {
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"totalCPU":`) // Check for pod's cpu usage
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"totalMemory":`) // Check for pod's memory usage
assert.Contains(s.T(), getMonoVertexPodsInfoBody, `"containerDetailsMap":`) // Check for pod's containers

// Call the DiscoverMetrics API for mono-vertex
discoverMetricsBodyForMonoVertex := HTTPExpect(s.T(), "https://localhost:8149").GET("/api/v1/metrics-discovery/object/mono-vertex").
Expect().
Status(200).Body().Raw()

// Check that the response contains expected metrics for mono-vertex
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_processing_time_bucket")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_sink_time_bucket")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_read_total")
assert.Contains(s.T(), discoverMetricsBodyForMonoVertex, "monovtx_pending")
}
4 changes: 2 additions & 2 deletions test/api-e2e/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ var (
"source": {
"udsource": {
"container": {
"image": "quay.io/numaio/numaflow-java/source-simple-source:stable"
"image": "quay.io/numaio/numaflow-rs/simple-source:stable"
}
},
"transformer": {
Expand All @@ -180,7 +180,7 @@ var (
"sink": {
"udsink": {
"container": {
"image": "quay.io/numaio/numaflow-java/simple-sink:stable"
"image": "quay.io/numaio/numaflow-rs/sink-log:stable"
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions test/api-e2e/testdata/mono-vertex.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
name: mono-vertex
spec:
scale:
min: 1
source:
udsource:
container:
image: quay.io/numaio/numaflow-go/source-simple-source:stable
imagePullPolicy: Always
sink:
udsink:
container:
# A redis sink for e2e testing, see https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/redis_sink
image: quay.io/numaio/numaflow-go/redis-sink:stable
imagePullPolicy: Always
env:
- name: SINK_HASH_KEY
# Use the name of the mono vertex as the key
value: "mono-vertex"

0 comments on commit fe4a157

Please sign in to comment.