- Delayed jobs
- Retries
- Dead queue
- Priority
- Concurrency
- Pause/resume processing
- Optional Topic based publishing (publish into multiple queues)
- Low CPU usage
- Able to process around 1000 messages per second (tested on Macbook Pro 13-inch, 2017, concurrency set to 150)
npm install monkeycymbal --save
or
yarn add monkeycymbal
Requirements: Monkey Cymbal requires MongoDB
import { Queue } from 'monkeycymbal';
// Initialize queue
const queue = new Queue('mongodb://localhost/myDb', 'videoTranscoding');
// subscribe to the queue
queue.subscribe((msg) => {
// process the message
// ...
// we can return a result that will be saved in the message
return 'transcoded';
});
// Add a message to the queue
const [msgId] = await queue.add({ video: 'http://example.com/video1.mov' });
A queue can be paused and resumed globally (pass true
to pause processing for
just this worker):
await queue.pause()
// queue is paused now
await queue.resume()
// queue is resumed now
A queue emits also some useful events, for example...
queue.on('added', msgId => {
// A message has been added to the queue
})
queue.on('active', msg => {
// The message is being processed
});
queue.on('completed', (msg, result) => {
// The message has been processed succesfully
})
queue.on('error', (msg, error) => {
// An error occurred while processing the message.
// If maxRetries is set, it will be re-processed after a visibility timeout
})
queue.on('dead', msg => {
// The message is failed permanently.
// If a dead queue is configured, the message will be copied there.
})
For more information on events, including the full list of events that are fired, check out the Events reference
new Queue(connectionUrlOrMongoClient, queueName, options)
This is the Queue constructor. It creates a new Queue that is persisted in MongoDB.
Name | Type | Description |
---|---|---|
connectionUrlOrMongoClient required |
`MongoClient | string` |
queueName required |
string |
The name of the queue |
options |
SubscriptionOptions |
Arguments | Type | Default | Description |
---|---|---|---|
visibility |
number (seconds) |
10 | After a message is received to prevent other consumers from processing the message again, Monkeycymbal sets a visibility timeout, a period of time during which Monkeycymbal prevents other consumers from receiving and processing the message. |
delay |
number (seconds) |
if you set delay to be 10, then every message will only be available for retrieval 10s after being added. | |
maxRetries |
number |
5 | Maximum number of attempts to retry processing a message. If deadQueue is set, the message will be moved to the dead queue. Otherwise it will be acked. |
expireAfterSeconds |
number (seconds) |
The processed messages will be removed from the collection after the specified number of seconds. | |
concurrency |
number |
1 | The max number of messages that will be processed in parallel. |
pollInterval |
number (seconds) |
10 | The amount of time the subscriber waits before checking for new messages. |
deadQueue |
string or Queue instance |
Messages that have been retried over maxRetries will be pushed to this queue for later inspection. |
queue.subscribe(handler);
Defines a processing function for the jobs in a given Queue and start processing.
The handler function receive msg
as argument.
queue.add(msg, AddMessageOptions);
Adds a message to the queue.
Arguments | Type | Default | Description |
---|---|---|---|
priority |
number |
1 | Optional priority value. It ranges from -Infinity to +Infinity |
queue.pause();
Pause a queue. A paused queue will not process new jobs until resumed.
queue.resume();
Resume a queue after being paused.
queue.ping(msg.ack);
Ping a message to keep it's visibility open for long-running tasks.
queue.totalCount();
Returns the total number of records in the collection.
queue.waitingCount();
Returns the total number of messages that are waiting to be processed.
queue.inFlightCount();
Returns the total number of messages that are currently being processed.
new Channel(connectionUrlOrMongoClient, topic, options)
This is the Channel constructor. It creates a new Channel.
Name | Type | Description |
---|---|---|
connectionUrlOrMongoClient required |
`MongoClient | string` |
topic required |
topic |
The name of the channel. |
options |
SubscriptionOptions |
channel.publish(msg, PublishOptions);
Publish messages to the queues subscribed to the topic.
Arguments | Type | Default | Description |
---|---|---|---|
priority |
number |
1 | Optional priority value. It ranges from -Infinity to +Infinity |
channel.subscribe(handler, queueName, SubscriptionOptions): Queue;
Convenience method that returns an instance of a queue bound to the channel.