Skip to content

Commit

Permalink
New methods for repeating jobs
Browse files Browse the repository at this point in the history
There are now new methods for repeating jobs multiple
times in a row once they're scheduled (e.g. at 10:30 AM,
run tihs task three times, 10 minutes apart.)

Also, times can now be specified by passing in chrono::NaiveTime
or any other type that implements TryInto<ClokwerkTime> (a newtype
wrapper around NaiveTime).

Documentation is also updated to point out that the scheduler
blocks on running jobs, so jobs that will take a long time should
find a way to move to another thread.
  • Loading branch information
mdsherry committed Jun 15, 2020
1 parent 2f1e247 commit 1061e60
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 26 deletions.
46 changes: 35 additions & 11 deletions src/intervals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,33 @@ pub trait NextTime {
fn prev<Tz: TimeZone>(&self, from: &DateTime<Tz>) -> DateTime<Tz>;
}

pub(crate) fn parse_time(s: &str) -> Option<NaiveTime> {
pub(crate) fn parse_time(s: &str) -> Result<NaiveTime, chrono::ParseError> {
NaiveTime::parse_from_str(s, "%H:%M:%S")
.or_else(|_| NaiveTime::parse_from_str(s, "%I:%M:%S %p"))
.or_else(|_| NaiveTime::parse_from_str(s, "%H:%M"))
.or_else(|_| NaiveTime::parse_from_str(s, "%I:%M %p"))
.ok()
}

/// A new-type for parsing various types into [`chrono::NaiveTime`] values.
///
/// To use your own type with [`Job::at`], impl TryFrom/TryInto for your type.
/// Notable types that have implementations are &str and NaiveTime.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct ClokwerkTime(pub NaiveTime);

impl TryFrom<NaiveTime> for ClokwerkTime {
type Error = std::convert::Infallible;
fn try_from(value: NaiveTime) -> Result<Self, Self::Error> {
Ok(ClokwerkTime(value))
}
}

impl TryFrom<&str> for ClokwerkTime {
type Error = chrono::ParseError;
fn try_from(value: &str) -> Result<Self, Self::Error> {
Ok(ClokwerkTime(parse_time(value)?))
}

}

#[derive(Debug)]
Expand All @@ -65,9 +86,9 @@ impl RunConfig {
}
}

pub fn with_time(&self, s: &str) -> Self {
pub fn with_time(&self, t: ClokwerkTime) -> Self {
RunConfig {
adjustment: Some(Adjustment::Time(parse_time(s).unwrap())),
adjustment: Some(Adjustment::Time(t.0)),
..*self
}
}
Expand All @@ -84,6 +105,7 @@ impl RunConfig {
..*self
}
}

