Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,24 @@ use workers::Runner;
use std::time::Duration;

let runner = Runner::new(connection_pool, app_context)
.register_job_type::<SendEmailJob>()
.configure_queue("emails", |queue| {
.register::<ReadRedditJob>() // Simple registration
.register_with::<BossIsWatchingJob, _>(|queue| { // With queue config. Note the omitted generic parameter
queue.num_workers(4)
.poll_interval(Duration::from_millis(100))
.archive_completed_jobs(true)
})
.configure_default_queue(|queue| {
queue.num_workers(2)
.poll_interval(Duration::from_secs(5))
.jitter(Duration::from_millis(500))
});

let handle = runner.start();
handle.wait_for_shutdown().await;
```

> [!TIP]
> When using `register_with`, always use `_` as the second generic parameter to let Rust infer the closure type!

### Enqueuing Jobs

```rust,ignore
Expand Down
6 changes: 3 additions & 3 deletions examples/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ async fn main() -> Result<()> {

// Create runner with archiving enabled for important jobs
let runner = Runner::new(pool.clone(), ())
.register_job_type::<NotificationJob>()
.register_job_type::<PaymentJob>()
.configure_queue("default", |queue| {
.register::<NotificationJob>()
.register::<PaymentJob>()
.configure_default_queue(|queue| {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still looks a bit weird.

In the two lines before we talk about jobs and here we talk about a queue now. Maybe we can also use the register job syntax for the default queue as well for consistency? I don't know if that makes sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks weird to me too, but I attribute that to there being a default queue to begin with...
If we get rid of that and let people explicitly configure each queue, the weirdness would go away.
That being said, there will still be some iffiness due to the fact that multiple jobs can be in the same queue, and that's something I do not know at this moment whether to keep or try to reimagine.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's definitely weird. I don't think there should be a notion of a "default queue." Will open an issue for that.

queue
.num_workers(2)
.poll_interval(Duration::from_millis(100))
Expand Down
38 changes: 29 additions & 9 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,47 @@ impl<Context: Clone + Send + Sync + 'static> Runner<Context> {
}

/// Register a new job type for this job runner.
pub fn register_job_type<J: BackgroundJob<Context = Context>>(mut self) -> Self {
pub fn register<J: BackgroundJob<Context = Context>>(mut self) -> Self {
let queue = self.queues.entry(J::QUEUE.into()).or_default();
queue.job_registry.register::<J>();
self
}

/// Adjust the configuration of the [`DEFAULT_QUEUE`] queue.
pub fn configure_default_queue<F>(self, f: F) -> Self
/// Register a new job type and configure its queue.
///
/// # Example
///
/// ```rust,ignore
/// use workers::Runner;
/// use std::time::Duration;
///
/// let runner = Runner::new(pool, context)
/// .register_with::<EmailJob, _>(|queue| {
/// queue.num_workers(4)
/// .poll_interval(Duration::from_millis(500))
/// .archive_completed_jobs(true)
/// });
/// ```
///
/// **Note:** Use `_` as the second generic parameter to let Rust infer the closure type.
pub fn register_with<J, F>(mut self, f: F) -> Self
where
F: FnOnce(&mut Queue<Context>) -> &Queue<Context>,
J: BackgroundJob<Context = Context>,
F: FnOnce(&mut Queue<Context>) -> &mut Queue<Context>,
{
self.configure_queue(DEFAULT_QUEUE, f)
let queue = self.queues.entry(J::QUEUE.into()).or_default();
f(queue);
queue.job_registry.register::<J>();
self
}

/// Adjust the configuration of a queue. If the queue does not exist,
/// it will be created.
pub fn configure_queue<F>(mut self, name: &str, f: F) -> Self
/// Adjust the configuration of the [`DEFAULT_QUEUE`] queue.
pub fn configure_default_queue<F>(mut self, f: F) -> Self
where
F: FnOnce(&mut Queue<Context>) -> &Queue<Context>,
{
f(self.queues.entry(name.into()).or_default());
let default_queue = self.queues.entry(DEFAULT_QUEUE.into()).or_default();
f(default_queue);
self
}

Expand Down
21 changes: 10 additions & 11 deletions tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> {

let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), test_context.clone())
.register_job_type::<TestJob>();
let runner =
test_utils::create_test_runner(pool.clone(), test_context.clone()).register::<TestJob>();

let job_id = assert_some!(TestJob.enqueue(&pool).await?);

Expand Down Expand Up @@ -203,7 +203,7 @@ async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> {

let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), ()).register_job_type::<TestJob>();
let runner = test_utils::create_test_runner(pool.clone(), ()).register::<TestJob>();

assert_eq!(remaining_jobs(&pool).await?, 0);

Expand Down Expand Up @@ -243,8 +243,8 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow:

let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), test_context.clone())
.register_job_type::<TestJob>();
let runner =
test_utils::create_test_runner(pool.clone(), test_context.clone()).register::<TestJob>();

TestJob.enqueue(&pool).await?;

Expand Down Expand Up @@ -289,7 +289,7 @@ async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> {

let (pool, _container) = test_utils::setup_test_db().await?;

let runner = test_utils::create_test_runner(pool.clone(), ()).register_job_type::<TestJob>();
let runner = test_utils::create_test_runner(pool.clone(), ()).register::<TestJob>();

let job_id = assert_some!(TestJob.enqueue(&pool).await?);

Expand Down Expand Up @@ -352,7 +352,7 @@ async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
let (pool, _container) = test_utils::setup_test_db().await?;

let runner = Runner::new(pool.clone(), test_context.clone())
.register_job_type::<TestJob>()
.register::<TestJob>()
.shutdown_when_queue_empty();

// Enqueue first job
Expand Down Expand Up @@ -409,8 +409,7 @@ async fn jitter_configuration_affects_polling() -> anyhow::Result<()> {

// Test that jitter configuration is accepted and compiles
let runner = Runner::new(pool.clone(), ())
.register_job_type::<TestJob>()
.configure_queue("default", |queue| {
.register_with::<TestJob, _>(|queue| {
queue
.num_workers(1)
.poll_interval(Duration::from_millis(100))
Expand Down Expand Up @@ -454,8 +453,8 @@ async fn archive_functionality_works() -> anyhow::Result<()> {

// Configure runner with archiving enabled
let runner = Runner::new(pool.clone(), ())
.register_job_type::<TestJob>()
.configure_queue("default", |queue| {
.register::<TestJob>()
.configure_default_queue(|queue| {
queue.num_workers(1).archive_completed_jobs(true) // Enable archiving
})
.shutdown_when_queue_empty();
Expand Down