Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique and expiring jobs (Enterprise Faktory) #45

Merged
merged 19 commits into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/ent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# This is a CI workflow that runs the test against Enterprise Edition of Faktory.
# The binary (for macos only) is avalable for download for testing purposes with each Faktory release.
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
permissions:
contents: read
on:
push:
branches:
- main
pull_request:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
name: enterprise
jobs:
test:
runs-on: macos-latest
env:
FAKTORY_VERSION: 1.8.0
steps:
- uses: actions/checkout@v4
- name: Install redis
run: brew install redis
- name: Download Faktory binary
run: |
wget -O faktory.tbz https://github.com/contribsys/faktory/releases/download/v${{ env.FAKTORY_VERSION }}/faktory-ent_${{ env.FAKTORY_VERSION }}.macos.amd64.tbz
tar xfv faktory.tbz
cp ./faktory /usr/local/bin
- name: Launch Faktory in background
run: faktory &
- name: Install stable
uses: dtolnay/rust-toolchain@stable
- name: cargo generate-lockfile
if: hashFiles('Cargo.lock') == ''
run: cargo generate-lockfile
- name: Run tests
env:
FAKTORY_URL: tcp://127.0.0.1:7419
FAKTORY_ENT: true
run: cargo test --locked --features ent --all-targets
doc:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install stable
uses: dtolnay/rust-toolchain@stable
- name: Run doc tests
run: cargo test --features ent --doc
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ categories = ["api-bindings", "asynchronous", "network-programming"]
default = []
tls = ["native-tls"]
binaries = ["clap"]
ent = []

[dependencies]
serde_json = "1.0"
Expand Down
130 changes: 130 additions & 0 deletions src/proto/single/ent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use chrono::{DateTime, Utc};

use crate::JobBuilder;

impl JobBuilder {
/// When Faktory should expire this job.
///
/// Faktory Enterprise allows for expiring jobs. This is setter for `expires_at`
/// field in the job's custom data.
/// ```
/// use faktory::JobBuilder;
/// use chrono::{Duration, Utc};
///
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .expires_at(Utc::now() + Duration::hours(1))
/// .build();
/// ```
pub fn expires_at(&mut self, dt: DateTime<Utc>) -> &mut Self {
self.add_to_custom_data(
"expires_at".into(),
dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
)
}

/// In what period of time from now (UTC) the Faktory should expire this job.
///
/// Use this setter when you are unwilling to populate the `expires_at` field in custom
/// options with some exact date and time, e.g.:
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
/// ```
/// use faktory::JobBuilder;
/// use chrono::Duration;
///
/// let _job = JobBuilder::new("order")
/// .args(vec!["ISBN-13:9781718501850"])
/// .expires_in(Duration::weeks(1))
/// .build();
/// ```
pub fn expires_in(&mut self, ttl: chrono::Duration) -> &mut Self {
self.expires_at(Utc::now() + ttl)
}

/// How long the Faktory will not accept duplicates of this job.
///
/// The job will be considered unique for kind-args-queue combination.
/// The uniqueness is best-effort, rather than a guarantee. Check out
/// the Enterprise Faktory [docs](https://github.com/contribsys/faktory/wiki/Ent-Unique-Jobs)
/// for details on how scheduling, retries and other features live together with `unique_for`.
pub fn unique_for(&mut self, secs: usize) -> &mut Self {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
self.add_to_custom_data("unique_for".into(), secs)
}

/// Remove unique lock for this job right before the job starts executing.
pub fn unique_until_start(&mut self) -> &mut Self {
self.add_to_custom_data("unique_until".into(), "start")
}

/// Do not remove unique lock for this job until it successfully finishes.
///
/// Sets `unique_until` on the Job's custom hash to `success`, which is Faktory's default.
pub fn unique_until_success(&mut self) -> &mut Self {
self.add_to_custom_data("unique_until".into(), "success")
}
}

