From 783228ee1463fc0f63f4b155ce548fb71c93500d Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 15 Jul 2024 11:50:05 -0700 Subject: [PATCH 1/8] impl Signed-off-by: Sidhant Kohli --- pkg/apis/numaflow/v1alpha1/const.go | 8 ---- pkg/apis/numaflow/v1alpha1/vertex_types.go | 2 - pkg/sdkclient/serverinfo/serverinfo.go | 12 ++++++ pkg/udf/map_udf.go | 48 +++++++++++----------- test/sdks-e2e/testdata/flatmap-batch.yaml | 3 -- test/sdks-e2e/testdata/flatmap-stream.yaml | 3 -- 6 files changed, 35 insertions(+), 41 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index deffb31ca4..c105c39a9e 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -100,8 +100,6 @@ const ( EnvNamespace = "NUMAFLOW_NAMESPACE" EnvPipelineName = "NUMAFLOW_PIPELINE_NAME" EnvVertexName = "NUMAFLOW_VERTEX_NAME" - EnvMapStreaming = "NUMAFLOW_MAP_STREAMING" - EnvBatchMap = "NUMAFLOW_BATCH_MAP" EnvCallbackEnabled = "NUMAFLOW_CALLBACK_ENABLED" EnvCallbackURL = "NUMAFLOW_CALLBACK_URL" EnvPod = "NUMAFLOW_POD" @@ -203,12 +201,6 @@ const ( // KeysDelimitter is the delimitter used to join keys KeysDelimitter = ":" - // UDF map streaming - MapUdfStreamKey = "numaflow.numaproj.io/map-stream" - - // BatchMapUdfStreamKey is the annotation for enabling UDF Batch Map - BatchMapUdfStreamKey = "numaflow.numaproj.io/batch-map" - // Pipeline health status PipelineStatusHealthy = "healthy" PipelineStatusUnknown = "unknown" diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index 0d1e760b0d..ed97fa9cd7 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -175,10 +175,8 @@ func (v Vertex) commonEnvs() []corev1.EnvVar { {Name: EnvReplica, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + KeyReplica + "']"}}}, {Name: EnvPipelineName, Value: v.Spec.PipelineName}, {Name: 
EnvVertexName, Value: v.Spec.Name}, - {Name: EnvMapStreaming, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + MapUdfStreamKey + "']"}}}, {Name: EnvCallbackEnabled, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackEnabledKey + "']"}}}, {Name: EnvCallbackURL, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackURLKey + "']"}}}, - {Name: EnvBatchMap, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + BatchMapUdfStreamKey + "']"}}}, } } diff --git a/pkg/sdkclient/serverinfo/serverinfo.go b/pkg/sdkclient/serverinfo/serverinfo.go index 5f63112b28..3956942c43 100644 --- a/pkg/sdkclient/serverinfo/serverinfo.go +++ b/pkg/sdkclient/serverinfo/serverinfo.go @@ -31,6 +31,18 @@ import ( "github.com/numaproj/numaflow" ) +// MapModeMetadata field is used to indicate which map mode is enabled +// If none is set, we consider unary map as default +const MapModeMetadata = "MAP_MODE" + +type MapMode string + +const ( + UnaryMap MapMode = "unary-map" + StreamMap MapMode = "stream-map" + BatchMap MapMode = "batch-map" +) + // SDKServerInfo wait for the server to start and return the server info. 
func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) { var opts = DefaultOptions() diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index 94fcc4aa91..a563087fe0 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -132,22 +132,23 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { } opts := []forward.Option{forward.WithLogger(log)} + enableMapUdfStream := false + enableBatchMapUdf := false maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, sdkclient.DefaultGRPCMaxMessageSize) - enableMapUdfStream := sharedutil.LookupEnvBoolOr(dfv1.EnvMapStreaming, false) - enableBatchMapUdf := sharedutil.LookupEnvBoolOr(dfv1.EnvBatchMap, false) - // We can have the vertex running only of the map modes - if enableMapUdfStream && enableBatchMapUdf { - return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled") + // Wait for map server info to be ready, we use the same info file for all the map modes + serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile)) + if err != nil { + return err } - if enableMapUdfStream { + // Read the server info file to read which map mode is enabled + // Based on the value set, we will create the corresponding handler and clients + mapMode, ok := serverInfo.Metadata[sdkserverinfo.MapModeMetadata] + + if ok && mapMode == string(sdkserverinfo.StreamMap) { // Map Stream mode - // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapStreamServerInfoFile)) - if err != nil { - return err - } + enableMapUdfStream = true mapStreamClient, err := mapstreamer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize)) if err != nil { @@ -167,14 +168,9 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { }() opts = append(opts, forward.WithUDFStreamingMap(mapStreamHandler)) - } else if enableBatchMapUdf { + } else if ok && mapMode == 
string(sdkserverinfo.BatchMap) { // if Batch Map mode is enabled create the client and handler for that accordingly - - // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.BatchMapServerInfoFile)) - if err != nil { - return err - } + enableBatchMapUdf = true // create the client and handler for batch map interface batchMapClient, err := batchmapper.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize)) @@ -193,14 +189,11 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { } }() opts = append(opts, forward.WithUDFBatchMap(batchMapHandler)) - } else { - // Default is to enable unary map mode - // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile)) - if err != nil { - return err - } + } else if !ok || mapMode == string(sdkserverinfo.UnaryMap) { + // Default is to enable unary map mode + // If the MapMode metadata is not available, we will start map by default this will ensure + // backward compatibility in case of version mismatch for map // create the client and handler for map interface mapClient, err := mapper.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize)) @@ -222,6 +215,11 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { opts = append(opts, forward.WithUDFUnaryMap(mapHandler)) } + // We can have the vertex running only one of the map modes + if enableMapUdfStream && enableBatchMapUdf { + return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled") + } + for index, bufferPartition := range fromBuffer { // Populate shuffle function map shuffleFuncMap := make(map[string]*shuffle.Shuffle) diff --git a/test/sdks-e2e/testdata/flatmap-batch.yaml b/test/sdks-e2e/testdata/flatmap-batch.yaml index ca3410a41a..57ba8f9afe 100644 --- a/test/sdks-e2e/testdata/flatmap-batch.yaml +++ b/test/sdks-e2e/testdata/flatmap-batch.yaml @@ -9,9 +9,6
@@ spec: http: {} - name: go-split partitions: 2 - metadata: - annotations: - numaflow.numaproj.io/batch-map: "true" scale: min: 1 udf: diff --git a/test/sdks-e2e/testdata/flatmap-stream.yaml b/test/sdks-e2e/testdata/flatmap-stream.yaml index c1ae7ae37f..f0aee9ec13 100644 --- a/test/sdks-e2e/testdata/flatmap-stream.yaml +++ b/test/sdks-e2e/testdata/flatmap-stream.yaml @@ -9,9 +9,6 @@ spec: http: {} - name: go-split partitions: 3 - metadata: - annotations: - numaflow.numaproj.io/map-stream: "true" limits: readBatchSize: 1 scale: From f089f8200df7577c3fe0133f914cb74388fd54ba Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 15 Jul 2024 14:49:32 -0700 Subject: [PATCH 2/8] impl Signed-off-by: Sidhant Kohli --- pkg/udf/map_udf.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index a563087fe0..b2a32b577e 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -147,6 +147,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { mapMode, ok := serverInfo.Metadata[sdkserverinfo.MapModeMetadata] if ok && mapMode == string(sdkserverinfo.StreamMap) { + log.Info("Map mode enabled: Stream Map") // Map Stream mode enableMapUdfStream = true @@ -169,6 +170,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { opts = append(opts, forward.WithUDFStreamingMap(mapStreamHandler)) } else if ok && mapMode == string(sdkserverinfo.BatchMap) { + log.Info("Map mode enabled: Batch Map") // if Batch Map mode is enabled create the client and handler for that accordingly enableBatchMapUdf = true @@ -190,7 +192,8 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { }() opts = append(opts, forward.WithUDFBatchMap(batchMapHandler)) - } else if !ok || mapMode == string(sdkserverinfo.UnaryMap) { + } else { + log.Info("Map mode enabled: Unary Map") // Default is to enable unary map mode // If the MapMode metadata is not available, we will start map by default this will ensure // backward 
compatibility in case of version mismatch for map From 70cad84d2825d8d798e93b4dc90dca9a9a5253e1 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 15 Jul 2024 23:18:04 -0700 Subject: [PATCH 3/8] impl Signed-off-by: Sidhant Kohli --- pkg/udf/map_udf.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index b2a32b577e..8e7902366e 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -146,7 +146,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { // Based on the value set, we will create the corresponding handler and clients mapMode, ok := serverInfo.Metadata[sdkserverinfo.MapModeMetadata] - if ok && mapMode == string(sdkserverinfo.StreamMap) { + if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.StreamMap) { log.Info("Map mode enabled: Stream Map") // Map Stream mode enableMapUdfStream = true @@ -169,7 +169,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { }() opts = append(opts, forward.WithUDFStreamingMap(mapStreamHandler)) - } else if ok && mapMode == string(sdkserverinfo.BatchMap) { + } else if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.BatchMap) { log.Info("Map mode enabled: Batch Map") // if Batch Map mode is enabled create the client and handler for that accordingly enableBatchMapUdf = true From 07d7cd005bbbb3b980fb022ff327e6ccdabbd214 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 16 Jul 2024 11:22:46 -0700 Subject: [PATCH 4/8] refactor Signed-off-by: Sidhant Kohli --- pkg/sdkclient/grpc/grpc_utils.go | 8 ++++++-- pkg/sdkclient/grpc_resolver/client_resolver.go | 5 +---- pkg/sdkclient/serverinfo/serverinfo.go | 12 +++++++++--- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/pkg/sdkclient/grpc/grpc_utils.go b/pkg/sdkclient/grpc/grpc_utils.go index 8cc1405649..6d3574a290 100644 --- a/pkg/sdkclient/grpc/grpc_utils.go +++ b/pkg/sdkclient/grpc/grpc_utils.go @@ -19,12 +19,14 @@ package grpc import ( "fmt" "log" + "strconv"
"github.com/numaproj/numaflow-go/pkg/info" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" resolver "github.com/numaproj/numaflow/pkg/sdkclient/grpc_resolver" + sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // ConnectToServer connects to the server with the given socket address based on the server info protocol. @@ -34,14 +36,16 @@ func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessage var sockAddr string // Check if Multiproc server mode is enabled - if _, ok := serverInfo.Metadata["MULTIPROC"]; ok { + if multiProcServer, ok := serverInfo.Metadata[sdkserverinfo.MultiProcMetadata]; ok { + // Extract the number of servers from the server info file + numServers, _ := strconv.Atoi(multiProcServer) // In Multiprocessing server mode we have multiple servers forks // and each server will listen on a different port. // On the client side we will create a connection to each of these server instances. // The client will use a custom resolver to resolve the server address. // The custom resolver will return the list of server addresses from the server info file. // The client will use the list of server addresses to create the multiple connections.
- if err := resolver.RegMultiProcResolver(serverInfo); err != nil { + if err := resolver.RegMultiProcResolver(numServers); err != nil { return nil, fmt.Errorf("failed to start Multiproc Client: %w", err) } diff --git a/pkg/sdkclient/grpc_resolver/client_resolver.go b/pkg/sdkclient/grpc_resolver/client_resolver.go index 79f75a7293..9d3a4c8e9b 100644 --- a/pkg/sdkclient/grpc_resolver/client_resolver.go +++ b/pkg/sdkclient/grpc_resolver/client_resolver.go @@ -21,7 +21,6 @@ import ( "strconv" "strings" - "github.com/numaproj/numaflow-go/pkg/info" "google.golang.org/grpc/resolver" "github.com/numaproj/numaflow/pkg/sdkclient" @@ -99,9 +98,7 @@ func buildConnAddrs(numServers int) []string { // RegMultiProcResolver is used to populate the connection properties based // on multiprocessing TCP or UDS connection -func RegMultiProcResolver(svrInfo *info.ServerInfo) error { - // Extract the server ports from the server info file and convert it to a list - numServers, _ := strconv.Atoi(svrInfo.Metadata["MULTIPROC"]) +func RegMultiProcResolver(numServers int) error { log.Println("Multiprocessing Servers :", numServers) conn := buildConnAddrs(numServers) res := newMultiProcResolverBuilder(conn) diff --git a/pkg/sdkclient/serverinfo/serverinfo.go b/pkg/sdkclient/serverinfo/serverinfo.go index 3956942c43..aa1cdde29d 100644 --- a/pkg/sdkclient/serverinfo/serverinfo.go +++ b/pkg/sdkclient/serverinfo/serverinfo.go @@ -31,9 +31,15 @@ import ( "github.com/numaproj/numaflow" ) -// MapModeMetadata field is used to indicate which map mode is enabled -// If none is set, we consider unary map as default -const MapModeMetadata = "MAP_MODE" +// Metadata keys used in the server info file +const ( + // MultiProcMetadata is the field used to indicate that MultiProc map mode is enabled + // The value contains the number of servers spawned. 
+ MultiProcMetadata = "MULTIPROC" + // MapModeMetadata field is used to indicate which map mode is enabled + // If none is set, we consider unary map as default + MapModeMetadata = "MAP_MODE" +) type MapMode string From 4f2623b5e9d0c2961bf1f69de75b523d520bfc04 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 16 Jul 2024 13:01:59 -0700 Subject: [PATCH 5/8] refactor Signed-off-by: Sidhant Kohli --- docs/user-guide/user-defined-functions/map/map.md | 6 +----- test/sdks-e2e/testdata/flatmap-stream.yaml | 6 ------ 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/docs/user-guide/user-defined-functions/map/map.md b/docs/user-guide/user-defined-functions/map/map.md index 2d82542d27..2cc5267b61 100644 --- a/docs/user-guide/user-defined-functions/map/map.md +++ b/docs/user-guide/user-defined-functions/map/map.md @@ -32,8 +32,7 @@ spec: In cases the map function generates more than one output (e.g., flat map), the UDF can be configured to run in a streaming mode instead of batching, which is the default mode. In streaming mode, the messages will be pushed to the downstream vertices once generated -instead of in a batch at the end. The streaming mode can be enabled by setting the annotation -`numaflow.numaproj.io/map-stream` to `true` in the vertex spec. +instead of in a batch at the end. Note that to maintain data orderliness, we restrict the read batch size to be `1`. 
@@ -41,9 +40,6 @@ Note that to maintain data orderliness, we restrict the read batch size to be `1 spec: vertices: - name: my-vertex - metadata: - annotations: - numaflow.numaproj.io/map-stream: "true" limits: # mapstreaming won't work if readBatchSize is != 1 readBatchSize: 1 diff --git a/test/sdks-e2e/testdata/flatmap-stream.yaml b/test/sdks-e2e/testdata/flatmap-stream.yaml index f0aee9ec13..503ffcaf3a 100644 --- a/test/sdks-e2e/testdata/flatmap-stream.yaml +++ b/test/sdks-e2e/testdata/flatmap-stream.yaml @@ -38,9 +38,6 @@ spec: imagePullPolicy: Always - name: python-split partitions: 3 - metadata: - annotations: - numaflow.numaproj.io/map-stream: "true" limits: readBatchSize: 1 scale: @@ -67,9 +64,6 @@ spec: imagePullPolicy: Always - name: java-split partitions: 3 - metadata: - annotations: - numaflow.numaproj.io/map-stream: "true" limits: readBatchSize: 1 scale: From e50ef244023ef9e1d8dff50e5396cea4586e90e6 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 17 Jul 2024 18:19:15 -0700 Subject: [PATCH 6/8] disable java test Signed-off-by: Sidhant Kohli --- test/sdks-e2e/sdks_test.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/test/sdks-e2e/sdks_test.go b/test/sdks-e2e/sdks_test.go index ea8acdfe4e..6ec127b7a9 100644 --- a/test/sdks-e2e/sdks_test.go +++ b/test/sdks-e2e/sdks_test.go @@ -50,7 +50,9 @@ func (s *SDKsSuite) TestUDFunctionAndSink() { VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")). VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). - VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) + VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")). 
+ VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). + VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello"))) @@ -76,6 +78,8 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() { VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")). VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) + //VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). + // VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello,hello"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello"))) @@ -86,8 +90,10 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() { VertexPodLogContains("go-udsink-2", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4)) w.Expect(). VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4)) - w.Expect(). - VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4)) + + // FIXME(map-batch): enable Java + //w.Expect(). + // VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4)) } func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() { @@ -102,14 +108,18 @@ func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() { VertexPodLogContains("in", LogSourceVertexStarted). 
VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) + // FIXME(map-batch): enable python and Java //VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). - //VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) + //VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")). + //VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). + // VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello"))). SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello"))) w.Expect(). VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)) + // FIXME(map-batch): enable python and Java //VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)). //VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)) } From d35a3f7aa5df9f82b3733eec98973c03cc6a277f Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 17 Jul 2024 18:25:36 -0700 Subject: [PATCH 7/8] disable java test Signed-off-by: Sidhant Kohli --- test/sdks-e2e/sdks_test.go | 14 +++--- test/sdks-e2e/testdata/flatmap-batch.yaml | 57 +++++++++++------------ 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/test/sdks-e2e/sdks_test.go b/test/sdks-e2e/sdks_test.go index 6ec127b7a9..8e47db6b4d 100644 --- a/test/sdks-e2e/sdks_test.go +++ b/test/sdks-e2e/sdks_test.go @@ -107,10 +107,10 @@ func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() { VertexPodsRunning(). 
VertexPodLogContains("in", LogSourceVertexStarted). VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). - VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) - // FIXME(map-batch): enable python and Java - //VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). - //VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")). + VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")). + VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). + VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) + // FIXME(map-batch): enable Java //VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")). // VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")) @@ -118,10 +118,10 @@ func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() { SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello"))) w.Expect(). - VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)) - // FIXME(map-batch): enable python and Java + VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)). + VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)) + // FIXME(map-batch): enable Java //VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)). 
- //VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)) } func (s *SDKsSuite) TestReduceSDK() { diff --git a/test/sdks-e2e/testdata/flatmap-batch.yaml b/test/sdks-e2e/testdata/flatmap-batch.yaml index 57ba8f9afe..f6e3f88482 100644 --- a/test/sdks-e2e/testdata/flatmap-batch.yaml +++ b/test/sdks-e2e/testdata/flatmap-batch.yaml @@ -25,31 +25,30 @@ spec: # https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/log image: quay.io/numaio/numaflow-go/sink-log:stable imagePullPolicy: Always -# -## FIXME(map-batch): enable python and Java -# - name: python-split -# scale: -# min: 1 -# udf: -# container: -# args: -# - python -# - example.py -# # Split input message into an array with comma, https://github.com/numaproj/numaflow-python/tree/main/examples/map/flatmap -# image: quay.io/numaio/numaflow-python/map-flatmap:stable -# imagePullPolicy: Always -# - name: python-udsink -# scale: -# min: 1 -# sink: -# udsink: -# container: -# args: -# - python -# - example.py -# # https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log -# image: quay.io/numaio/numaflow-python/sink-log:stable -# imagePullPolicy: Always + - name: python-split + scale: + min: 1 + udf: + container: + args: + - python + - example.py + # Split input message into an array with comma, https://github.com/numaproj/numaflow-python/tree/main/examples/batchmap/flatmap + image: quay.io/numaio/numaflow-python/batch-map-flatmap:stable + imagePullPolicy: Always + - name: python-udsink + scale: + min: 1 + sink: + udsink: + container: + args: + - python + - example.py + # https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log + image: quay.io/numaio/numaflow-python/sink-log:stable + imagePullPolicy: Always +## FIXME(map-batch): enable Java # - name: java-split # scale: # min: 1 @@ -72,10 +71,10 @@ spec: to: go-split - from: go-split to: go-udsink -# - from: in -# to: python-split -# - from: python-split -# 
to: python-udsink + - from: in + to: python-split + - from: python-split + to: python-udsink # - from: in # to: java-split # - from: java-split From cd98820934721e02377ae9421bb33bca2a6eaa32 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Thu, 18 Jul 2024 14:33:00 -0700 Subject: [PATCH 8/8] disable java test Signed-off-by: Sidhant Kohli --- test/sdks-e2e/testdata/flatmap-stream.yaml | 49 +++++++++++----------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/test/sdks-e2e/testdata/flatmap-stream.yaml b/test/sdks-e2e/testdata/flatmap-stream.yaml index 503ffcaf3a..c8ba177696 100644 --- a/test/sdks-e2e/testdata/flatmap-stream.yaml +++ b/test/sdks-e2e/testdata/flatmap-stream.yaml @@ -62,26 +62,27 @@ spec: # https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log image: quay.io/numaio/numaflow-python/sink-log:stable imagePullPolicy: Always - - name: java-split - partitions: 3 - limits: - readBatchSize: 1 - scale: - min: 1 - udf: - container: - # Split input message into an array with comma, see https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream - image: quay.io/numaio/numaflow-java/flat-map-stream:stable - imagePullPolicy: Always - - name: java-udsink - scale: - min: 1 - sink: - udsink: - container: - # https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple - image: quay.io/numaio/numaflow-java/simple-sink:stable - imagePullPolicy: Always +## FIXME(map-batch): enable Java +# - name: java-split +# partitions: 3 +# limits: +# readBatchSize: 1 +# scale: +# min: 1 +# udf: +# container: +# # Split input message into an array with comma, see https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream +# image: quay.io/numaio/numaflow-java/flat-map-stream:stable +# imagePullPolicy: Always +# - name: java-udsink +# scale: +# min: 1 +# sink: 
+# udsink: +# container: +# # https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/sink/simple +# image: quay.io/numaio/numaflow-java/simple-sink:stable +# imagePullPolicy: Always edges: - from: in to: go-split @@ -93,7 +94,7 @@ spec: to: python-split - from: python-split to: python-udsink - - from: in - to: java-split - - from: java-split - to: java-udsink +# - from: in +# to: java-split +# - from: java-split +# to: java-udsink