
Distributed list processing #4

Open
c0c0n3 opened this issue Jul 3, 2017 · 2 comments

c0c0n3 commented Jul 3, 2017

Implement a simple distributed list processing framework where operations on lists are executed in separate processes, typically on different nodes in a cluster.

This would be a very simple form of dataflow programming that we could implement on top of the (refactored) messaging API (see #3) by extracting generic list processing functionality from the import task. It's not very sophisticated, but practical enough that we could use it to do map-reduce (sort of) and run scripts at scale without having to commit to a heavyweight framework and its programming model. In my experience, lots of programming tasks can be brought back, in one way or another, to list processing. Examples: most of Smuggler's code uses the new Java streams API heavily; analysis pipelines; Unix pipes & filters; etc. So having a distributed list processing framework in place could come in handy...

I know all this is debatable at best, but I'm putting it here for discussion. Following this morning's meeting with @jburel, I've written up some implementation notes below. Please don't curse me for the maths; I know it's hideous, but these are mainly "notes-to-self" (and @jburel) that I'll eventually replace with better, non-mathematical explanations when I have the time. In the meantime, you can safely ignore what's below ;-)

Background info

Most list processing operations can be derived quite easily from the fact that an arbitrary mapping f from a set A to (the underlying set of) a monoid M lifts uniquely to a monoid homomorphism from the free monoid on A to M. (See the free-forgetful adjunction and free monoids.) In fact, you can easily derive the "fold" operations known to functional programmers from this lifted homomorphism. See the pic below for how to derive a left fold.

[Whiteboard photo: derivation of a left fold from the lifted monoid homomorphism]
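In code, the lifting and the derived left fold might look like the following minimal Python sketch (all the names — `lift`, `unit`, `op`, `foldl` — are mine, not part of any framework):

```python
# A monoid is a carrier with an associative operation `op` and identity `unit`.
# Lists under concatenation form the free monoid on a set A, and any mapping
# f: A -> M extends uniquely to a monoid homomorphism phi from lists of A to M.
def lift(f, unit, op):
    """Return the unique monoid homomorphism extending f."""
    def phi(xs):
        acc = unit                   # phi([]) = unit
        for x in xs:
            acc = op(acc, f(x))      # phi(xs ++ [x]) = op(phi(xs), f(x))
        return acc
    return phi

# Special case: summing a list is the lift of the identity into the (0, +) monoid.
total = lift(lambda x: x, 0, lambda a, b: a + b)

# The general left fold falls out by lifting into the monoid of endofunctions
# under composition, then applying the resulting function to the seed value.
def foldl(g, z, xs):
    compose = lambda h, k: (lambda s: k(h(s)))
    phi = lift(lambda x: (lambda s: g(s, x)), lambda s: s, compose)
    return phi(xs)(z)
```

This mirrors the whiteboard derivation: the only "real" operation is the lifted homomorphism; the folds are instances of it for particular choices of target monoid.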

But most list processing operations are primitive recursive and so can be implemented with a fold. (See e.g. this.) Note that initial/terminal algebras are a better and more general conceptual framework for all this, but if you have to implement list processing from scratch without compiler support, I think the conceptual model based on free monoids is easier to implement.
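To illustrate the point that a fold suffices, here are a few everyday list operations written against Python's stock fold, `functools.reduce` (the helper names are mine):

```python
from functools import reduce

# length, reverse and filter are all primitive recursive, so each one is
# just a fold with a suitable seed and step function.
def length(xs):
    return reduce(lambda n, _: n + 1, xs, 0)

def reverse(xs):
    return reduce(lambda acc, x: [x] + acc, xs, [])

def filter_(p, xs):
    return reduce(lambda acc, x: acc + [x] if p(x) else acc, xs, [])
```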

Basic idea

Take the monoid homomorphism discussed above as a primitive operation and provide an implementation of it that distributes computation over a cluster of machines connected by a message queue. (See the diagram in black on the top right of the picture above; "N" stands for a node, "q" for, well, queue :) Once we have that, other list operations are easy to implement; e.g. map (as in map-reduce) becomes map f = Φ (i ∘ f), where f : A ⟶ B is the "mapper" and i : B ⟶ FB is the insertion of generators. So we could potentially get a lot of list processing functionality almost for free if we manage to get the implementation of the primitive operation right. Whereas Artemis (or another MOM, see #3) provides the "distribute" & "consume" infrastructure (see diagram) and we'd potentially need very little code to implement "route" & "reduce buffer", there's a party pooper called...data locality!!! I haven't thought about this thoroughly and it could be a show stopper...
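The map f = Φ (i ∘ f) equation can be sketched in Python, with lists standing in for the free monoid (names are mine, and a real implementation of `lift` would be the distributed one):

```python
# The primitive operation: lift f: A -> M to a monoid homomorphism on lists.
def lift(f, unit, op):
    def phi(xs):
        acc = unit
        for x in xs:
            acc = op(acc, f(x))
        return acc
    return phi

# map f = lift(i . f), where i(x) = [x] is the insertion of generators and the
# target monoid is lists under concatenation: lift glues the singletons back.
def map_(f):
    return lift(lambda x: [f(x)], [], lambda a, b: a + b)
```

The design point is that only `lift` would need a cluster-aware implementation; everything derived from it, like `map_`, stays node-agnostic.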

@joshmoore

What would it look like (and what would it be named) to layer another verticle on top of this one to make this happen?


c0c0n3 commented Jul 13, 2017

A verticle layered on top could be a micro-service that orchestrates the underlying distributed list processing functionality to provide the service. Just an idea; I'm not sure about the details yet. To make things a bit more concrete, here's the lunar-orbit view of how imports work in Smuggler:

[Whiteboard photo: lunar-orbit view of Smuggler's import workflow]

The client submits a list of import requests that Smuggler queues. Then it processes them by running the CLI import. Finally it produces a process outcome report. (There's a lot more going on under the bonnet: retries, failure handling, progress tracking, user notifications, etc., but let's not get bogged down in the details now.) The point is, you could conceptualise all this as the list processing pipeline I sketched on the whiteboard: first map an enqueue task to each list item, then map the process task, then reduce the list to a single item by grouping together failed/succeeded imports. (NB this is an oversimplification that doesn't take side-effects into account, but again, let's ignore the technical details for now.)

The distributed list processing framework would provide generic list operations such as map, reduce, etc. Like I said, most of them could be implemented in terms of a single primitive operation, dramatically reducing dev effort. The micro-service would implement specific tasks like running an import and producing an outcome report, and then string them together in a distributed pipeline using the generic list operations, e.g. map runImport | groupBy outcomeReport (pseudo code). Under the bonnet, computation would happen just as I sketched out in the Background info section above (see the drawing in black on the top right). In a nutshell: list items become messages on one or more queues, each message gets consumed by a mapping task, the mapped outputs end up on the queue(s) again, and are finally consumed by a reduce task that assembles the result.
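A toy, single-process simulation of that `map runImport | groupBy outcomeReport` pipeline, with in-memory queues standing in for the broker (the broker, real consumers and all the task names here are stand-ins, not Smuggler's actual API):

```python
from queue import Queue

DONE = object()  # end-of-stream sentinel

def run_import(request):
    # Hypothetical "mapper" task: pretend requests ending in ".bad" fail.
    ok = not request.endswith(".bad")
    return (request, "succeeded" if ok else "failed")

def map_stage(inbox, outbox, f):
    # Consume messages, apply the mapper, put results back on a queue.
    while (msg := inbox.get()) is not DONE:
        outbox.put(f(msg))
    outbox.put(DONE)

def reduce_stage(inbox):
    # "groupBy outcomeReport": assemble the final report from mapped messages.
    report = {"succeeded": [], "failed": []}
    while (msg := inbox.get()) is not DONE:
        request, outcome = msg
        report[outcome].append(request)
    return report

q1, q2 = Queue(), Queue()
for r in ["a.tiff", "b.bad", "c.tiff"]:
    q1.put(r)          # list items become messages on the queue
q1.put(DONE)
map_stage(q1, q2, run_import)
report = reduce_stage(q2)
```

In the real thing the two stages would run as independent consumers on different nodes, with Artemis (or another MOM) in between instead of `Queue`.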

In fact, this is exactly how imports work in Smuggler, except there's only one embedded message broker, so there's no distribution of messages across brokers/queues. So what I'm proposing here is generalising Smuggler's import workflow into a simple map/reduce framework of sorts. In principle, this would be very similar to Vert.x/Reactive, except based on a message queue instead of the internal Vert.x event bus and implemented using Smuggler's messaging API.

I need to have a more in-depth look at Vert.x/Reactive though. In fact, we might even be better off ditching Smuggler's messaging and using Vert.x/Reactive for distributed list processing instead. One thing to take into account is certainly fault-tolerance in the two architectures. Vert.x uses an event bus where messages can be distributed with Hazelcast through an in-memory data grid. Hazelcast automatically backs up your data across multiple nodes, so that if one of them crashes, your message data should still be available through the surviving nodes; supposedly, another node can then carry on processing your message. But what happens in the simple scenario where you only have one JVM on one box? Well, if the box goes down, your message is gone too, unless you configure Hazelcast to also persist your data, e.g. using a NoSQL backend.

On the other hand, if messaging happens over a persistent queue (as e.g. with Artemis), even in the single-machine scenario a crash wouldn't be a problem, as any queued or in-flight message will still be available when the box comes back up. This is one of the reasons Smuggler uses a message queue. In fact, at MRI we have a single, standalone Smuggler instance on each acquisition workstation, and it's not uncommon for these workstations to get rebooted or even powered off a couple of times a week while Smuggler is in the middle of processing an import, or still has imports in the queue, or both.

Help, ideas and suggestions most welcome!
