Skip to content

Commit

Permalink
Support remove queues and all-queues helpers (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy authored May 25, 2024
1 parent 58962df commit 1252419
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 21 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ jobs:
run: cargo test --locked --all-features --all-targets
env: # set this explicitly so integration tests will run
FAKTORY_URL: tcp://127.0.0.1:7419
# commands executed during the following test affect all the queues on the Faktory server,
# so we perform this test in a dedicated - isolated - step, re-using the Faktory container
- name: cargo test --locked (queue control actions)
run: cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
env: # set this explicitly so integration tests will run
FAKTORY_URL: tcp://127.0.0.1:7419
# https://github.com/rust-lang/cargo/issues/6669
- name: cargo test --doc
run: cargo test --locked --all-features --doc
Expand Down
59 changes: 51 additions & 8 deletions src/proto/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,20 @@ where
},
}
}

/// Issue a queue control command for the given queues and await the
/// server's OK reply.
///
/// Shared plumbing behind [`Client::queue_pause`], [`Client::queue_resume`],
/// [`Client::queue_remove`] and their `*_all` wildcard counterparts.
pub(crate) async fn perform_queue_action<Q>(
    &mut self,
    queues: &[Q],
    action: QueueAction,
) -> Result<(), Error>
where
    Q: AsRef<str> + Sync,
{
    self.issue(&QueueControl::new(action, queues))
        .await?
        .read_ok()
        .await
}
}

impl<S> Client<S>
Expand Down Expand Up @@ -370,25 +384,54 @@ where
}

/// Pause the given queues.
///
/// Passing a wildcard `&["*"]` as the value of the `queues` parameter
/// will pause all the queues. To be more explicit, you may want to call the
/// [`Client::queue_pause_all`] shortcut method to pause all the queues.
pub async fn queue_pause<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
    Q: AsRef<str> + Sync,
{
    // Delegates to the shared helper so all queue control commands
    // follow the same issue-and-read-ok path.
    self.perform_queue_action(queues, QueueAction::Pause).await
}

/// Pause all queues.
///
/// Shortcut for calling [`Client::queue_pause`] with the `"*"` wildcard.
pub async fn queue_pause_all(&mut self) -> Result<(), Error> {
    self.perform_queue_action(&["*"], QueueAction::Pause).await
}

/// Resume the given queues.
///
/// Passing a wildcard `&["*"]` as the value of the `queues` parameter
/// will resume all the queues. To be more explicit, you may want to call the
/// [`Client::queue_resume_all`] shortcut method to resume all the queues.
pub async fn queue_resume<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
    Q: AsRef<str> + Sync,
{
    // Delegates to the shared helper so all queue control commands
    // follow the same issue-and-read-ok path.
    self.perform_queue_action(queues, QueueAction::Resume).await
}

/// Resume all queues.
///
/// Shortcut for calling [`Client::queue_resume`] with the `"*"` wildcard.
pub async fn queue_resume_all(&mut self) -> Result<(), Error> {
    self.perform_queue_action(&["*"], QueueAction::Resume).await
}

/// Remove the given queues.
///
/// Beware, passing a wildcard `&["*"]` as the value of the `queues` parameter
/// will **remove** all the queues. To be more explicit, you may want to call [`Client::queue_remove_all`]
/// shortcut method to remove all the queues.
pub async fn queue_remove<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
    Q: AsRef<str> + Sync,
{
    self.perform_queue_action(queues, QueueAction::Remove).await
}

