Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Would like to handle jobs in batch #134

Closed
jbeaudoin11 opened this issue Mar 1, 2020 · 20 comments
Closed

Would like to handle jobs in batch #134

jbeaudoin11 opened this issue Mar 1, 2020 · 20 comments
Assignees
Labels
enhancement New feature or request

Comments

@jbeaudoin11
Copy link

jbeaudoin11 commented Mar 1, 2020

Hi !

Something i was wondering about, could we add batch support for worker ? Let say i have 500 jobs which each of them fetch data to an external API or database for example. Batching them in a single request would help reduce the load on these external resources and also reduce latency in general. Example :

const zbBatchWorker = zbc.createBatchWorker(
    'test-worker',
    'demo-service',
    {
        batch: 50, // # of jobs per batch
        timeout: 1000, // or 1 sec
    },
    async (payloads, complete) => {
        const ids = jobs.map((j) => j.id);
        const users = await fetch(`someUrl/user?ids=${ids.join(',')}`);

        for(const [key] of Object.keys(ids)) {
            const index = Number(key);
            const user = users[index];

            try {
                // ...Do some work... 

                if(...) {
                    complete.success(index, ...)
                } else {
                    complete.failure(index, ...)
                }
            } catch(error) {
                complete.error(index, ...)
            }
        }

        /*
            When the batch is done,
            this commit each job and
            call the right procedure (grpc)
            base on 'complete'
        */
        await complete.done();
    }
);

I can pass the number of jobs per batch or a timeout after which the batch is handle anyway. That said, it could be link to the maxActiveJobs propertie.

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

maxActiveJobs is 32 by default. See this section of the README: https://github.com/creditsenseau/zeebe-client-node-js#create-a-task-worker

The job handler is already the predicate function on an async map function (forEach with a callback - the complete methods), so it is doing what you are thinking of already.

See here: https://github.com/creditsenseau/zeebe-client-node-js/blob/master/src/zb/ZBWorker.ts#L275

Maybe I'm not understanding your use case...

How is your task not something that should be done as an array property in a single task?

Each of those jobs is a different instance of a task in the BPM model. Is there a reason why you don't batch it in your model?

If they can be batched in the worker, both logically and temporally, how are not correlated that way in the model?

@jbeaudoin11
Copy link
Author

jbeaudoin11 commented Mar 1, 2020

The job handler is already the predicate function on an async map function (forEach with a callback - the complete methods), so it is doing what you are thinking of already.

Not exactly, yes the worker process multiple job concurrently, but they share the same handle definition. What i'm trying to to do is let the handle manage the concurrency.

Maybe I'm not understanding your use case...

Maybe it's worth mentioning, I'm in the process of evaluating Zeebe to move an old implementation of workflow engine. We are currently processing hundreds of jobs per sec, this means hundreds/thousands workflow instances per workflow simultaneously, 0-10 workflows per user. It adds up. A workflow can be super simple, like 1 step, or more complex like 20 steps. But we have 16 job types to create these steps, 8 actions, 8 conditions with some timer steps.

In our implementation, a worker has control hover the whole batch it's trying to process. This gives us more flexibility like external API calls can be group into 1 single call which reduce a lot of the traffic on that external API.

How is your task not something that should be done as an array property in a single task?

Like i said, my goal is to reduce the number of external calls done by the job worker. Tasks of the same job type can share part of the code to optimize the processing ie fetch data from external API. Make 1 request vs 500.

Each of those jobs is a different instance of a task in the BPM model. Is there a reason why you don't batch it in your model?

Because it doesn't make sense to do it like that. I'm not trying to fetch multiple items related to a single job, generally 1 job = 1 fetch. What I'm trying to do is to batch at the worker level since we have lots of concurrent instances.

Part of the job can be shared in a batch, but we still want to keep control of individual instances.

Does it make more sense ?

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

You can actually do it now, like this:

import { ZBClient, ActivateJobsRequest } from 'zeebe-node'

const zbc = new ZBClient()

const req: ActivateJobsRequest = {
	maxJobsToActivate: 2,
	requestTimeout: 2000,
	timeout: 2000,
	type: 'my-task-type',
	worker: 'test-worker',
}

console.log((zbc as any).gRPCClient)
const stream = (zbc as any).gRPCClient.activateJobsStream(req)
stream.then(res => res.on('data', console.log))

