You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
While Python Celery provides a CLI that you can use to run a worker, in Rust you'll have to implement your own worker binary. However this is a lot easier than it sounds. At a minimum you just need to initialize your [`Celery`](https://docs.rs/celery/*/celery/struct.Celery.html) application, define and register your tasks, and run the [`Celery::consume`](https://docs.rs/celery/*/celery/struct.Celery.html#method.consume) method within your `main` function.
34
35
35
-
Note that `Celery::consume` is an `async` method though, which means you need an async runtime to execute it. Luckily this is provided by [`tokio`](https://docs.rs/tokio/*/tokio/) and is as simple as declaring your `main` function `async` and decorating it with the `tokio::main` macro.
36
+
Note that `Celery::consume` is an `async` method, which means you need an async runtime to execute it. Luckily this is provided by [`tokio`](https://docs.rs/tokio/*/tokio/) and is as simple as declaring your `main` function `async` and decorating it with the `tokio::main` macro.
36
37
37
38
Here is a complete example of a worker application:
Copy file name to clipboardExpand all lines: src/guide/defining-tasks.md
+16-31Lines changed: 16 additions & 31 deletions
Original file line number
Diff line number
Diff line change
@@ -5,12 +5,16 @@ A **task** represents a unit of work that a `Celery` app can produce or consume.
5
5
The recommended way to define a task is by decorating a function with the [`task`](https://docs.rs/celery/*/celery/attr.task.html) attribute macro:
6
6
7
7
```rust,noplaypen
8
+
use celery::TaskResult;
9
+
8
10
#[celery::task]
9
-
fn add(x: i32, y: i32) -> i32 {
10
-
x + y
11
+
fn add(x: i32, y: i32) -> TaskResult<i32> {
12
+
Ok(x + y)
11
13
}
12
14
```
13
15
16
+
If the function has a return value the return type must be a [`TaskResult<T>`](https://docs.rs/celery/*/celery/task/type.TaskResult.html).
17
+
14
18
Under the hood a task is just a struct that implements the [`Task`](https://docs.rs/celery/*/celery/task/trait.Task.html) trait. When you decorate a function with the task macro, this creates a struct and implements the `Task` trait so that [`Task::run`](https://docs.rs/celery/*/celery/task/trait.Task.html#method.run) calls the function you've defined.
15
19
16
20
The macro accepts a number of [optional parameters](https://docs.rs/celery/*/celery/attr.task.html#parameters).
@@ -22,49 +26,30 @@ use tokio::time::{self, Duration};
22
26
23
27
#[celery::task(name = "sleep", timeout = 5)]
24
28
async fn delay(secs: u64) {
25
-
time::delay_for(Duration::from_secs(secs)).await
29
+
time::delay_for(Duration::from_secs(secs)).await;
26
30
}
27
31
```
28
32
29
33
## Error handling
30
34
31
-
When a task executes, i.e. when the `Task::run` method is called, it returns a [`Result<T, TaskError>`](https://docs.rs/celery/*/celery/task/type.TaskResult.html) where `T` is whatever type the function you define returns (like `i32` in the `add` task above, or `()` in the `delay` task). So when the `add` task executes, an `Ok(i32)` will be returned.
32
-
33
-
The reason `Task::run` has to return a `Result` is so the worker executing the task can know when the task has failed. When an `Err(TaskError)` is returned, the worker considers the task failed and may send it back to the broker to be retried.
35
+
When a task executes, i.e. when the `Task::run` method is called, it returns a [`TaskResult<T>`](https://docs.rs/celery/*/celery/task/type.TaskResult.html) which is just a `Result<T, TaskError>`. When an `Err(TaskError)` is returned, the worker considers the task failed and may send it back to the broker to be retried.
34
36
35
-
A worker will generally treat certain [`TaskError`](https://docs.rs/celery/*/celery/error/enum.TaskError.html) variants differently. So when your task has points of failure, such as in the `read_some_file` example below, you'll need to coerce those possible error types to the appropriate `TaskError` variant and propogate them upwards:
37
+
A worker treats certain [`TaskError`](https://docs.rs/celery/*/celery/error/enum.TaskError.html) variants differently. So when your task has points of failure, such as in the `read_some_file` example below, you'll need to coerce those possible error types to the appropriate `TaskError` variant and propogate them upwards:
36
38
37
39
```rust,noplaypen
38
-
use celery::error::TaskResultExt;
40
+
use celery::{TaskResult, TaskResultExt};
39
41
40
42
#[celery::task]
41
-
async fn read_some_file() -> String {
43
+
async fn read_some_file() -> TaskResult<String> {
42
44
tokio::fs::read_to_string("some_file")
43
45
.await
44
-
.with_unexpected_err("File does not exist")?
46
+
.with_unexpected_err("File does not exist")
45
47
}
46
48
```
47
49
48
50
Here `tokio::fs::read_to_string("some_file").await` produces a [tokio::io::Result](`https://docs.rs/tokio/0.2.13/tokio/io/type.Result.html`), so we use the helper method `.with_unexpected_err` from the [`TaskResultExt`](https://docs.rs/celery/*/celery/error/trait.TaskResultExt.html) trait to convert this into a `TaskError::UnexpectedError` and then apply the [`?`](https://doc.rust-lang.org/book/ch09-02-recoverable-errors-with-result.html#propagating-errors) operator to propogate it upwards.
49
51
50
-
> There are two error kinds in particular that are meant as catch-alls for any other type of error that could arise in your task: [`TaskError::UnexpectedError`](https://docs.rs/celery/*/celery/error/enum.TaskError.html#variant.UnexpectedError) and [`TaskError::ExpectedError`](https://docs.rs/celery/*/celery/error/enum.TaskError.html#variant.ExpectedError). The latter should be used for errors that will occasionally happen due to factors outside of your control - such as a third party service being temporarily unavailable - while `UnexpectedError` should be reserved to indicate a bug or that a critical resource is missing.
51
-
52
-
It's important to note that the return type of the `read_some_file` function is not a `Result` type. In fact, **the return type of the decorated function should never be a `Result` type.** The return type should always be the type that would result from a *successful* execution, and so your function should always return that bare type instead of an `Ok` or `Err`.
53
-
54
-
If you're familiar with the `?` operator, you may be wondering how we can use this within a function that is marked as returning `String` and not `Result<String, _>`. The reason this works is because the `task` attribute macro modifies the body of function by wrapping it in `Ok({ ... })` and changing the return type to a `Result`.
55
-
56
-
So in this example the `read_some_file` function is turned into something like this:
There are two error kinds in particular that are meant as catch-alls for any other type of error that could arise in your task: [`TaskError::UnexpectedError`](https://docs.rs/celery/*/celery/error/enum.TaskError.html#variant.UnexpectedError) and [`TaskError::ExpectedError`](https://docs.rs/celery/*/celery/error/enum.TaskError.html#variant.ExpectedError). The latter should be used for errors that will occasionally happen due to factors outside of your control - such as a third party service being temporarily unavailable - while `UnexpectedError` should be reserved to indicate a bug or that a critical resource is missing.
68
53
69
54
## Positional vs keyword parameters
70
55
@@ -88,7 +73,7 @@ use tokio::time::{self, Duration};
88
73
#[celery::task]
89
74
async fn delay(secs: Option<u64>) {
90
75
let secs = secs.unwrap_or(10);
91
-
time::delay_for(Duration::from_secs(secs)).await
76
+
time::delay_for(Duration::from_secs(secs)).await;
92
77
}
93
78
```
94
79
@@ -114,7 +99,7 @@ use tokio::time::{self, Duration};
In summary, tasks are easily defined by decorating a function with the `#[celery::task]` macro. Internally the function is wrapped in a struct that implements the `Task` trait, and the return value of the function is wrapped in a `Result<T, celery::error::TaskError>`. This makes it valid to use `?` directly within your function.
119
+
In summary, tasks are easily defined by decorating a function with the `#[celery::task]` macro. If the function returns anything the return type has to be a `TaskResult<T>`. Internally the function is wrapped in a struct that implements the `Task` trait.
135
120
136
121
The quickest way to propogate expected or unexpected errors from within your task is by using `.with_expected_err("...")?` or `.with_unexpected_err("...")?`, respectively, on the `Result`.
Copy file name to clipboardExpand all lines: src/quick-start.md
+13-8Lines changed: 13 additions & 8 deletions
Original file line number
Diff line number
Diff line change
@@ -3,18 +3,21 @@
3
3
Rusty Celery is provided as the [`celery`](https://crates.io/crates/celery) library on crates.io. To get started, add `celery` as a dependency to your project. Then you can define tasks by decorating functions with the [`task`](https://docs.rs/celery/*/celery/attr.task.html) attribute:
4
4
5
5
```rust,noplaypen
6
+
use celery::TaskResult;
7
+
6
8
#[celery::task]
7
-
fn add(x: i32, y: i32) -> i32 {
8
-
x + y
9
+
fn add(x: i32, y: i32) -> TaskResult<i32> {
10
+
Ok(x + y)
9
11
}
10
12
```
11
13
12
14
And create a [`Celery`](https://docs.rs/celery/*/celery/struct.Celery.html) app with the [`app`](https://docs.rs/celery/*/celery/macro.app.html) macro:
0 commit comments