Skip to content

Commit

Permalink
feat: use same server-info file for all map modes (#1828)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Jul 18, 2024
1 parent e042287 commit 1db0d09
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 118 deletions.
6 changes: 1 addition & 5 deletions docs/user-guide/user-defined-functions/map/map.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,14 @@ spec:
In cases where the map function generates more than one output (e.g., flat map), the UDF can be
configured to run in a streaming mode instead of batching, which is the default mode.
In streaming mode, the messages will be pushed to the downstream vertices once generated
instead of in a batch at the end. The streaming mode can be enabled by setting the annotation
`numaflow.numaproj.io/map-stream` to `true` in the vertex spec.
instead of in a batch at the end.
Note that to maintain data orderliness, we restrict the read batch size to be `1`.

```yaml
spec:
vertices:
- name: my-vertex
metadata:
annotations:
numaflow.numaproj.io/map-stream: "true"
limits:
# mapstreaming won't work if readBatchSize is != 1
readBatchSize: 1
Expand Down
8 changes: 0 additions & 8 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ const (
EnvNamespace = "NUMAFLOW_NAMESPACE"
EnvPipelineName = "NUMAFLOW_PIPELINE_NAME"
EnvVertexName = "NUMAFLOW_VERTEX_NAME"
EnvMapStreaming = "NUMAFLOW_MAP_STREAMING"
EnvBatchMap = "NUMAFLOW_BATCH_MAP"
EnvCallbackEnabled = "NUMAFLOW_CALLBACK_ENABLED"
EnvCallbackURL = "NUMAFLOW_CALLBACK_URL"
EnvPod = "NUMAFLOW_POD"
Expand Down Expand Up @@ -211,12 +209,6 @@ const (
// KeysDelimitter is the delimiter used to join keys
KeysDelimitter = ":"

// UDF map streaming
MapUdfStreamKey = "numaflow.numaproj.io/map-stream"

// BatchMapUdfStreamKey is the annotation for enabling UDF Batch Map
BatchMapUdfStreamKey = "numaflow.numaproj.io/batch-map"

// Pipeline health status
PipelineStatusHealthy = "healthy"
PipelineStatusUnknown = "unknown"
Expand Down
2 changes: 0 additions & 2 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,8 @@ func (v Vertex) commonEnvs() []corev1.EnvVar {
{Name: EnvReplica, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + KeyReplica + "']"}}},
{Name: EnvPipelineName, Value: v.Spec.PipelineName},
{Name: EnvVertexName, Value: v.Spec.Name},
{Name: EnvMapStreaming, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + MapUdfStreamKey + "']"}}},
{Name: EnvCallbackEnabled, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackEnabledKey + "']"}}},
{Name: EnvCallbackURL, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackURLKey + "']"}}},
{Name: EnvBatchMap, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + BatchMapUdfStreamKey + "']"}}},
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/sdkclient/grpc/grpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package grpc
import (
"fmt"
"log"
"strconv"

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

resolver "github.com/numaproj/numaflow/pkg/sdkclient/grpc_resolver"
sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// ConnectToServer connects to the server with the given socket address based on the server info protocol.
Expand All @@ -34,14 +36,16 @@ func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessage
var sockAddr string

// Check if Multiproc server mode is enabled
if _, ok := serverInfo.Metadata["MULTIPROC"]; ok {
if multiProcServer, ok := serverInfo.Metadata[sdkserverinfo.MultiProcMetadata]; ok {
// Extract the number of servers from the server info file
numServers, _ := strconv.Atoi(multiProcServer)
// In Multiprocessing server mode we have multiple server forks
// and each server will listen on a different port.
// On the client side we will create a connection to each of these server instances.
// The client will use a custom resolver to resolve the server address.
// The custom resolver will return the list of server addresses from the server info file.
// The client will use the list of server addresses to create the multiple connections.
if err := resolver.RegMultiProcResolver(serverInfo); err != nil {
if err := resolver.RegMultiProcResolver(numServers); err != nil {
return nil, fmt.Errorf("failed to start Multiproc Client: %w", err)
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/sdkclient/grpc_resolver/client_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"strconv"
"strings"

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc/resolver"

"github.com/numaproj/numaflow/pkg/sdkclient"
Expand Down Expand Up @@ -99,9 +98,7 @@ func buildConnAddrs(numServers int) []string {

// RegMultiProcResolver is used to populate the connection properties based
// on multiprocessing TCP or UDS connection
func RegMultiProcResolver(svrInfo *info.ServerInfo) error {
// Extract the server ports from the server info file and convert it to a list
numServers, _ := strconv.Atoi(svrInfo.Metadata["MULTIPROC"])
func RegMultiProcResolver(numServers int) error {
log.Println("Multiprocessing Servers :", numServers)
conn := buildConnAddrs(numServers)
res := newMultiProcResolverBuilder(conn)
Expand Down
18 changes: 18 additions & 0 deletions pkg/sdkclient/serverinfo/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ import (
"github.com/numaproj/numaflow"
)

// Metadata keys used in the server info file.
// They are looked up in the Metadata map of the server info returned by the SDK server.
const (
// MultiProcMetadata is the metadata key indicating that MultiProc map mode is enabled.
// The value contains the number of servers spawned, stored as a string that
// the client parses into an integer.
MultiProcMetadata = "MULTIPROC"
// MapModeMetadata is the metadata key indicating which map mode is enabled.
// If it is not set, unary map is considered the default.
MapModeMetadata = "MAP_MODE"
)

// MapMode is the string identifier of a map mode, as carried in the
// server info metadata under the MAP_MODE key.
type MapMode string

// Supported map modes.
const (
// UnaryMap identifies the unary map mode.
UnaryMap MapMode = "unary-map"
// StreamMap identifies the stream map mode.
StreamMap MapMode = "stream-map"
// BatchMap identifies the batch map mode.
BatchMap MapMode = "batch-map"
)

// SDKServerInfo waits for the server to start and returns the server info.
func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) {
var opts = DefaultOptions()
Expand Down
49 changes: 25 additions & 24 deletions pkg/udf/map_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,24 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
}

opts := []forward.Option{forward.WithLogger(log)}
enableMapUdfStream := false
enableBatchMapUdf := false
maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, sdkclient.DefaultGRPCMaxMessageSize)
enableMapUdfStream := sharedutil.LookupEnvBoolOr(dfv1.EnvMapStreaming, false)
enableBatchMapUdf := sharedutil.LookupEnvBoolOr(dfv1.EnvBatchMap, false)

// We can have the vertex running only one of the map modes
if enableMapUdfStream && enableBatchMapUdf {
return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled")
// Wait for map server info to be ready, we use the same info file for all the map modes
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile))
if err != nil {
return err
}

if enableMapUdfStream {
// Read the server info file to read which map mode is enabled
// Based on the value set, we will create the corresponding handler and clients
mapMode, ok := serverInfo.Metadata[sdkserverinfo.MapModeMetadata]

if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.StreamMap) {
log.Info("Map mode enabled: Stream Map")
// Map Stream mode
// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapStreamServerInfoFile))
if err != nil {
return err
}
enableMapUdfStream = true

mapStreamClient, err := mapstreamer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand All @@ -167,14 +169,10 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
}()
opts = append(opts, forward.WithUDFStreamingMap(mapStreamHandler))

} else if enableBatchMapUdf {
} else if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.BatchMap) {
log.Info("Map mode enabled: Batch Map")
// if Batch Map mode is enabled create the client and handler for that accordingly

// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.BatchMapServerInfoFile))
if err != nil {
return err
}
enableBatchMapUdf = true

// create the client and handler for batch map interface
batchMapClient, err := batchmapper.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
Expand All @@ -193,14 +191,12 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
}
}()
opts = append(opts, forward.WithUDFBatchMap(batchMapHandler))

} else {
log.Info("Map mode enabled: Unary Map")
// Default is to enable unary map mode

// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile))
if err != nil {
return err
}
// If the MapMode metadata is not available, we will start unary map by default; this ensures
// backward compatibility in case of a version mismatch for map

// create the client and handler for map interface
mapClient, err := mapper.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
Expand All @@ -222,6 +218,11 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
opts = append(opts, forward.WithUDFUnaryMap(mapHandler))
}

// We can have the vertex running only one of the map modes
if enableMapUdfStream && enableBatchMapUdf {
return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled")
}

for index, bufferPartition := range fromBuffer {
// Populate shuffle function map
shuffleFuncMap := make(map[string]*shuffle.Shuffle)
Expand Down
26 changes: 18 additions & 8 deletions test/sdks-e2e/sdks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func (s *SDKsSuite) TestUDFunctionAndSink() {
VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello")))
Expand All @@ -76,6 +78,8 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() {
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
//VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
// VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello,hello"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello")))
Expand All @@ -86,8 +90,10 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() {
VertexPodLogContains("go-udsink-2", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
w.Expect().
VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
w.Expect().
VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))

// FIXME(map-batch): enable Java
//w.Expect().
// VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
}

func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() {
Expand All @@ -101,17 +107,21 @@ func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() {
VertexPodsRunning().
VertexPodLogContains("in", LogSourceVertexStarted).
VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
//VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
//VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
// FIXME(map-batch): enable Java
//VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
// VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello")))

w.Expect().
VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3))
VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)).
VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3))
// FIXME(map-batch): enable Java
//VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)).
//VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3))
}

func (s *SDKsSuite) TestReduceSDK() {
Expand Down
60 changes: 28 additions & 32 deletions test/sdks-e2e/testdata/flatmap-batch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ spec:
http: {}
- name: go-split
partitions: 2
metadata:
annotations:
numaflow.numaproj.io/batch-map: "true"
scale:
min: 1
udf:
Expand All @@ -28,31 +25,30 @@ spec:
# https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/log
image: quay.io/numaio/numaflow-go/sink-log:stable
imagePullPolicy: Always
#
## FIXME(map-batch): enable python and Java
# - name: python-split
# scale:
# min: 1
# udf:
# container:
# args:
# - python
# - example.py
# # Split input message into an array with comma, https://github.com/numaproj/numaflow-python/tree/main/examples/map/flatmap
# image: quay.io/numaio/numaflow-python/map-flatmap:stable
# imagePullPolicy: Always
# - name: python-udsink
# scale:
# min: 1
# sink:
# udsink:
# container:
# args:
# - python
# - example.py
# # https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log
# image: quay.io/numaio/numaflow-python/sink-log:stable
# imagePullPolicy: Always
- name: python-split
scale:
min: 1
udf:
container:
args:
- python
- example.py
# Split input message into an array with comma, https://github.com/numaproj/numaflow-python/tree/main/examples/batchmap/flatmap
image: quay.io/numaio/numaflow-python/batch-map-flatmap:stable
imagePullPolicy: Always
- name: python-udsink
scale:
min: 1
sink:
udsink:
container:
args:
- python
- example.py
# https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log
image: quay.io/numaio/numaflow-python/sink-log:stable
imagePullPolicy: Always
## FIXME(map-batch): enable Java
# - name: java-split
# scale:
# min: 1
Expand All @@ -75,10 +71,10 @@ spec:
to: go-split
- from: go-split
to: go-udsink
# - from: in
# to: python-split
# - from: python-split
# to: python-udsink
- from: in
to: python-split
- from: python-split
to: python-udsink
# - from: in
# to: java-split
# - from: java-split
Expand Down
Loading

0 comments on commit 1db0d09

Please sign in to comment.