Skip to content

Commit

Permalink
Return an EntryID when adding a job and allow it to be removed.
Browse files Browse the repository at this point in the history
  • Loading branch information
robfig committed Dec 3, 2014
1 parent cf42b3d commit 2b666ea
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 28 deletions.
94 changes: 66 additions & 28 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ type Cron struct {
entries []*Entry
stop chan struct{}
add chan *Entry
snapshot chan []*Entry
remove chan EntryID
snapshot chan []Entry
running bool
nextID EntryID
}

// Job is an interface for submitted cron jobs.
Expand All @@ -29,23 +31,32 @@ type Schedule interface {
Next(time.Time) time.Time
}

// EntryID identifies an entry within a Cron instance
type EntryID int

// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
// The schedule on which this job should be run.
// ID is the cron-assigned ID of this entry, which may be used to look up a
// snapshot or remove it.
ID EntryID

// Schedule on which this job should be run.
Schedule Schedule

// Next time the job will run. This is the zero time if Cron has not been
// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable
Next time.Time

// Prev is the last time this job was run.
// This is the zero time if the job has never been run.
// Prev is the last time this job was run, or the zero time if never.
Prev time.Time

// Job is the thing to run when the Schedule is activated.
Job Job
}

// Valid returns true if this is not the zero entry.
func (e Entry) Valid() bool { return e.ID != 0 }

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
Expand All @@ -71,7 +82,8 @@ func New() *Cron {
entries: nil,
add: make(chan *Entry),
stop: make(chan struct{}),
snapshot: make(chan []*Entry),
snapshot: make(chan []Entry),
remove: make(chan EntryID),
running: false,
}
}
Expand All @@ -82,44 +94,63 @@ type FuncJob func()
func (f FuncJob) Run() { f() }

// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := Parse(spec)
if err != nil {
return err
return 0, err
}
c.Schedule(schedule, cmd)
return nil
return c.Schedule(schedule, cmd), nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
func (c *Cron) Schedule(schedule Schedule, cmd Job) {
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
Job: cmd,
}
if !c.running {
c.entries = append(c.entries, entry)
return
} else {
c.add <- entry
}

c.add <- entry
return entry.ID
}

// Entries returns a snapshot of the cron entries.
func (c *Cron) Entries() []*Entry {
func (c *Cron) Entries() []Entry {
if c.running {
c.snapshot <- nil
x := <-c.snapshot
return x
return <-c.snapshot
}
return c.entrySnapshot()
}

// Entry returns a snapshot of the given entry, or nil if it couldn't be found.
func (c *Cron) Entry(id EntryID) Entry {
for _, entry := range c.Entries() {
if id == entry.ID {
return entry
}
}
return Entry{}
}

// Remove an entry from being run in the future.
func (c *Cron) Remove(id EntryID) {
if c.running {
c.remove <- id
} else {
c.removeEntry(id)
}
}

// Start the cron scheduler in its own go-routine.
func (c *Cron) Start() {
c.running = true
Expand Down Expand Up @@ -168,11 +199,13 @@ func (c *Cron) run() {
case <-c.snapshot:
c.snapshot <- c.entrySnapshot()

case id := <-c.remove:
c.removeEntry(id)

case <-c.stop:
return
}

// 'now' should be updated after newEntry and snapshot cases.
now = time.Now().Local()
}
}
Expand All @@ -184,15 +217,20 @@ func (c *Cron) Stop() {
}

// entrySnapshot returns a copy of the current cron entry list.
func (c *Cron) entrySnapshot() []*Entry {
entries := []*Entry{}
for _, e := range c.entries {
entries = append(entries, &Entry{
Schedule: e.Schedule,
Next: e.Next,
Prev: e.Prev,
Job: e.Job,
})
func (c *Cron) entrySnapshot() []Entry {
var entries = make([]Entry, len(c.entries))
for i, e := range c.entries {
entries[i] = *e
}
return entries
}

func (c *Cron) removeEntry(id EntryID) {
var entries []*Entry
for _, e := range c.entries {
if e.ID != id {
entries = append(entries, e)
}
}
c.entries = entries
}
41 changes: 41 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,43 @@ func TestAddWhileRunning(t *testing.T) {
}
}

// Add a job, remove a job, start cron, expect nothing runs.
func TestRemoveBeforeRunning(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

cron := New()
id, _ := cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Remove(id)
cron.Start()
defer cron.Stop()

select {
case <-time.After(ONE_SECOND):
// Success, shouldn't run
case <-wait(wg):
t.FailNow()
}
}

// Start cron, add a job, remove it, expect it doesn't run.
func TestRemoveWhileRunning(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)

cron := New()
cron.Start()
defer cron.Stop()
id, _ := cron.AddFunc("* * * * * ?", func() { wg.Done() })
cron.Remove(id)

select {
case <-time.After(ONE_SECOND):
case <-wait(wg):
t.FailNow()
}
}

// Test timing with Entries.
func TestSnapshotEntries(t *testing.T) {
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -113,10 +150,14 @@ func TestMultipleEntries(t *testing.T) {
cron := New()
cron.AddFunc("0 0 0 1 1 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })
id1, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
id2, _ := cron.AddFunc("* * * * * ?", func() { t.Fatal() })
cron.AddFunc("0 0 0 31 12 ?", func() {})
cron.AddFunc("* * * * * ?", func() { wg.Done() })

cron.Remove(id1)
cron.Start()
cron.Remove(id2)
defer cron.Stop()

select {
Expand Down

0 comments on commit 2b666ea

Please sign in to comment.