If you are using TypeScript you'll need to erase typing of zbc, because gRPCClient is marked private. (and heads-up: it is changing to grpc in 0.23.0-alpha1, so assign it to another const in one place in your code, so the impact is minimal for you).

This is the branch that is coming in: https://github.com/jwulf/zeebe-client-node-js/tree/grpc-refactor

You would use the ZBClient.completeJob() and ZBClient.failJob() methods, as the ZBWorker lib does.

You can look through the source code for further information.

I am not going to add support for this use-case any time soon. It's a whole different state management situation. You can use the Node client as a low-level lib to do all the kind of custom coded solutions that are the way it is done with the other clients, for sure.

(Having said that - I'm easily excited about cool new things, and the next time I take a shower or go to sleep I may wake up possessed by a vision for this).

My focus with the library has been to provide an opinionated solution that lifts the logistics out of the frame and lets you focus just on the logic and the model. Dan Shapir's Zeebe Node NestJS integration (which is what I use in production) takes that even further.

But yeah, you can totally use it to do it in any way that you like.

The public API is stable, and if we stay in communication, I can make a stable API for your use-case - even if the library doesn't immediately support it as a first-class pattern. The changes in 0.23.0 will impact the naming of the grpc client component, but most of the other state management you wouldn't be using, and the underlying gRPC protocol methods, and the Zeebe Node public APIs are stable.

@jbeaudoin11
Copy link
Author

Alright, tbh i was already thinking about forking and doing it my self :P.

Might have a PR at some point for that.

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

Yeah, there is like retry logic for operations and things like that that would be useful to reuse in this scenario.

The lightest weight solution would be to give the ZBClient an ActivateJobs method that allows you to specify the request, and returns a batch.

Then you are responsible for managing the completion of each job using the decoupled completion pattern.

Yeah, I can see a low-orbit solution. I'll add the public API for activateJobs on ZBClient and it will roll out this week with the 0.23.0 release.

It actually just takes that decoupled completion pattern another step forward.

@jbeaudoin11
Copy link
Author

I was thinking of creating a new ZBBatchWorker inspired by the ZBWorker.

Maybe they could share some logic.. Will see

@jwulf jwulf self-assigned this Mar 1, 2020
@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

That will be cool.

@jwulf jwulf reopened this Mar 1, 2020
@jwulf jwulf added this to the 0.23.0-alpha.1 milestone Mar 1, 2020
@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

About this:

const zbBatchWorker = zbc.createBatchWorker(
    'test-worker',
    'demo-service',
    {
        batch: 50, // # of jobs per batch
        timeout: 1000, // or 1 sec
    },

The timer resolution in the broker is a 30-second sweep, so you can't guarantee anything less than that for job timeouts.

It was a mistake I made putting times in ms. See here: camunda/camunda#3944

@jbeaudoin11
Copy link
Author

jbeaudoin11 commented Mar 1, 2020

I'm not sure if we are talking about the same thing. Maybe my naming is wrong, in google pubsub it would be :

{
    maxMessages: 50,
    maxMilliseconds: 1000,
}

It's either 50 jobs or wait 1s before processing the next batch. So you can have less than 50 jobs.

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

Ah, in Zeebe it is: 50 jobs, and give me n seconds to complete them before passing them out to another worker request.

@jbeaudoin11
Copy link
Author

Yes I'm describing a different behavior.
Is it not ActivateJobsRequest.requestTimeout ?

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

What do you think about extending the job, by appending success failure and error properties to it?

Then the API would be:

batch.map(job => didItWork? job.success() : job.failure())

You don't have to manage references or state then in your business logic.

You wouldn't be limited to using that - you can still destructure the data for the job and complete it with the job key as an argument to a method. It would be more like a convenience method.

Also, the error completion is not for exceptions in code, it is for business errors - like "User's payment was declined", that are modeled as exceptional flows. BPMN Error Event.

Code exceptions are failure - they communicate: "I blew up, give the job to another worker to retry or raise an operational incident if retries are exhausted".

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

There are two timeouts.

One is requestTimeout - how long your request should be pending before the broker releases the blocking long poll of your pending request. That's not going to be an issue for your use case, by the sound of it. By default the broker terminates requests at 15s, unless you request more.

The other is timeout - that is the amount of time the broker gives the worker exclusively responsibility for carrying out and reporting the outcome of the work. When that time expires, if jobs have not been completed, the broker will return those jobs in response to requests from other workers.

After that time, this worker can still complete them, up the moment that another worker reports completion before they do; but this worker cannot fail them now. They are already being "retried".

Zeebe guarantees at-least-once delivery of jobs.

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

One last thought, about the method signature. 0.23.0 adds overloads to createWorker to remove the worker id (it turns out not be so useful that it should be required), and add a single param object signature.

See here for more details: https://www.joshwulf.com/blog/2020/02/refining-method-signature/

@jbeaudoin11
Copy link
Author

Yes I think it would be better to receive a job obj with callbacks, variables and headers. Since we need to do individual grpc calls anyways there is no real benefit of having a complete.done method for the batch.

requestTimeout, is what i was trying to describe then. Is there a minimum value ? In our case, some job types are rarely used to build workflow, this means it could be nice to not always poke for new tasks every milliseconds if the "queue" is empty, workers will use minimal ressources in those down time.

@jwulf
Copy link
Member

jwulf commented Mar 1, 2020

https://zeebe.io/blog/2019/08/long-polling/

You probably want to read this one too:

https://zeebe.io/blog/2019/12/zeebe-performance-profiling/

@jwulf
Copy link
Member

jwulf commented Mar 2, 2020

I can see how to build this, re-using the machinery in place to get the ergonomics of retries.

It might be best to lift the existing ZBWorker to a ZBWorkerBase class, then extend it to get the specialisations of the poll request and the handler shape for batch and single job workers. I'll take a look at it today.

@jwulf
Copy link
Member

jwulf commented Mar 2, 2020

OK, I've done it. I had the thought at the beginning:

Favour composition over inheritance

But I did it as a base class and two classes that extend it. Getting the Generics to work was a bit of work.

I think that it can be refactored to a single class, because the only difference between them is the shape of the taskhandler function, and then how the worker calls that taskhandler (mapping it over the jobs, or passing in the array).


There is an issue, though, I think, with this approach.

The ZBWorkers now have a minJobBatchSize property.

It is not a statement about the broker or the state of the jobs on the broker, but rather a statement about the capacity of the worker - which is the only thing the worker has knowledge of.

However, there is no way to guarantee that this is the minimum number of jobs that the worker will receive.

Rather, this is the minimum capacity that the worker must have before it will request more jobs.

For example:

  • A worker has maxActive 32, and minJobBatchSize 10.
  • It gets 32 jobs on a poll.
  • It no longer has capacity to request 10 jobs (the minimum), so it does not ask for more.
  • When it has completed 10 of the jobs, it currently has 22 active jobs that it is working on, giving it capacity for 10 more.
  • Now it again asks for 10 jobs (the minimum batch size, and the amount of capacity it has).
  • However, if the broker has only 2 jobs available, it will get 2 jobs.
  • And then, as soon as two more jobs are complete, it will ask again.
  • If one job is available, it will get that.
  • If it drains all jobs, as soon as a single job is available on the broker, it will get that.

There is no way to guarantee any kind of batching in the worker, unless I add a "poll period parameter", which will make configuration complex (as well as the code). Even then, nothing is guaranteed. You are really waiting for jobs to buffer on the broker.

My initial intuition is confirmed. If your workloads can be correlated in the worker, they need to be correlated in the model you build.

Or, you have to implement buffering in the worker, using an array and a timer, and using the complete.forwarded() method, like this:

// State machine to execute a handler on multiple jobs every ${time} or ${batchSize}

const JobBuffer = ({handler, timeout, batchSize) => {
   let jobs = []
   const loop = () => setTimeout(execute, timeout * 1000)
   let t = loop()
   const execute = () => {
      clearTimeout(t)
      handler([...jobs])
      jobs = []
      t = loop()
   }
   return {
      buffer: job => {
        jobs.push(job)
        if (jobs.length >= batchSize) {
           execute()
        }
      },
      count: () => jobs.length,
    }
}

// Every 60s or 10 jobs
const jobBuffer = JobBuffer({
  timeout: 60,
  handler: jobs => jobs.forEach(job => zbc.completeJob(job.key, {})),
  batchSize: 10
})

zbc.createWorker({
  taskType: 'jobs-to-batch',
  timeout: 70, // 70 seconds
  taskHandler: (job, complete) => {
    jobBuffer.buffer(job)
    complete.forwarded()
  })
})

OK, so I put this state machine in the ZBBatchWorker:

02:41:34.277 | zeebe |  [generic-test (batch)] Executing batched handler with 5 jobs
02:41:34.277 | zeebe |  [generic-test (batch)] Got 5 jobs...
CREATING
CREATING
CREATING
CREATING
CREATING
02:41:36.779 | zeebe |  [generic-test (batch)] Executing batched handler with 5 jobs
02:41:36.779 | zeebe |  [generic-test (batch)] Got 5 jobs...
CREATING
CREATING
CREATING
CREATING
CREATING
02:41:39.312 | zeebe |  [generic-test (batch)] Executing batched handler with 5 jobs
02:41:39.312 | zeebe |  [generic-test (batch)] Got 5 jobs...
CREATING
CREATING
STOPPING

@jwulf
Copy link
Member

jwulf commented Mar 2, 2020

Will be out this week in the 0.23.0-alpha.1 release.

Here are the docs from the README:

The ZBBatchWorker Job Worker

The ZBBatchWorker Job Worker batches jobs before calling the job handler. Its fundamental differences from the ZBWorker are:

  • Its job handler receives an array of one or more jobs.
  • The jobs have success, failure, error, and forwarded methods attached to them.
  • The handler is not invoked immediately, but rather when enough jobs are batched, or a job in the batch is at risk of being timed out by the Zeebe broker.

You can use the batch worker if you have tasks that benefit from processing together, but are not related in the BPMN model.

An example would be a high volume of jobs that require calls to an external system, where you have to pay per call to that system. In that case, you may want to batch up jobs, make one call to the external system, then update all the jobs and send them on their way.

The batch worker works on a first-of batch size or batch timeout basis.

You must configure both jobBatchMinSize and jobBatchMaxTime. Whichever condition is met first will trigger the processing of the jobs:

  • Enough jobs are available to the worker to satisfy the minimum job batch size;
  • The batch has been building for the maximum amount of time - "we're doing this now, before the earliest jobs in the batch time out on the broker".

You should be sure to specify a timeout for your worker that is jobBatchMaxTime plus the expected latency of the external call plus your processing time and network latency, to avoid the broker timing your batch worker's lock and making the jobs available to another worker. That would defeat the whole purpose.

Here is an example of using the ZBBatchWorker:

import { API } from './lib/my-awesome-external-api'
import { ZBClient, BatchedJob } from 'zeebe-node'

const zbc = new ZBClient()

// Helper function to find a job by its key
const findJobByKey = jobs => key => jobs.filter(job => job.jobKey === id)?.[0] ?? {}

const handler = async (jobs: BatchedJob[], worker: ZBBatchWorker) => {
    worker.log("Let's do this!")
    const {jobKey, variables} = job
    // Construct some hypothetical payload with correlation ids and requests
    const req = jobs.map(job => ({id: jobKey, data: variables.request}))
    // An uncaught exception will not be managed by the library
    try {
        // Our API wrapper turns that into a request, and returns
        // an array of results with ids
        const outcomes = await API.post(req)
        // Construct a find function for these jobs
        const getJob = findJobByKey(jobs)
        // Iterate over the results and call the succeed method on the corresponding job,
        // passing in the correlated outcome of the API call
        outcomes.forEach(res => getJob(res.id)?.success(res.data))
    } catch (e) {
        jobs.forEach(job => job.failure(e.message))
    }
}

const batchWorker = zbc.createBatchWorker({
    taskType: 'get-data-from-external-api',
    taskHandler: handler,
    jobBatchMinSize: 10, // at least 10 at a time
    jobBatchMaxTime: 60, // or every 60 seconds, whichever comes first
    timeout: 80 // 80 second timeout means we have 20 seconds to process at least
})

@jwulf jwulf added the enhancement New feature or request label Mar 2, 2020
@jbeaudoin11
Copy link
Author

I think it's exactly what i want in term of functionality. Thanks !

@s3than s3than closed this as completed in c1ca167 Mar 6, 2020
@jwulf jwulf mentioned this issue May 6, 2020
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants