
Batch Mapping & Sink #1688

Closed
12 tasks done
magelisk opened this issue Apr 18, 2024 · 10 comments
Labels: enhancement (New feature or request)
Milestone: 1.3

@magelisk
Contributor

magelisk commented Apr 18, 2024

Summary

I'm looking for the ability for a mapper and sink to take a batch of items at once. Today these process a single item at a time regardless of readBatchSize, which limits the ability to take advantage of bulk APIs/processing.

Use Cases

I have three main use cases that interest me. All of them need to operate at near real-time, and the messages aren't necessarily required to be ordered for histogram/statistic-type metrics, so horizontal scaling should be easy for my use cases.

  1. Bundling: I have a use case receiving 100k+ messages per second, but each message is very small, < 1 KB. For throughput purposes it would make sense to take batches of those messages (say up to 1000) and group them into a single bundle. The overall volume in MB may not change, but from my rough experimentation I can get more ISB throughput when operating on this smaller quantity of messages. Implementation of this bundling is of course user-specific, but being able to receive multiple messages at once would facilitate it.
    • Input would be N messages with 1-N responses (typically 1 in my personal use-case).
  2. Bundled transactions: Some operations can generally be more efficient when sending multiple data items through at once rather than making a single call for each, e.g. the Elasticsearch Bulk API or a pykafka producer (ignoring numaflow's built-in kafka sink).
    • Typically more of a 'sink' use case.
  3. Batching for performance: In a language like Python, if you're doing the same operation on a lot of datasets using numpy/scipy/etc., it's useful to group them together and call the math logic once. If we could pull a batch of items at once, we could organize them for faster processing and then write the results back out as appropriate; a minimal sketch of this follows the list.
    • This specific case would still be N messages in, N messages out, but could be combined with Bundling.
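To make use case 3 concrete, a minimal sketch of the kind of handler batching would enable (the `batch_map` name and list-of-payloads signature are hypothetical, assuming a batch-capable map interface):

```python
import numpy as np


# Hypothetical batch map handler: N payloads in, N results out.
# One vectorized numpy call replaces N per-message invocations of
# the same math logic.
def batch_map(payloads: list) -> list:
    values = np.array([float(p.decode()) for p in payloads])
    results = np.sqrt(values) * 2.0  # single vectorized operation
    return [str(r).encode() for r in results]


# Example: 1000 small messages processed in one call.
print(batch_map([b"4.0"] * 1000)[:2])
```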

If there's a way to accomplish any of this today, I'm happy to adopt that, but I don't think the reduce capability quite fits my expectations, particularly given the time-based windows and the limitations on horizontal scaling imposed by the ordering expectations.


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.

@magelisk magelisk added the enhancement New feature or request label Apr 18, 2024
@vigith vigith added this to the 1.3 milestone Apr 18, 2024
@magelisk
Contributor Author

magelisk commented Apr 23, 2024

Since I'm interested in this, I wanted to take a first attempt at implementing it, following the patterns that currently exist. I expect to have some PRs up over this week for the first few items on the checklist. This will be my first contribution to this community, so I hope to work well with everyone through the details.

I made some tasks to help track how I plan to commit items. My initial focus is on Python, as it's my primary use case, though I will try to do the other languages as well if I can.

@whynowy
Member

whynowy commented Apr 23, 2024

@magelisk - For sinks, the batch operation is already there in the interface exposed to developers, with the data streamed from the gRPC clients. I don't expect there would be much performance enhancement from sending the data in a batch with a unary call from the client.
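For reference, a rough sketch of that streaming sink shape, in the style of the pynumaflow sink interface (exact class and method names may differ between SDK versions; `bulk_write` is a hypothetical stand-in):

```python
from typing import Iterator, List

from pynumaflow.sinker import Datum, Response, Responses


def bulk_write(payloads: List[bytes]) -> None:
    # Stand-in for a real bulk call, e.g. the Elasticsearch Bulk API.
    pass


def bulk_sink_handler(datums: Iterator[Datum]) -> Responses:
    # The gRPC stream already delivers the whole read batch, so the
    # handler can drain the iterator and issue one bulk write instead
    # of one write per message.
    batch = list(datums)
    bulk_write([d.value for d in batch])
    responses = Responses()
    for d in batch:
        responses.append(Response.as_success(d.id))
    return responses
```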

@vigith
Member

vigith commented Apr 23, 2024

I think we should do batching on top of gRPC streaming; we have an issue to track this: #1564

@magelisk
Contributor Author

Thanks for the input. I hadn't used the streaming sink, so I wasn't familiar with its interface taking an iterator; that's very useful.

Following that, and the recommendation that this be built on top of the streaming interface, I think I've wrapped my head around it. I see that #1564 was already assigned to @yhl25. I don't know if any work has been done on his part (I couldn't find anything in his accessible repos), so I did an initial draft as a personal learning opportunity. I don't want to step on anyone's toes, so feel free to kick it away, but hopefully I can contribute something to help.

protodef: numaproj/numaflow-go#129
numaflow core: #1707
python client: numaproj/numaflow-python#163

The biggest question with these: I did this with a NEW gRPC endpoint, so the map streamer now has two Fn handlers. This is a deviation from existing patterns but maintains backwards compatibility. I'm not sure how the numaflow team hopes to handle these kinds of potential breakages generally speaking.
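For illustration, a hypothetical servicer shape with the two RPCs side by side (names and bodies are illustrative only; the actual proto changes are in numaproj/numaflow-go#129):

```python
from typing import Iterator, List


class MapServicer:
    # Hypothetical shape of the service after the change: the existing
    # unary-per-message MapFn is left untouched, and a new streaming
    # BatchMapFn is added next to it, so existing clients keep working
    # while new ones can opt into batching.

    def MapFn(self, request, context):
        # Existing behavior: one datum in, messages for that datum out.
        return self._map_single(request)

    def BatchMapFn(self, request_iterator: Iterator, context):
        # New behavior: drain the stream into a batch, map once, then
        # yield one response per input message.
        batch = list(request_iterator)
        yield from self._map_batch(batch)

    def _map_single(self, request):
        # User-defined mapping over one message (placeholder).
        return request

    def _map_batch(self, batch: List):
        # User-defined mapping over the whole batch (placeholder).
        return batch
```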

@syayi
Member

syayi commented May 1, 2024

@magelisk, thank you for being a contributor to Numaflow. We (the Numaflow team) would like to know how you and the community are using the platform, learn about the use cases, and gather some feedback. Would you be open to a chat, based on your availability?

@magelisk
Contributor Author

magelisk commented May 1, 2024

@syayi I'd be happy to talk at some point. I'm generally pretty free, and US EST based.

@syayi
Member

syayi commented May 3, 2024

Thanks @magelisk! Sure, drop me a note here or on Slack, whichever works best for you, and I'm happy to schedule a call.

@magelisk
Contributor Author

We have a conversation tomorrow, but so that there's a written record, I'll put this here now to solicit and document thoughts.

The current implementation I did adds a new gRPC call within the existing service. This works, though it breaks the 'function-only' method as-is. So I think there are a few options:

  1. Remove/replace the existing map function which takes a single datum, and let it take multiple, requiring the caller to iterate over the stream explicitly (a sketch contrasting this with the usual map handler follows this list)
    • Consistent with the user-defined sink pattern
    • Not quite consistent with the map handler function that's typically used
  2. A new service for this which contains a single endpoint.
    • A fair bit of additional overhead/new classes that are near-duplicates of existing ones. Common classes could be shared between protos, but again, this isn't something that's currently done, so we'd need to balance DRY vs. compartmentalization
    • Should maintain full backwards compatibility
  3. Use as-is in the current MR (MapFn and BatchMapFn) and find some other workaround, or adjust the interface definition to have styles that still work with the function-only pattern
    • I think this is more complex, with more moving parts, but should maintain existing backwards compatibility (I haven't prototyped this to see what the final impact to the interface classes would be)
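To make option 1's trade-off concrete, a sketch of the two handler shapes being weighed (illustrative signatures, not the final interface):

```python
from typing import Iterable, List


# Today's map handler pattern: invoked once per message.
def map_handler(keys: List[str], datum: bytes) -> List[bytes]:
    return [datum]


# Option 1's shape: invoked once per read batch; the handler iterates
# the stream itself, mirroring the user-defined sink pattern.
def batch_map_handler(datums: Iterable[bytes]) -> List[bytes]:
    return [payload for payload in datums]
```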

@mdwarne1
Contributor

mdwarne1 commented Jul 18, 2024

I haven't been able to pull this into my operational test bed yet, but local representative testing looks good. Thanks.

@vigith
Member

vigith commented Aug 20, 2024

We have included this in the 1.3.0 release.

@vigith vigith closed this as completed Aug 20, 2024