fn apply_adjustment<Tz: TimeZone>(&self, from: &DateTime<Tz>) -> DateTime<Tz> {
match self.adjustment {
None => from.clone(),
Expand Down Expand Up @@ -138,6 +160,7 @@ fn day_of_week(i: Interval) -> usize {
}

use Interval::*;
use std::convert::{TryInto, TryFrom};
impl NextTime for Interval {
fn next<Tz: TimeZone>(&self, from: &DateTime<Tz>) -> DateTime<Tz> {
match *self {
Expand Down Expand Up @@ -499,35 +522,36 @@ mod tests {
}

use super::parse_time;
use std::convert::TryInto;
#[test]
fn test_parse_time() {
assert_eq!(
parse_time("14:52:13"),
Some(NaiveTime::from_hms(14, 52, 13))
Ok(NaiveTime::from_hms(14, 52, 13))
);
assert_eq!(
parse_time("2:52:13 pm"),
Some(NaiveTime::from_hms(14, 52, 13))
Ok(NaiveTime::from_hms(14, 52, 13))
);
assert_eq!(parse_time("14:52"), Some(NaiveTime::from_hms(14, 52, 0)));
assert_eq!(parse_time("2:52 PM"), Some(NaiveTime::from_hms(14, 52, 0)));
assert_eq!(parse_time("14:52"), Ok(NaiveTime::from_hms(14, 52, 0)));
assert_eq!(parse_time("2:52 PM"), Ok(NaiveTime::from_hms(14, 52, 0)));
}

#[test]
fn test_run_config() {
let rc = RunConfig::from_interval(1.day()).with_time("15:00");
let rc = RunConfig::from_interval(1.day()).with_time("15:00".try_into().unwrap());
let dt = DateTime::parse_from_rfc3339("2018-09-04T14:22:13-00:00").unwrap();
let next_dt = rc.next(&dt);
let expected = DateTime::parse_from_rfc3339("2018-09-04T15:00:00-00:00").unwrap();
assert_eq!(next_dt, expected);

let rc = RunConfig::from_interval(Tuesday).with_time("15:00");
let rc = RunConfig::from_interval(Tuesday).with_time("15:00".try_into().unwrap());
let dt = DateTime::parse_from_rfc3339("2018-09-04T14:22:13-00:00").unwrap();
let next_dt = rc.next(&dt);
let expected = DateTime::parse_from_rfc3339("2018-09-04T15:00:00-00:00").unwrap();
assert_eq!(next_dt, expected);

let rc = RunConfig::from_interval(Tuesday).with_time("14:00");
let rc = RunConfig::from_interval(Tuesday).with_time("14:00".try_into().unwrap());
let next_dt = rc.next(&dt);
let expected = DateTime::parse_from_rfc3339("2018-09-11T14:00:00-00:00").unwrap();
assert_eq!(next_dt, expected);
Expand Down
168 changes: 155 additions & 13 deletions src/job.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::timeprovider::{ChronoTimeProvider, TimeProvider};
use crate::{
intervals::ClokwerkTime,
timeprovider::{ChronoTimeProvider, TimeProvider},
};
use chrono::prelude::*;
use intervals::NextTime;
use std::fmt;
use std::marker::PhantomData;
use std::fmt::{self, Debug};
use std::{convert::TryInto, marker::PhantomData, thread};
use Interval;
use RunConfig;

Expand All @@ -13,6 +16,12 @@ enum RunCount {
Forever,
}

struct RepeatConfig {
repeats: usize,
repeat_interval: Interval,
repeats_left: usize,
}

/// A job to run on the scheduler.
/// Create these by calling [`Scheduler::every()`](::Scheduler::every).
pub struct Job<Tz = Local, Tp = ChronoTimeProvider>
Expand All @@ -25,6 +34,8 @@ where
last_run: Option<DateTime<Tz>>,
job: Option<Box<dyn FnMut() + Send>>,
run_count: RunCount,
separate_thread: bool,
repeat_config: Option<RepeatConfig>,
tz: Tz,
_tp: PhantomData<Tp>,
}
Expand Down Expand Up @@ -55,6 +66,8 @@ where
last_run: None,
job: None,
run_count: RunCount::Forever,
separate_thread: false,
repeat_config: None,
tz,
_tp: PhantomData,
}
Expand All @@ -68,20 +81,51 @@ where
/// Specify the time of day when a task should run, e.g.
/// ```rust
/// # extern crate clokwerk;
/// # extern crate chrono;
/// # use clokwerk::*;
/// # use clokwerk::Interval::*;
/// # use chrono::NaiveTime;
/// let mut scheduler = Scheduler::new();
/// scheduler.every(1.day()).at("14:32").run(|| println!("Tea time!"));
/// scheduler.every(Wednesday).at("6:32:21 PM").run(|| println!("Writing examples is hard"));
/// scheduler.every(Weekday).at(NaiveTime::from_hms(23, 42, 16)).run(|| println!("Also works with NaiveTime"));
/// ```
/// Times can be specified using strings, with or without seconds, and in either 24-hour or 12-hour time.
/// They can also be any other type that implements `TryInto<ClokwerkTime>`, which includes [`chrono::NaiveTime`].
/// This method will panic if TryInto fails, e.g. because the time string could not be parsed.
/// If the value comes from an untrusted source, e.g. user input, [`Job::try_at`] will return a result instead.
///
/// This method is mutually exclusive with [`Job::plus()`].
pub fn at<T>(&mut self, time: T) -> &mut Self
where
T: TryInto<ClokwerkTime>,
T::Error: Debug,
{
self.try_at(time)
.expect("Could not convert value into a time")
}

/// Identical to [`Job::at`] except that it returns a Result instead of panicking if the conversion failed.
/// ```rust
/// # extern crate clokwerk;
/// # extern crate chrono;
/// # use clokwerk::*;
/// # use clokwerk::Interval::*;
/// let mut scheduler = Scheduler::new();
/// scheduler.every(1.day()).try_at("14:32")?.run(|| println!("Tea time!"));
/// # Ok::<(), chrono::ParseError>(())
/// ```
/// Times can be specified with or without seconds, and in either 24-hour or 12-hour time.
/// Mutually exclusive with [`Job::plus()`].
pub fn at(&mut self, s: &str) -> &mut Self {
pub fn try_at<T>(&mut self, time: T) -> Result<&mut Self, T::Error>
where
T: TryInto<ClokwerkTime>,
{
{
let frequency = self.last_frequency();
*frequency = frequency.with_time(s);
*frequency = frequency.with_time(time.try_into()?);
}
self
Ok(self)
}

/// Add additional precision time to when a task should run, e.g.
Expand Down Expand Up @@ -136,19 +180,78 @@ where
}
}

/// After running once, run again with the specified interval.
///
/// ```rust
/// # extern crate clokwerk;
/// # use clokwerk::*;
/// # use clokwerk::Interval::*;
/// # fn hit_snooze() {}
/// let mut scheduler = Scheduler::new();
/// scheduler.every(Weekday)
/// .at("7:40")
/// .repeating_every(10.minutes())
/// .times(5)
/// .run(|| hit_snooze());
/// ```
/// will hit snooze five times every morning, at 7:40, 7:50, 8:00, 8:10 and 8:20.
///
/// Unlike [`Job::at`] and [`Job::plus`],
/// this affects all intervals associated with the job, not just the most recent one.
/// ```rust
/// # extern crate clokwerk;
/// # use clokwerk::*;
/// # use clokwerk::Interval::*;
/// # fn hit_snooze() {}
/// let mut scheduler = Scheduler::new();
/// scheduler.every(Weekday)
/// .at("7:40")
/// .and_every(Saturday)
/// .at("9:15")
/// .and_every(Sunday)
/// .at("9:15")
/// .repeating_every(10.minutes())
/// .times(5)
/// .run(|| hit_snooze());
/// ```
/// hits snooze five times every day, not just Sundays.
///
/// If a job is still repeating, it will ignore otherwise scheduled runs.
/// ```rust
/// # extern crate clokwerk;
/// # use clokwerk::*;
/// # use clokwerk::Interval::*;
/// # fn hit_snooze() {}
/// let mut scheduler = Scheduler::new();
/// scheduler.every(1.hour())
/// .repeating_every(45.minutes())
/// .times(3)
/// .run(|| println!("Hello"));
/// ```
/// If this is scheduled to run at 6 AM, it will print `Hello` at 6:00, 6:45, and 7:30, and then again at 8:00, 8:45, 9:30, etc.
pub fn repeating_every(&mut self, interval: Interval) -> Repeating<Tz, Tp> {
Repeating {
job: self,
interval,
}
}

/// Specify a task to run, and schedule its next run
pub fn run<F>(&mut self, f: F) -> &mut Self
where
F: 'static + FnMut() + Send,
{
self.job = Some(Box::new(f));
match self.next_run {
Some(_) => (),
None => {
let now = Tp::now(&self.tz);
self.next_run = self.next_run_time(&now);
if let None = self.next_run {
let now = Tp::now(&self.tz);
self.next_run = self.next_run_time(&now);
match &mut self.repeat_config {
Some(RepeatConfig{ repeats, repeats_left, ..}) => {
*repeats_left = *repeats;
}
None => ()
}
};
}
self
}

Expand All @@ -171,8 +274,23 @@ where
if let Some(ref mut f) = self.job {
f();
}

// We compute this up front since we can't borrow self immutably while doing this next bit
let next_run_time = self.next_run_time(now);
match &mut self.repeat_config {
Some(RepeatConfig{ repeats, repeats_left, repeat_interval}) => {
if *repeats_left > 0 {
*repeats_left -= 1;
self.next_run = Some(repeat_interval.next(now));
} else {
self.next_run = next_run_time;
*repeats_left = *repeats;
}
}
None => self.next_run = next_run_time
}

self.last_run = Some(now.clone());
self.next_run = self.next_run_time(now);
self.run_count = match self.run_count {
RunCount::Never => RunCount::Never,
RunCount::Times(n) if n > 1 => RunCount::Times(n - 1),
Expand All @@ -181,3 +299,27 @@ where
}
}
}

pub struct Repeating<'a, Tz: chrono::TimeZone, Tp: TimeProvider> {
job: &'a mut Job<Tz, Tp>,
interval: Interval,
}

impl<'a, Tz, Tp> Repeating<'a, Tz, Tp>
where
Tz: chrono::TimeZone + Sync + Send,
Tp: TimeProvider,
{
/// Indicate the number of times the job should be run every time it's scheduled.
/// Passing a value of 1 here is the same as not specifying a repeat at all. A value of 0 is ignored.
pub fn times(self, n: usize) -> &'a mut Job<Tz, Tp> {
if n >= 1 {
self.job.repeat_config = Some(RepeatConfig {
repeats: n - 1,
repeat_interval: self.interval,
repeats_left: 0
});
}
self.job
}
}
9 changes: 7 additions & 2 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ where
&mut self.jobs[last_index]
}