/// Remove all queues.
///
/// Shortcut for calling [`Client::queue_remove`] with the `"*"` wildcard.
/// Beware: this **removes** every queue on the Faktory server.
pub async fn queue_remove_all(&mut self) -> Result<(), Error> {
    self.perform_queue_action(&["*"], QueueAction::Remove).await
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/proto/single/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ impl FaktoryCommand for PushBulk {
/// A queue control action understood by the Faktory server.
///
/// Each variant maps to one wire command (see the `FaktoryCommand`
/// implementation for `QueueControl`).
pub(crate) enum QueueAction {
    /// Emitted as `QUEUE PAUSE`.
    Pause,
    /// Emitted as `QUEUE RESUME`.
    Resume,
    /// Emitted as `QUEUE REMOVE`.
    Remove,
}

pub(crate) struct QueueControl<'a, S>
Expand All @@ -298,6 +299,7 @@ where
let command = match self.action {
QueueAction::Pause => b"QUEUE PAUSE".as_ref(),
QueueAction::Resume => b"QUEUE RESUME".as_ref(),
QueueAction::Remove => b"QUEUE REMOVE".as_ref(),
};
w.write_all(command).await?;
write_queues(w, self.queues).await?;
Expand Down
177 changes: 164 additions & 13 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,41 +253,192 @@ async fn fail() {
}

#[tokio::test(flavor = "multi_thread")]
async fn queue() {
async fn queue_control_actions() {
skip_check!();
let local = "pause";

let local_1 = "queue_control_pause_and_resume_1";
let local_2 = "queue_control_pause_and_resume_2";

let (tx, rx) = sync::mpsc::channel();
let tx = sync::Arc::new(sync::Mutex::new(tx));
let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
let tx_2 = sync::Arc::clone(&tx_1);

let mut w = WorkerBuilder::default()
let mut worker = WorkerBuilder::default()
.hostname("tester".to_string())
.wid(WorkerId::new(local))
.register_fn(local, move |_job| {
let tx = sync::Arc::clone(&tx);
.wid(WorkerId::new(local_1))
.register_fn(local_1, move |_job| {
let tx = sync::Arc::clone(&tx_1);
Box::pin(async move { tx.lock().unwrap().send(true) })
})
.register_fn(local_2, move |_job| {
let tx = sync::Arc::clone(&tx_2);
Box::pin(async move { tx.lock().unwrap().send(true) })
})
.connect(None)
.await
.unwrap();

let mut p = Client::connect(None).await.unwrap();
p.enqueue(Job::new(local, vec![Value::from(1)]).on_queue(local))
let mut client = Client::connect(None).await.unwrap();

// enqueue three jobs
client
.enqueue_many([
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
])
.await
.unwrap();
p.queue_pause(&[local]).await.unwrap();

let had_job = w.run_one(0, &[local]).await.unwrap();
// pause the queue
client.queue_pause(&[local_1]).await.unwrap();

// try to consume from that queue
let had_job = worker.run_one(0, &[local_1]).await.unwrap();
assert!(!had_job);
let worker_executed = rx.try_recv().is_ok();
assert!(!worker_executed);

p.queue_resume(&[local]).await.unwrap();
// resume that queue and ...
client.queue_resume(&[local_1]).await.unwrap();

let had_job = w.run_one(0, &[local]).await.unwrap();
// ... be able to consume from it
let had_job = worker.run_one(0, &[local_1]).await.unwrap();
assert!(had_job);
let worker_executed = rx.try_recv().is_ok();
assert!(worker_executed);

// push two jobs on the other queue (reminder: we got two jobs
// remaining on the first queue):
client
.enqueue_many([
Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
])
.await
.unwrap();

// pause both queues the queues
client.queue_pause(&[local_1, local_2]).await.unwrap();

// try to consume from them
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
assert!(!worker.run_one(0, &[local_2]).await.unwrap());
assert!(!rx.try_recv().is_ok());

// now, resume the queues and ...
client.queue_resume(&[local_1, local_2]).await.unwrap();

// ... be able to consume from both of them
assert!(worker.run_one(0, &[local_1]).await.unwrap());
assert!(rx.try_recv().is_ok());
assert!(worker.run_one(0, &[local_2]).await.unwrap());
assert!(rx.try_recv().is_ok());

// let's inspect the sever state
let server_state = client.info().await.unwrap();
let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();
assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining

// let's now remove the queues
client.queue_remove(&[local_1, local_2]).await.unwrap();

// though there _was_ a job in each queue, consuming from
// the removed queues will not yield anything
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
assert!(!worker.run_one(0, &[local_2]).await.unwrap());
assert!(!rx.try_recv().is_ok());

// let's inspect the sever state again
let server_state = client.info().await.unwrap();
let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();
// our queue are not even mentioned in the server report:
assert!(queues.get(local_1).is_none());
assert!(queues.get(local_2).is_none());
}

// Integration test for the all-queues (wildcard) control shortcuts:
// queue_pause_all / queue_resume_all / queue_remove_all.
//
// Run the following test with:
// FAKTORY_URL=tcp://127.0.0.1:7419 cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
#[tokio::test(flavor = "multi_thread")]
#[ignore = "this test requires a dedicated test run since the commands being tested will affect all queues on the Faktory server"]
async fn queue_control_actions_wildcard() {
    skip_check!();

    let local_1 = "queue_control_wildcard_1";
    let local_2 = "queue_control_wildcard_2";

    // Channel the job handlers use to signal that a job actually ran;
    // wrapped in Arc<Mutex<..>> so both handler closures can share the sender.
    let (tx, rx) = sync::mpsc::channel();
    let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
    let tx_2 = sync::Arc::clone(&tx_1);

    let mut worker = WorkerBuilder::default()
        .hostname("tester".to_string())
        .wid(WorkerId::new(local_1))
        .register_fn(local_1, move |_job| {
            let tx = sync::Arc::clone(&tx_1);
            Box::pin(async move { tx.lock().unwrap().send(true) })
        })
        .register_fn(local_2, move |_job| {
            let tx = sync::Arc::clone(&tx_2);
            Box::pin(async move { tx.lock().unwrap().send(true) })
        })
        .connect(None)
        .await
        .unwrap();

    let mut client = Client::connect(None).await.unwrap();

    // enqueue two jobs on each queue
    client
        .enqueue_many([
            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
            Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
            Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
            Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
        ])
        .await
        .unwrap();

    // pause all the queues on the server via the wildcard shortcut
    client.queue_pause_all().await.unwrap();

    // try to consume from the paused queues
    assert!(!worker.run_one(0, &[local_1]).await.unwrap());
    assert!(!worker.run_one(0, &[local_2]).await.unwrap());
    assert!(!rx.try_recv().is_ok());

    // now, resume all the queues and ...
    client.queue_resume_all().await.unwrap();

    // ... be able to consume from both of them
    assert!(worker.run_one(0, &[local_1]).await.unwrap());
    assert!(rx.try_recv().is_ok());
    assert!(worker.run_one(0, &[local_2]).await.unwrap());
    assert!(rx.try_recv().is_ok());

    // let's inspect the server state
    let server_state = client.info().await.unwrap();
    let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();
    assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
    assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining

    // let's now remove all the queues via the wildcard shortcut
    client.queue_remove_all().await.unwrap();

    // though there _was_ a job in each queue, consuming from
    // the removed queues will not yield anything
    assert!(!worker.run_one(0, &[local_1]).await.unwrap());
    assert!(!worker.run_one(0, &[local_2]).await.unwrap());
    assert!(!rx.try_recv().is_ok());

    // let's inspect the server state again
    let server_state = client.info().await.unwrap();
    let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();

    // our queues are not even mentioned in the server report:
    assert!(queues.get(local_1).is_none());
    assert!(queues.get(local_2).is_none());
}

#[tokio::test(flavor = "multi_thread")]
Expand Down

0 comments on commit 1252419

Please sign in to comment.