Simple and reliable background processing for Rust using Actix actors. Apalis currently supports Redis as a store, with SQLite, PostgreSQL and MySQL in the pipeline.
To get started, add the following to Cargo.toml:
[dependencies]
apalis = { version = "0.2", features = ["redis"] }
A running Redis server is required. You can quickly start one with Docker; publishing port 6379 lets the example reach it at 127.0.0.1:
docker run --name some-redis -p 6379:6379 -d redis
use actix::prelude::*;
use apalis::{
    redis::{RedisConsumer, RedisProducer, RedisStorage},
    Job, JobContext, JobFuture, JobHandler, Queue, Worker,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Debug)]
pub enum MathError {
    InternalError,
}

#[derive(Serialize, Deserialize)]
pub enum Math {
    Add(u64, u64),
    Fibonacci(u64),
}

impl Job for Math {
    type Result = Result<u64, MathError>;
}

impl JobHandler<RedisConsumer<Math>> for Math {
    type Result = JobFuture<Result<u64, MathError>>;

    fn handle(
        self,
        ctx: &mut JobContext<RedisConsumer<Math>>,
    ) -> JobFuture<Result<u64, MathError>> {
        // Count every handled job in the shared MathCounter
        let data = ctx.data_opt::<Arc<Mutex<MathCounter>>>().unwrap();
        let mut data = data.lock().unwrap();
        data.counter += 1;
        match self {
            Math::Add(first, second) => Box::pin(async move { Ok(first + second) }),
            Math::Fibonacci(num) => {
                // Delegate the computation to the FibonacciActor
                let addr = ctx.data_opt::<Addr<FibonacciActor>>().unwrap().clone();
                Box::pin(async move {
                    addr.send(Fibonacci(num))
                        .await
                        .map_err(|_e| MathError::InternalError)?
                })
            }
        }
    }
}

fn produce_jobs(queue: &Queue<Math, RedisStorage>) {
    let producer = RedisProducer::start(queue);
    producer.do_send(Math::Add(1, 2).into());
    producer.do_send(Math::Fibonacci(9).into());
}

#[actix_rt::main]
async fn main() {
    std::env::set_var("RUST_LOG", "info");
    env_logger::init();
    let storage = RedisStorage::new("redis://127.0.0.1/").unwrap();
    let queue = Queue::<Math, RedisStorage>::new(&storage);
    // This can be in another part of the program
    produce_jobs(&queue);
    let counter = Arc::new(Mutex::new(MathCounter { counter: 0 }));
    let addr = SyncArbiter::start(2, || FibonacciActor); // Get the address of another actor
    Worker::new()
        .register_with_threads(2, move || {
            RedisConsumer::new(&queue)
                .data(counter.clone())
                .data(addr.clone())
        })
        .run()
        .await;
}
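
The example above refers to MathCounter and FibonacciActor without defining them. The following is a minimal, standard-library-only sketch of what the supporting pieces might look like; it is an assumption for illustration, not the project's actual code. The `counter: u64` field type, the `record_job` helper, and the `fibonacci` function are hypothetical names. In a real setup, a FibonacciActor message handler could call something like `fibonacci` and map a `None` result to `MathError::InternalError`.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical shared state: counts how many jobs have been handled.
/// The field type is assumed from `MathCounter { counter: 0 }` in the example.
pub struct MathCounter {
    pub counter: u64,
}

/// Hypothetical helper: increments the shared counter and returns the new value.
/// Mirrors the lock-then-increment steps performed inside `handle`.
pub fn record_job(counter: &Arc<Mutex<MathCounter>>) -> u64 {
    let mut guard = counter.lock().unwrap();
    guard.counter += 1;
    guard.counter
}

/// Hypothetical computation an actor could run off the async executor.
/// Computes the n-th Fibonacci number iteratively (with F(0) = 0, F(1) = 1)
/// and returns `None` if the result does not fit in a `u64`.
pub fn fibonacci(n: u64) -> Option<u64> {
    if n == 0 {
        return Some(0);
    }
    let (mut prev, mut curr) = (0u64, 1u64);
    for _ in 1..n {
        let next = prev.checked_add(curr)?;
        prev = curr;
        curr = next;
    }
    Some(curr)
}
```

Using checked arithmetic keeps a large job argument from panicking (in debug builds) or silently wrapping (in release builds); the caller decides how to report the failure.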
Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.
We use SemVer for versioning. For the versions available, see the tags on this repository.
- Njuguna Mureithi - Initial work
See also the list of contributors who participated in this project.
This project was formerly named actix-redis-jobs. If you want to use that crate name, please contact me.
This project is licensed under the MIT License; see the LICENSE.md file for details.
- Inspiration: The Redis part of this project is heavily inspired by CurlyQ, which is written in Go.