Commit 03b1a8a

improve docs (#13)
* improve docs
* fix
1 parent 3d82afd commit 03b1a8a

File tree

6 files changed: +59 -47 lines changed


Cargo.lock

Lines changed: 8 additions & 4 deletions
(Generated file; diff not rendered.)

src/SUMMARY.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -5,6 +5,7 @@
 - [Quick Start](quick-start.md)
 - [Going Deeper](guide/index.md)
 - [Defining Tasks](guide/defining-tasks.md)
+- [Running Workers](guide/running-workers.md)
 - [Best Practices](best-practices/index.md)
 - [Coming from Python?](coming-from-python/index.md)
```

src/best-practices/index.md

Lines changed: 12 additions & 4 deletions
```diff
@@ -2,9 +2,13 @@
 
 ## Acks early vs acks late
 
-If you're familiar with Python Celery, [the answer](https://docs.celeryproject.org/en/stable/faq.html#should-i-use-retry-or-acks-late) of whether to set `acks_late` to `true` or `false` is the same: it depends.
+Tasks are only removed from a queue when they are acknowledged ("acked") by the worker that received them. The [`acks_late`](https://docs.rs/celery/*/celery/struct.CeleryBuilder.html#method.acks_late) setting determines when a worker will ack a task. When set to `true`, tasks are acked after the worker finishes executing them. When set to `false`, they are acked right before the worker starts executing them.
 
-The effect of acking late is that if a worker were to crash, any tasks that it's currently executing will be retried automatically by the next available worker. So if your tasks are [idempotent](https://docs.celeryproject.org/en/stable/glossary.html#term-idempotent) then it's recommended that you [set `acks_late` to `true`](https://docs.celeryproject.org/en/stable/glossary.html#term-idempotent). On the other hand, if retrying tasks that have potentially already executed could cause more damage than not retrying them, you should not ack late.
+The default of `acks_late` is `false`; however, if your tasks are [idempotent](https://docs.celeryproject.org/en/stable/glossary.html#term-idempotent) it's strongly recommended that you set `acks_late` to `true`. This has two major benefits.
+
+First, it ensures that if a worker were to crash, any tasks currently executing will be retried automatically by the next available worker.
+
+Second, it provides a better [back pressure](https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7) mechanism when used in conjunction with a suitable [`prefetch_count`](https://docs.rs/celery/*/celery/struct.CeleryBuilder.html#method.prefetch_count) (see below).
 
 ## Prefetch count
 
@@ -14,8 +18,12 @@ When initializing your Rust Celery app it's recommended that you [set the `prefe
 
 The `prefetch_count` determines how many un-acked tasks (ignoring those with a future ETA) a worker can hold onto at any point in time. Having `prefetch_count` too low or too high can create a bottleneck.
 
-If the number is set too low, workers could be under-utilized. If the number is set too high, workers could be hogging tasks that they can't execute yet.
+If the number is set too low, workers could be under-utilized. If the number is set too high, workers could be hogging tasks that they can't execute yet, or worse: they could run out of memory from receiving too many tasks and crash.
 
 Unfortunately, finding an optimal prefetch count is easier said than done. It depends on a lot of factors, such as the hardware your workers are running on, the task throughput, and whether your tasks are more CPU-bound or IO-bound.
 
-The last reason is especially important. A worker running on even a single CPU can probably handle hundreds, if not thousands, of (non-blocking) IO-bound tasks at once. But a worker consuming CPU-bound tasks is essentially limited to executing one task per CPU core. Therefore a good starting point for `prefetch_count` would be either `100 x NUM_CPUS` for IO-bound tasks or `m * NUM_CPUS` for CPU-bound tasks, where `m` is a small integer between 1 and 4.
+The last reason is especially important. A worker running on even a single CPU can probably handle hundreds, if not thousands, of (non-blocking) IO-bound tasks at once. But a worker consuming CPU-bound tasks is essentially limited to executing one task per CPU core. Therefore a good starting point for `prefetch_count` would be either `100 * NUM_CPUS` for IO-bound tasks or `2 * NUM_CPUS` for CPU-bound tasks.
+
+## Consuming blocking / CPU-bound tasks
+
+If your tasks are CPU-bound (or otherwise blocking), it's recommended that you use a multi-threaded async runtime, such as [the one](https://docs.rs/tokio/0.2.16/tokio/runtime/index.html#threaded-scheduler) provided by `tokio`. Within the task body you can then call [`tokio::task::block_in_place`](https://docs.rs/tokio/0.2.16/tokio/task/index.html#block_in_place) where appropriate.
```
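To make the blocking recommendation concrete, here is a sketch of what such a task body might look like, assuming the crate's `#[celery::task]` macro and tokio's threaded scheduler; the task name `checksum` and its computation are illustrative only, not part of either library:

```rust,no_run,noplaypen
use celery::TaskResult;

// Illustrative CPU-bound task: `checksum` is a made-up name, and the byte
// fold is a stand-in for any heavy computation.
#[celery::task]
fn checksum(input: String) -> TaskResult<u64> {
    // block_in_place runs the blocking work on the current thread while
    // telling tokio to move other queued work elsewhere. It requires the
    // multi-threaded ("threaded scheduler") runtime.
    let sum = tokio::task::block_in_place(|| {
        input.bytes().fold(0u64, |acc, b| acc.wrapping_add(b as u64))
    });
    Ok(sum)
}
```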

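The `prefetch_count` starting-point rule above can be captured in a tiny helper; `starting_prefetch_count` is a hypothetical function for illustration, not part of the `celery` crate:

```rust
/// Hypothetical helper sketching the rule of thumb from the text:
/// roughly 100 * NUM_CPUS for IO-bound workloads and 2 * NUM_CPUS for
/// CPU-bound ones. Treat the result as a starting point, then tune by
/// measuring throughput.
fn starting_prefetch_count(num_cpus: usize, io_bound: bool) -> usize {
    if io_bound {
        100 * num_cpus
    } else {
        2 * num_cpus
    }
}

fn main() {
    // A 4-core worker consuming (non-blocking) IO-bound tasks:
    assert_eq!(starting_prefetch_count(4, true), 400);
    // The same worker consuming CPU-bound tasks:
    assert_eq!(starting_prefetch_count(4, false), 8);
}
```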
src/coming-from-python/index.md

Lines changed: 0 additions & 39 deletions
````diff
@@ -29,45 +29,6 @@ my_app.register_task::<add>().await.unwrap();
 # }
 ```
 
-## Running a worker
-
-While Python Celery provides a CLI that you can use to run a worker, in Rust you'll have to implement your own worker binary. However this is a lot easier than it sounds. At a minimum you just need to initialize your [`Celery`](https://docs.rs/celery/*/celery/struct.Celery.html) application, define and register your tasks, and run the [`Celery::consume`](https://docs.rs/celery/*/celery/struct.Celery.html#method.consume) method within your `main` function.
-
-Note that `Celery::consume` is an `async` method, which means you need an async runtime to execute it. Luckily this is provided by [`tokio`](https://docs.rs/tokio/*/tokio/) and is as simple as declaring your `main` function `async` and decorating it with the `tokio::main` macro.
-
-Here is a complete example of a worker application:
-
-```rust,no_run,noplaypen
-#![allow(non_upper_case_globals)]
-
-use celery::TaskResult;
-use exitfailure::ExitFailure;
-
-#[celery::task]
-fn add(x: i32, y: i32) -> TaskResult<i32> {
-    Ok(x + y)
-}
-
-#[tokio::main]
-async fn main() -> Result<(), ExitFailure> {
-    env_logger::init();
-
-    let celery_app = celery::app!(
-        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
-        tasks = [add],
-        task_routes = [],
-        prefetch_count = 2,
-        default_queue = "celery-rs",
-    );
-
-    celery_app.consume().await?;
-
-    Ok(())
-}
-```
-
-The `consume` method will listen for `SIGINT` and `SIGTERM` signals - just like a Python worker - and will try to finish all pending tasks before shutting down unless it receives another signal.
-
 ## Time limits vs timeout
 
 In Python you configure tasks to have a [soft or hard time limit](https://docs.celeryproject.org/en/latest/userguide/workers.html#time-limits). A soft time limit allows a task to clean up after itself if it runs over the limit, while a hard limit will force terminate the task.
````

src/guide/running-workers.md

Lines changed: 37 additions & 0 deletions
````diff
@@ -0,0 +1,37 @@
+# Running Workers
+
+While the Python version of Celery provides a CLI that you can use to run a worker, in Rust you'll have to implement your own worker binary. However, this is a lot easier than it sounds. At a minimum you just need to initialize your [`Celery`](https://docs.rs/celery/*/celery/struct.Celery.html) application, define and register your tasks, and run the [`Celery::consume`](https://docs.rs/celery/*/celery/struct.Celery.html#method.consume) method within an async executor.
+
+Here is a complete example of a worker application:
+
+```rust,no_run,noplaypen
+#![allow(non_upper_case_globals)]
+
+use celery::TaskResult;
+use exitfailure::ExitFailure;
+
+#[celery::task]
+fn add(x: i32, y: i32) -> TaskResult<i32> {
+    Ok(x + y)
+}
+
+#[tokio::main]
+async fn main() -> Result<(), ExitFailure> {
+    env_logger::init();
+
+    let celery_app = celery::app!(
+        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
+        tasks = [add],
+        task_routes = [],
+        prefetch_count = 2,
+        acks_late = true,
+        default_queue = "celery-rs",
+    );
+
+    celery_app.consume().await?;
+
+    Ok(())
+}
+```
+
+The `consume` method will listen for `SIGINT` and `SIGTERM` signals - just like a Python worker - and will try to finish all pending tasks before shutting down unless it receives another signal.
````
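A worker is only half the picture: tasks also need to be sent to the queue from somewhere. Here is a minimal client sketch, assuming the same `add` task and the `Celery::send_task` method (the pattern shown in the crate's README); the settings mirror the worker example above:

```rust,no_run,noplaypen
#![allow(non_upper_case_globals)]

// Hypothetical client binary; assumes the same `add` task definition and
// broker settings as the worker example above.
use celery::TaskResult;
use exitfailure::ExitFailure;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

#[tokio::main]
async fn main() -> Result<(), ExitFailure> {
    let client = celery::app!(
        broker = AMQP { std::env::var("AMQP_ADDR").unwrap() },
        tasks = [add],
        task_routes = [],
        default_queue = "celery-rs",
    );

    // Queue up `add(1, 2)` for the next available worker to execute.
    client.send_task(add::new(1, 2)).await?;

    Ok(())
}
```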

src/lib.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -6,6 +6,7 @@ doctest!("./what-is-celery.md");
 doctest!("./quick-start.md");
 doctest!("./guide/index.md");
 doctest!("./guide/defining-tasks.md");
+doctest!("./guide/running-workers.md");
 doctest!("./best-practices/index.md");
 doctest!("./coming-from-python/index.md");
 doctest!("./additional-resources.md");
```
