feat: use same server-info file for all map modes #1828

Merged: 13 commits, Jul 18, 2024
6 changes: 1 addition & 5 deletions docs/user-guide/user-defined-functions/map/map.md
@@ -32,18 +32,14 @@ spec:
In cases where the map function generates more than one output (e.g., flat map), the UDF can be
configured to run in a streaming mode instead of batching, which is the default mode.
In streaming mode, the messages will be pushed to the downstream vertices once generated
instead of in a batch at the end. The streaming mode can be enabled by setting the annotation
`numaflow.numaproj.io/map-stream` to `true` in the vertex spec.
instead of in a batch at the end.

Note that to maintain data orderliness, we restrict the read batch size to be `1`.

```yaml
spec:
vertices:
- name: my-vertex
metadata:
annotations:
numaflow.numaproj.io/map-stream: "true"
limits:
# mapstreaming won't work if readBatchSize is != 1
readBatchSize: 1
```
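To make the difference between the two delivery modes described in this doc hunk concrete, here is a minimal sketch using only the Go standard library. It does not use the Numaflow SDK: `splitBatch` and `splitStream` are hypothetical names invented for illustration, and the channel merely stands in for "push downstream once generated".

```go
package main

import (
	"fmt"
	"strings"
)

// splitBatch is a hypothetical flat map: it collects every output and
// returns them together once the whole input has been processed.
func splitBatch(payload string) []string {
	return strings.Split(payload, ",")
}

// splitStream is the hypothetical streaming counterpart: each output is
// sent on out as soon as it has been cut from the input, and out is
// closed when the input is exhausted.
func splitStream(payload string, out chan<- string) {
	defer close(out)
	rest := payload
	for {
		part, tail, found := strings.Cut(rest, ",")
		out <- part
		if !found {
			return
		}
		rest = tail
	}
}

func main() {
	fmt.Println("batch:", splitBatch("hello,hello,hello"))

	out := make(chan string)
	go splitStream("hello,hello,hello", out)
	for msg := range out {
		fmt.Println("stream:", msg)
	}
}
```

Both functions produce the same outputs in the same order; the only difference is when a consumer can start working on them.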
8 changes: 0 additions & 8 deletions pkg/apis/numaflow/v1alpha1/const.go
@@ -102,8 +102,6 @@ const (
EnvNamespace = "NUMAFLOW_NAMESPACE"
EnvPipelineName = "NUMAFLOW_PIPELINE_NAME"
EnvVertexName = "NUMAFLOW_VERTEX_NAME"
EnvMapStreaming = "NUMAFLOW_MAP_STREAMING"
EnvBatchMap = "NUMAFLOW_BATCH_MAP"
EnvCallbackEnabled = "NUMAFLOW_CALLBACK_ENABLED"
EnvCallbackURL = "NUMAFLOW_CALLBACK_URL"
EnvPod = "NUMAFLOW_POD"
@@ -211,12 +209,6 @@ const (
// KeysDelimitter is the delimiter used to join keys
KeysDelimitter = ":"

// UDF map streaming
MapUdfStreamKey = "numaflow.numaproj.io/map-stream"

// BatchMapUdfStreamKey is the annotation for enabling UDF Batch Map
BatchMapUdfStreamKey = "numaflow.numaproj.io/batch-map"

// Pipeline health status
PipelineStatusHealthy = "healthy"
PipelineStatusUnknown = "unknown"
2 changes: 0 additions & 2 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
@@ -184,10 +184,8 @@ func (v Vertex) commonEnvs() []corev1.EnvVar {
{Name: EnvReplica, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + KeyReplica + "']"}}},
{Name: EnvPipelineName, Value: v.Spec.PipelineName},
{Name: EnvVertexName, Value: v.Spec.Name},
{Name: EnvMapStreaming, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + MapUdfStreamKey + "']"}}},
{Name: EnvCallbackEnabled, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackEnabledKey + "']"}}},
{Name: EnvCallbackURL, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + CallbackURLKey + "']"}}},
{Name: EnvBatchMap, ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.annotations['" + BatchMapUdfStreamKey + "']"}}},
}
}

8 changes: 6 additions & 2 deletions pkg/sdkclient/grpc/grpc_utils.go
@@ -19,12 +19,14 @@ package grpc
import (
"fmt"
"log"
"strconv"

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

resolver "github.com/numaproj/numaflow/pkg/sdkclient/grpc_resolver"
sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// ConnectToServer connects to the server with the given socket address based on the server info protocol.
@@ -34,14 +36,16 @@ func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessage
var sockAddr string

// Check if Multiproc server mode is enabled
if _, ok := serverInfo.Metadata["MULTIPROC"]; ok {
if multiProcServer, ok := serverInfo.Metadata[sdkserverinfo.MultiProcMetadata]; ok {
// Extract the server ports from the server info file
numServers, _ := strconv.Atoi(multiProcServer)
// In Multiprocessing server mode we have multiple server forks
// and each server will listen on a different port.
// On the client side we will create a connection to each of these server instances.
// The client will use a custom resolver to resolve the server address.
// The custom resolver will return the list of server addresses from the server info file.
// The client will use the list of server addresses to create the multiple connections.
if err := resolver.RegMultiProcResolver(serverInfo); err != nil {
if err := resolver.RegMultiProcResolver(numServers); err != nil {
return nil, fmt.Errorf("failed to start Multiproc Client: %w", err)
}

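One detail worth noting in the hunk above: the result of `strconv.Atoi` is used with its error discarded, so a malformed `MULTIPROC` value would silently become `0` servers. The following is a hedged sketch of a stricter parse, not code from this PR. `parseNumServers` and `multiProcKey` are hypothetical names, and a plain `map[string]string` stands in for the server info metadata.

```go
package main

import (
	"fmt"
	"strconv"
)

// multiProcKey mirrors the "MULTIPROC" metadata key shown in the diff.
const multiProcKey = "MULTIPROC"

// parseNumServers is a hypothetical helper. It reports whether multiproc
// mode is advertised in metadata and, if so, how many servers were spawned.
// A malformed or non-positive count is returned as an error instead of
// being ignored.
func parseNumServers(metadata map[string]string) (n int, enabled bool, err error) {
	raw, ok := metadata[multiProcKey]
	if !ok {
		return 0, false, nil
	}
	n, err = strconv.Atoi(raw)
	if err != nil {
		return 0, true, fmt.Errorf("invalid %s value %q: %w", multiProcKey, raw, err)
	}
	if n < 1 {
		return 0, true, fmt.Errorf("invalid %s value %d: must be at least 1", multiProcKey, n)
	}
	return n, true, nil
}

func main() {
	for _, md := range []map[string]string{
		{},
		{multiProcKey: "4"},
		{multiProcKey: "four"},
	} {
		n, enabled, err := parseNumServers(md)
		fmt.Println(n, enabled, err)
	}
}
```

Whether a bad value should fail the connection or fall back to a single server is a design decision for the maintainers; the sketch only shows how the error could be surfaced.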
5 changes: 1 addition & 4 deletions pkg/sdkclient/grpc_resolver/client_resolver.go
@@ -21,7 +21,6 @@ import (
"strconv"
"strings"

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc/resolver"

"github.com/numaproj/numaflow/pkg/sdkclient"
@@ -99,9 +98,7 @@ func buildConnAddrs(numServers int) []string {

// RegMultiProcResolver is used to populate the connection properties based
// on multiprocessing TCP or UDS connection
func RegMultiProcResolver(svrInfo *info.ServerInfo) error {
// Extract the server ports from the server info file and convert it to a list
numServers, _ := strconv.Atoi(svrInfo.Metadata["MULTIPROC"])
func RegMultiProcResolver(numServers int) error {
log.Println("Multiprocessing Servers :", numServers)
conn := buildConnAddrs(numServers)
res := newMultiProcResolverBuilder(conn)
18 changes: 18 additions & 0 deletions pkg/sdkclient/serverinfo/serverinfo.go
@@ -31,6 +31,24 @@ import (
"github.com/numaproj/numaflow"
)

// Metadata keys used in the server info file
const (
// MultiProcMetadata is the field used to indicate that MultiProc map mode is enabled
// The value contains the number of servers spawned.
MultiProcMetadata = "MULTIPROC"
// MapModeMetadata field is used to indicate which map mode is enabled
// If none is set, we consider unary map as default
MapModeMetadata = "MAP_MODE"
)

type MapMode string

const (
UnaryMap MapMode = "unary-map"
StreamMap MapMode = "stream-map"
BatchMap MapMode = "batch-map"
)

// SDKServerInfo waits for the server to start and returns the server info.
func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) {
var opts = DefaultOptions()
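The new `MAP_MODE` metadata key and the `MapMode` constants imply a simple lookup with a unary default. The sketch below restates that rule in isolation, assuming a plain `map[string]string` in place of the server info metadata. `resolveMapMode` and `mapModeKey` are hypothetical names and are not part of the PR.

```go
package main

import "fmt"

// MapMode and its values mirror the constants added in the diff.
type MapMode string

const (
	UnaryMap  MapMode = "unary-map"
	StreamMap MapMode = "stream-map"
	BatchMap  MapMode = "batch-map"

	// mapModeKey mirrors the "MAP_MODE" metadata key from the diff.
	mapModeKey = "MAP_MODE"
)

// resolveMapMode is a hypothetical helper: it returns the advertised mode
// and falls back to UnaryMap when the key is absent or the value is not
// one of the known modes.
func resolveMapMode(metadata map[string]string) MapMode {
	switch mode := MapMode(metadata[mapModeKey]); mode {
	case StreamMap, BatchMap:
		return mode
	default:
		return UnaryMap
	}
}

func main() {
	fmt.Println(resolveMapMode(map[string]string{mapModeKey: "stream-map"}))
	fmt.Println(resolveMapMode(map[string]string{}))
}
```

Because the mode comes from a single metadata value, exactly one mode is ever selected, which is what lets an SDK server that predates the key keep working as unary map.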
49 changes: 25 additions & 24 deletions pkg/udf/map_udf.go
@@ -132,22 +132,24 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
}

opts := []forward.Option{forward.WithLogger(log)}
enableMapUdfStream := false
enableBatchMapUdf := false
maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, sdkclient.DefaultGRPCMaxMessageSize)
enableMapUdfStream := sharedutil.LookupEnvBoolOr(dfv1.EnvMapStreaming, false)
enableBatchMapUdf := sharedutil.LookupEnvBoolOr(dfv1.EnvBatchMap, false)

// We can have the vertex running only one of the map modes
if enableMapUdfStream && enableBatchMapUdf {
return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled")
// Wait for map server info to be ready; we use the same info file for all the map modes
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile))
if err != nil {
return err
}

if enableMapUdfStream {
// Read the server info file to determine which map mode is enabled
// Based on the value set, we will create the corresponding handler and clients
mapMode, ok := serverInfo.Metadata[sdkserverinfo.MapModeMetadata]

if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.StreamMap) {
log.Info("Map mode enabled: Stream Map")
// Map Stream mode
// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapStreamServerInfoFile))
if err != nil {
return err
}
enableMapUdfStream = true

mapStreamClient, err := mapstreamer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
@@ -167,14 +169,10 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
}()
opts = append(opts, forward.WithUDFStreamingMap(mapStreamHandler))

} else if enableBatchMapUdf {
} else if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.BatchMap) {
log.Info("Map mode enabled: Batch Map")
// if Batch Map mode is enabled create the client and handler for that accordingly

// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.BatchMapServerInfoFile))
if err != nil {
return err
}
enableBatchMapUdf = true

// create the client and handler for batch map interface
batchMapClient, err := batchmapper.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
@@ -193,14 +191,12 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
}
}()
opts = append(opts, forward.WithUDFBatchMap(batchMapHandler))

} else {
log.Info("Map mode enabled: Unary Map")
// Default is to enable unary map mode

// Wait for server info to be ready
serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile))
if err != nil {
return err
}
// If the MapMode metadata is not available, we start unary map by default; this ensures
// backward compatibility in case of a version mismatch for map

// create the client and handler for map interface
mapClient, err := mapper.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
@@ -222,6 +218,11 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
opts = append(opts, forward.WithUDFUnaryMap(mapHandler))
}

// We can have the vertex running only one of the map modes
if enableMapUdfStream && enableBatchMapUdf {
return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled")
}

for index, bufferPartition := range fromBuffer {
// Populate shuffle function map
shuffleFuncMap := make(map[string]*shuffle.Shuffle)
26 changes: 18 additions & 8 deletions test/sdks-e2e/sdks_test.go
@@ -50,7 +50,9 @@ func (s *SDKsSuite) TestUDFunctionAndSink() {
VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello")))
@@ -76,6 +78,8 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() {
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
//VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
// VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello,hello"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello")))
@@ -86,8 +90,10 @@ func (s *SDKsSuite) TestMapStreamUDFunctionAndSink() {
VertexPodLogContains("go-udsink-2", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
w.Expect().
VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
w.Expect().
VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))

// FIXME(map-batch): enable Java
//w.Expect().
// VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(4))
}

func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() {
@@ -101,17 +107,21 @@ func (s *SDKsSuite) TestBatchMapUDFunctionAndSink() {
VertexPodsRunning().
VertexPodLogContains("in", LogSourceVertexStarted).
VertexPodLogContains("go-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
//VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
//VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
VertexPodLogContains("go-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("python-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))
// FIXME(map-batch): enable Java
//VertexPodLogContains("java-split", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
// VertexPodLogContains("java-udsink", SinkVertexStarted, PodLogCheckOptionWithContainer("numa"))

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello,hello"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("hello")))

w.Expect().
VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3))
VertexPodLogContains("go-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)).
VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3))
// FIXME(map-batch): enable Java
//VertexPodLogContains("java-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3)).
//VertexPodLogContains("python-udsink", "hello", PodLogCheckOptionWithContainer("udsink"), PodLogCheckOptionWithCount(3))
}

func (s *SDKsSuite) TestReduceSDK() {
60 changes: 28 additions & 32 deletions test/sdks-e2e/testdata/flatmap-batch.yaml
@@ -9,9 +9,6 @@ spec:
http: {}
- name: go-split
partitions: 2
metadata:
annotations:
numaflow.numaproj.io/batch-map: "true"
scale:
min: 1
udf:
@@ -28,31 +25,30 @@ spec:
# https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/log
image: quay.io/numaio/numaflow-go/sink-log:stable
imagePullPolicy: Always
#
## FIXME(map-batch): enable python and Java
# - name: python-split
# scale:
# min: 1
# udf:
# container:
# args:
# - python
# - example.py
# # Split input message into an array with comma, https://github.com/numaproj/numaflow-python/tree/main/examples/map/flatmap
# image: quay.io/numaio/numaflow-python/map-flatmap:stable
# imagePullPolicy: Always
# - name: python-udsink
# scale:
# min: 1
# sink:
# udsink:
# container:
# args:
# - python
# - example.py
# # https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log
# image: quay.io/numaio/numaflow-python/sink-log:stable
# imagePullPolicy: Always
- name: python-split
scale:
min: 1
udf:
container:
args:
- python
- example.py
# Split input message into an array with comma, https://github.com/numaproj/numaflow-python/tree/main/examples/batchmap/flatmap
image: quay.io/numaio/numaflow-python/batch-map-flatmap:stable
imagePullPolicy: Always
- name: python-udsink
scale:
min: 1
sink:
udsink:
container:
args:
- python
- example.py
# https://github.com/numaproj/numaflow-python/tree/main/examples/sink/log
image: quay.io/numaio/numaflow-python/sink-log:stable
imagePullPolicy: Always
## FIXME(map-batch): enable Java
# - name: java-split
# scale:
# min: 1
@@ -75,10 +71,10 @@ spec:
to: go-split
- from: go-split
to: go-udsink
# - from: in
# to: python-split
# - from: python-split
# to: python-udsink
- from: in
to: python-split
- from: python-split
to: python-udsink
# - from: in
# to: java-split
# - from: java-split