/// Run all jobs that should run at this time.
/// Run all jobs that should run at this time.
///
/// This method blocks while jobs are being run. If a job takes a long time, it may prevent
/// other tasks from running as scheduled. If you have a long-running task, you might consider
/// having the job move the work into another thread so that it can return promptly.
/// ```rust
/// # extern crate clokwerk;
/// # use clokwerk::*;
Expand Down Expand Up @@ -113,6 +117,7 @@ where
{
/// Start a background thread to call [Scheduler::run_pending()] with the specified frequency.
/// The resulting thread fill end cleanly if the returned [ScheduleHandle] is dropped.
#[must_use = "The scheduler is halted when the returned handle is dropped"]
pub fn watch_thread(self, frequency: Duration) -> ScheduleHandle {
let stop = Arc::new(AtomicBool::new(false));
let my_stop = stop.clone();
Expand Down Expand Up @@ -153,7 +158,7 @@ impl Drop for ScheduleHandle {
mod tests {
use super::{Scheduler, TimeProvider};
use crate::intervals::*;
use std::sync::{atomic::AtomicU32, atomic::Ordering, Arc};
use std::{thread, sync::{atomic::AtomicU32, atomic::Ordering, Arc}};

macro_rules! make_time_provider {
($name:ident : $($time:literal),+) => {
Expand Down

0 comments on commit 1061e60

Please sign in to comment.