diff --git a/cron.go b/cron.go index cff47b9d..829da122 100644 --- a/cron.go +++ b/cron.go @@ -13,8 +13,10 @@ type Cron struct { entries []*Entry stop chan struct{} add chan *Entry - snapshot chan []*Entry + remove chan EntryID + snapshot chan []Entry running bool + nextID EntryID } // Job is an interface for submitted cron jobs. @@ -29,23 +31,32 @@ type Schedule interface { Next(time.Time) time.Time } +// EntryID identifies an entry within a Cron instance +type EntryID int + // Entry consists of a schedule and the func to execute on that schedule. type Entry struct { - // The schedule on which this job should be run. + // ID is the cron-assigned ID of this entry, which may be used to look up a + // snapshot or remove it. + ID EntryID + + // Schedule on which this job should be run. Schedule Schedule - // Next time the job will run. This is the zero time if Cron has not been + // Next time the job will run, or the zero time if Cron has not been // started or this entry's schedule is unsatisfiable Next time.Time - // Prev is the last time this job was run. - // This is the zero time if the job has never been run. + // Prev is the last time this job was run, or the zero time if never. Prev time.Time // Job is the thing to run when the Schedule is activated. Job Job } +// Valid returns true if this is not the zero entry. +func (e Entry) Valid() bool { return e.ID != 0 } + // byTime is a wrapper for sorting the entry array by time // (with zero time at the end). type byTime []*Entry @@ -71,7 +82,8 @@ func New() *Cron { entries: nil, add: make(chan *Entry), stop: make(chan struct{}), - snapshot: make(chan []*Entry), + snapshot: make(chan []Entry), + remove: make(chan EntryID), running: false, } } @@ -82,44 +94,63 @@ type FuncJob func() func (f FuncJob) Run() { f() } // AddFunc adds a func to the Cron to be run on the given schedule. -func (c *Cron) AddFunc(spec string, cmd func()) error { +func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { return c.AddJob(spec, FuncJob(cmd)) } // AddJob adds a Job to the Cron to be run on the given schedule. -func (c *Cron) AddJob(spec string, cmd Job) error { +func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { schedule, err := Parse(spec) if err != nil { - return err + return 0, err } - c.Schedule(schedule, cmd) - return nil + return c.Schedule(schedule, cmd), nil } // Schedule adds a Job to the Cron to be run on the given schedule. -func (c *Cron) Schedule(schedule Schedule, cmd Job) { +func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { + c.nextID++ entry := &Entry{ + ID: c.nextID, Schedule: schedule, Job: cmd, } if !c.running { c.entries = append(c.entries, entry) - return + } else { + c.add <- entry } - - c.add <- entry + return entry.ID } // Entries returns a snapshot of the cron entries. -func (c *Cron) Entries() []*Entry { +func (c *Cron) Entries() []Entry { if c.running { c.snapshot <- nil - x := <-c.snapshot - return x + return <-c.snapshot } return c.entrySnapshot() } +// Entry returns a snapshot of the given entry, or nil if it couldn't be found. +func (c *Cron) Entry(id EntryID) Entry { + for _, entry := range c.Entries() { + if id == entry.ID { + return entry + } + } + return Entry{} +} + +// Remove an entry from being run in the future. +func (c *Cron) Remove(id EntryID) { + if c.running { + c.remove <- id + } else { + c.removeEntry(id) + } +} + // Start the cron scheduler in its own go-routine. func (c *Cron) Start() { c.running = true @@ -168,11 +199,13 @@ func (c *Cron) run() { case <-c.snapshot: c.snapshot <- c.entrySnapshot() + case id := <-c.remove: + c.removeEntry(id) + case <-c.stop: return } - // 'now' should be updated after newEntry and snapshot cases. now = time.Now().Local() } } @@ -184,15 +217,20 @@ func (c *Cron) Stop() { } // entrySnapshot returns a copy of the current cron entry list. -func (c *Cron) entrySnapshot() []*Entry { - entries := []*Entry{} - for _, e := range c.entries { - entries = append(entries, &Entry{ - Schedule: e.Schedule, - Next: e.Next, - Prev: e.Prev, - Job: e.Job, - }) +func (c *Cron) entrySnapshot() []Entry { + var entries = make([]Entry, len(c.entries)) + for i, e := range c.entries { + entries[i] = *e } return entries } + +func (c *Cron) removeEntry(id EntryID) { + var entries []*Entry + for _, e := range c.entries { + if e.ID != id { + entries = append(entries, e) + } + } + c.entries = entries +} diff --git a/cron_test.go b/cron_test.go index eea34008..06dfea3a 100644 --- a/cron_test.go +++ b/cron_test.go @@ -77,6 +77,43 @@ func TestAddWhileRunning(t *testing.T) { } } +// Add a job, remove a job, start cron, expect nothing runs. +func TestRemoveBeforeRunning(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := New() + id, _ := cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Remove(id) + cron.Start() + defer cron.Stop() + + select { + case <-time.After(ONE_SECOND): + // Success, shouldn't run + case <-wait(wg): + t.FailNow() + } +} + +// Start cron, add a job, remove it, expect it doesn't run. +func TestRemoveWhileRunning(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + + cron := New() + cron.Start() + defer cron.Stop() + id, _ := cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Remove(id) + + select { + case <-time.After(ONE_SECOND): + case <-wait(wg): + t.FailNow() + } +} + // Test timing with Entries. func TestSnapshotEntries(t *testing.T) { wg := &sync.WaitGroup{} @@ -113,10 +150,14 @@ func TestMultipleEntries(t *testing.T) { cron := New() cron.AddFunc("0 0 0 1 1 ?", func() {}) cron.AddFunc("* * * * * ?", func() { wg.Done() }) + id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() }) + id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() }) cron.AddFunc("0 0 0 31 12 ?", func() {}) cron.AddFunc("* * * * * ?", func() { wg.Done() }) + cron.Remove(id1) cron.Start() + cron.Remove(id2) defer cron.Stop() select {