#[cfg(test)]
mod test {
use chrono::{DateTime, Utc};

use crate::JobBuilder;

fn half_stuff() -> JobBuilder {
let mut job = JobBuilder::new("order");
job.args(vec!["ISBN-13:9781718501850"]);
job
}

// Returns date and time string in the format expected by Faktory.
// Serializes date and time into a string as per RFC 3338 and ISO 8601
// with nanoseconds precision and 'Z' literal for the timzone column.
fn to_iso_string(dt: DateTime<Utc>) -> String {
dt.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)
}

#[test]
fn test_expiration_feature_for_enterprise_faktory() {
let five_min = chrono::Duration::seconds(300);
let exp_at = Utc::now() + five_min;
let job1 = half_stuff().expires_at(exp_at).build();
let stored = job1.custom.get("expires_at").unwrap();
assert_eq!(stored, &serde_json::Value::from(to_iso_string(exp_at)));

let job2 = half_stuff().expires_in(five_min).build();
assert!(job2.custom.get("expires_at").is_some());
}

#[test]
fn test_uniqueness_faeture_for_enterprise_faktory() {
let job = half_stuff().unique_for(60).unique_until_start().build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(60));
assert_eq!(stored_unique_until, &serde_json::Value::from("start"));

let job = half_stuff().unique_for(60).unique_until_success().build();

let stored_unique_until = job.custom.get("unique_until").unwrap();
assert_eq!(stored_unique_until, &serde_json::Value::from("success"));
}

#[test]
fn test_same_purpose_setters_applied_simultaneously() {
let expires_at1 = Utc::now() + chrono::Duration::seconds(300);
let expires_at2 = Utc::now() + chrono::Duration::seconds(300);
let job = half_stuff()
.unique_for(60)
.add_to_custom_data("unique_for".into(), 600)
.unique_for(40)
.add_to_custom_data("expires_at".into(), to_iso_string(expires_at1))
.expires_at(expires_at2)
.build();
let stored_unique_for = job.custom.get("unique_for").unwrap();
assert_eq!(stored_unique_for, &serde_json::Value::from(40));
let stored_expires_at = job.custom.get("expires_at").unwrap();
assert_eq!(
stored_expires_at,
&serde_json::Value::from(to_iso_string(expires_at2))
)
}
}
25 changes: 24 additions & 1 deletion src/proto/single/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod cmd;
mod resp;
mod utils;

#[cfg(feature = "ent")]
mod ent;

use crate::error::Error;

pub use self::cmd::*;
Expand Down Expand Up @@ -145,7 +148,7 @@ pub struct Job {
}

impl JobBuilder {
/// Create a new builder for a [`Job`]
/// Creates a new builder for a [`Job`]
pub fn new(kind: impl Into<String>) -> JobBuilder {
JobBuilder {
kind: Some(kind.into()),
Expand All @@ -162,6 +165,13 @@ impl JobBuilder {
self
}

/// Sets arbitrary key-value pairs to this job's custom data hash.
pub fn add_to_custom_data(&mut self, k: String, v: impl Into<serde_json::Value>) -> &mut Self {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
let custom = self.custom.get_or_insert_with(HashMap::new);
custom.insert(k, v.into());
self
}

/// Builds a new [`Job`] from the parameters of this builder.
pub fn build(&self) -> Job {
self.try_build()
Expand Down Expand Up @@ -304,4 +314,17 @@ mod test {
assert_ne!(job2.jid, job3.jid);
assert_ne!(job2.created_at, job3.created_at);
}

#[test]
fn test_arbitrary_custom_data_setter() {
let job = JobBuilder::new("order")
.args(vec!["ISBN-13:9781718501850"])
.add_to_custom_data("arbitrary_key".into(), "arbitrary_value")
.build();

assert_eq!(
job.custom.get("arbitrary_key").unwrap(),
&serde_json::Value::from("arbitrary_value")
);
}
}
File renamed without changes.
Loading
Loading