Skip to content

Updates #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ authors = ["epwalsh <epwalsh10@gmail.com>"]
edition = "2018"

[dependencies]
async-trait = "0.1"
tokio = { version = "0.2", features = ["full"] }
celery = { git = "https://github.com/rusty-celery/rusty-celery" }
exitfailure = "0.5.1"
Expand Down
4 changes: 3 additions & 1 deletion src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

[What is Rusty Celery?](what-is-celery.md)

- [Getting Started](getting-started/index.md)
- [Quick Start](quick-start.md)
- [Going Deeper](guide/index.md)
- [Defining Tasks](guide/defining-tasks.md)
- [Best Practices](best-practices/index.md)
- [Coming from Python?](coming-from-python/index.md)

Expand Down
14 changes: 12 additions & 2 deletions src/coming-from-python/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Though a lot of the Rusty Celery API is very similar to the Python equivalent -

In some cases this means the Rust equivalent is a little more verbose or takes a little more care on the user's end, but ultimately I think you'll find that the downsides of the Rust implementation are heavily outweighed by the benifits it brings: most notably speed, safety, and a much smaller memory footprint.

### Registering tasks
## Registering tasks

In Python you can register tasks by dynamically importing them at runtime through the [`imports`](https://docs.celeryproject.org/en/stable/userguide/configuration.html#imports) configuration field, but in Rust you need to manually register all tasks either within the [`celery_app`](https://docs.rs/celery/*/celery/macro.celery_app.html) macro or using the [`Celery::register_task`](https://docs.rs/celery/*/celery/struct.Celery.html#method.register_task) method:

Expand All @@ -25,7 +25,7 @@ fn add(x: i32, y: i32) -> i32 {
my_app.register_task::<add>().unwrap();
```

### Running a worker
## Running a worker

While Python Celery provides a CLI that you can use to run a worker, in Rust you'll have to implement your own worker binary. However this is a lot easier than it sounds. At a minimum you just need to initialize your [`Celery`](https://docs.rs/celery/*/celery/struct.Celery.html) application, define and register your tasks, and run the [`Celery::consume`](https://docs.rs/celery/*/celery/struct.Celery.html#method.consume) method within your `main` function.

Expand Down Expand Up @@ -61,3 +61,13 @@ async fn main() -> Result<(), ExitFailure> {
Ok(())
}
```

The `consume` method will listen for `SIGINT` and `SIGTERM` signals just like a Python worker and will try to clean up finish all pending tasks before shutting down.

## Time limits vs timeout

In Python you configure tasks to have a [soft or hard time limit](https://docs.celeryproject.org/en/latest/userguide/workers.html#time-limits). A soft time limit allows a task to clean up after itself if it runs over the limit, while a hard limit will force terminate the task.

In Rust we've replaced these with a single configuration option: [`timeout`](https://docs.rs/celery/*/celery/struct.TaskOptions.html#structfield.timeout). A worker will wait `timeout` seconds for a task to finish and then will interrupt it if it hasn't completed in time. After a task is interrupted, its [`on_failure`](https://docs.rs/celery/*/celery/trait.Task.html#method.on_failure) callback will be called with the [`TimeoutError`](https://docs.rs/celery/*/celery/enum.ErrorKind.html#variant.TimeoutError) error kind.

> NOTE: It's only possible to interrupt non-blocking operations since tasks don't run in their own dedicated threads.
3 changes: 0 additions & 3 deletions src/getting-started/index.md

This file was deleted.

131 changes: 131 additions & 0 deletions src/guide/defining-tasks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Defining Tasks

A **task** represents a unit of work that a `Celery` app can produce or consume.

The recommended way to define a task is by decorating a function with the [`task`](https://docs.rs/celery/*/celery/attr.task.html) attribute macro:

```rust,noplaypen
use celery::task;

#[task]
fn add(x: i32, y: i32) -> i32 {
x + y
}
```

The macro accepts the following optional parameters:

- `name`: The name to use when registering the task. Should be unique. If not given the name
will be set to the name of the function being decorated.
- `timeout`: Corresponds to [`Task::timeout`](https://docs.rs/celery/*/celery/trait.Task.html#method.timeout).
- `max_retries`: Corresponds to [`Task::max_retries`](https://docs.rs/celery/*/celery/trait.Task.html#method.max_retries).
- `min_retry_delay`: Corresponds to [`Task::min_retry_delay`](https://docs.rs/celery/*/celery/trait.Task.html#method.min_retry_delay).
- `max_retry_delay`: Corresponds to [`Task::max_retry_delay`](https://docs.rs/celery/*/celery/trait.Task.html#method.max_retry_delay).

For example, to give a task a custom name and set a timeout:

```rust,noplaypen
# use celery::task;
use tokio::time::{self, Duration};

#[task(name = "sleep", timeout = 5)]
fn delay(secs: u64) {
time::delay_for(Duration::from_secs(secs)).await
}
```

## Error handling

As demonstrated below in [Implementation details](#implementation-details), the `#[task]` attribute macro will wrap the return value
of the function in `Result<Self::Returns, Error>`.
Therefore the recommended way to propogate errors when defining a task is to use
`.context("...")?` on `Result` types within the task body:

```rust,noplaypen
use celery::{task, ResultExt};

#[task]
fn read_some_file() -> String {
tokio::fs::read_to_string("some_file")
.await
.context("File does not exist")?
}
```

The `.context` method on a `Result` comes from the [`ResultExt`](https://docs.rs/celery/*/celery/trait.ResultExt.html) trait.
This is used to provide additional human-readable context to the error and also
to convert it into the expected [`Error`](https://docs.rs/celery/*/celery/struct.Error.html) type.

## Positional vs keyword parameters

Within the [Celery protocol](https://docs.celeryproject.org/en/latest/internals/protocol.html#version-2)
task parameters can be treated as either `args` (positional) or `kwargs` (key-word based).
Both are supported in Rusty Celery, which means you could call the Rust `add` task defined above from another language like Python in any of the following ways:

```python,noplaypen
celery_app.send_task("add", args=[1, 2])
celery_app.send_task("add", kwargs={"x": 1, "y": 2})
celery_app.send_task("add", args=[1], kwargs={"y": 2})
```

## Optional parameters

Any parameters that are [`Option<T>`](https://doc.rust-lang.org/stable/std/option/enum.Option.html) types are automatically treated as optional with a default value of `None`. For example

```rust,noplaypen
# use celery::task;
use tokio::time::{self, Duration};

#[task]
fn delay(secs: Option<u64>) {
let secs = secs.unwrap_or(10);
time::delay_for(Duration::from_secs(secs)).await
}
```

So you could call this task from Python with or without providing a value for `secs`:

```python,noplaypen
celery_app.send_task("sleep", args=[10])
celery_app.send_task("sleep")
```

## Implementation details

Under the hood a task is just a struct that implements the [`Task`](https://docs.rs/celery/*/celery/trait.Task.html) trait. The `#[task]` proc macro inspects the
function it is decorating and creates a struct with fields matching the function arguments and
then provides an implementation of the `Task` trait where the [`Task::run`](https://docs.rs/celery/*/celery/trait.Task.html#method.run) method
is the body of the function.

The `add` task from above essentially expands out to this:

```rust,noplaypen
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use celery::{Task, Error};

#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize)]
struct add {
x: i32,
y: i32,
}

impl add {
fn new(x: i32, y: i32) -> Self {
Self { x, y }
}
}

#[async_trait]
impl Task for add {
const NAME: &'static str = "add";
const ARGS: &'static [&'static str] = &["x", "y"];

type Returns = i32;

async fn run(mut self) -> Result<Self::Returns, Error> {
Ok(self.x + self.y)
}
}
```
3 changes: 3 additions & 0 deletions src/guide/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# The Rusty Celery Guide

This chapter will walk you through the details of defining and configuring tasks and applications.
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ extern crate doc_comment;

doctest!("./SUMMARY.md");
doctest!("./what-is-celery.md");
doctest!("./getting-started/index.md");
doctest!("./quick-start.md");
doctest!("./guide/index.md");
doctest!("./guide/defining-tasks.md");
doctest!("./best-practices/index.md");
doctest!("./coming-from-python/index.md");
doctest!("./additional-resources.md");
68 changes: 68 additions & 0 deletions src/quick-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Quick Start

Rusty Celery is provided as the [`celery`](https://crates.io/crates/celery) library on crates.io. To get started, add `celery` as a dependency to your project. Then you can define define tasks by decorating functions with the [`task`](https://docs.rs/celery/*/celery/attr.task.html) attribute:

```rust,noplaypen
# use celery::task;
#[task]
fn add(x: i32, y: i32) -> i32 {
x + y
}
```

And create a [`Celery`](https://docs.rs/celery/*/celery/struct.Celery.html) app with the [`celery_app`](https://docs.rs/celery/*/celery/macro.celery_app.html) macro:

```rust,no_run,noplaypen
# use celery::{celery_app, task, AMQPBroker};
# #[task]
# fn add(x: i32, y: i32) -> i32 {
# x + y
# }
let my_app = celery_app!(
broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
tasks = [add],
task_routes = [],
);
```

The Celery app can be used as either a producer or consumer (worker). To send tasks to a
queue for a worker to consume, use the [`Celery::send_task`](https://docs.rs/celery/*/celery/struct.Celery.html#method.send_task) method:

```rust,no_run,noplaypen
# use celery::{celery_app, task, AMQPBroker};
# #[task]
# fn add(x: i32, y: i32) -> i32 {
# x + y
# }
# #[tokio::main]
# async fn main() -> Result<(), exitfailure::ExitFailure> {
# let my_app = celery_app!(
# broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
# tasks = [add],
# task_routes = [],
# );
my_app.send_task(add::new(1, 2)).await?;
# Ok(())
# }
```

And to act as worker and consume tasks sent to a queue by a producer, use the
[`Celery::consume`](https://docs.rs/celery/*/celery/struct.Celery.html#method.consume) method:

```rust,no_run,noplaypen
# use celery::{celery_app, task, AMQPBroker};
# #[task]
# fn add(x: i32, y: i32) -> i32 {
# x + y
# }
# #[tokio::main]
# async fn main() -> Result<(), exitfailure::ExitFailure> {
# let my_app = celery_app!(
# broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
# tasks = [add],
# task_routes = [],
# );
my_app.consume().await?;
# Ok(())
# }
```