Skip to content

Commit

Permalink
BUGFIX: Reap immediately after initialization on startup (gocraft#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
shdunning authored Jul 29, 2017
1 parent e511e61 commit 2a383f8
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 28 deletions.
13 changes: 4 additions & 9 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
deadTime = 5 * time.Minute
deadTime = 10 * time.Second // 2 x heartbeat
reapPeriod = 10 * time.Minute
reapJitterSecs = 30
requeueKeysPerJob = 4
Expand Down Expand Up @@ -49,13 +49,8 @@ func (r *deadPoolReaper) stop() {
}

func (r *deadPoolReaper) loop() {
// Reap
if err := r.reap(); err != nil {
logError("dead_pool_reaper.reap", err)
}

// Begin reaping periodically
timer := time.NewTimer(r.reapPeriod)
// Reap immediately after we provide some time for initialization
timer := time.NewTimer(r.deadTime)
defer timer.Stop()

for {
Expand All @@ -64,7 +59,7 @@ func (r *deadPoolReaper) loop() {
r.doneStoppingChan <- struct{}{}
return
case <-timer.C:
// Schedule next occurrence with jitter
// Schedule next occurrence periodically with jitter
timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)

// Reap
Expand Down
10 changes: 3 additions & 7 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
cleanKeyspace(ns, pool)
// test vars
expectedDeadTime := 5 * time.Millisecond
expiration := 2 * expectedDeadTime

// create a stale job with a heartbeat
conn := pool.Get()
Expand All @@ -272,12 +271,8 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
assert.NoError(t, err)
jobTypes := map[string]*jobType{"job1": nil}
staleHeart := newWorkerPoolHeartbeater(ns, pool, stalePoolID, jobTypes, 1, []string{"id1"})
staleHeart.expires = expiration
staleHeart.start()

// sleep long enough for staleJob to be considered dead
time.Sleep(expectedDeadTime)

// should have 1 stale job and empty job queue
assert.EqualValues(t, 1, listSize(pool, redisKeyJobsInProgress(ns, stalePoolID, job1)))
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
Expand All @@ -287,8 +282,9 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
wp.deadPoolReaper.deadTime = expectedDeadTime
wp.deadPoolReaper.start()
// provide some initialization time
time.Sleep(1 * time.Millisecond)

// sleep long enough for staleJob to be considered dead
time.Sleep(expectedDeadTime * 2)

// now we should have 1 job in queue and no more stale jobs
assert.EqualValues(t, 1, listSize(pool, redisKeyJobs(ns, job1)))
Expand Down
5 changes: 0 additions & 5 deletions heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ import (
)

const (
// expire the heartbeat key after the reaper has had a chance to assess whether or not the job(s) are dead
heartbeatExpiration = reapPeriod + (reapJitterSecs+1)*time.Second
beatPeriod = 5 * time.Second
)

type workerPoolHeartbeater struct {
workerPoolID string
namespace string // eg, "myapp-work"
pool *redis.Pool
expires time.Duration
beatPeriod time.Duration
concurrency uint
jobNames string
Expand All @@ -37,7 +34,6 @@ func newWorkerPoolHeartbeater(namespace string, pool *redis.Pool, workerPoolID s
workerPoolID: workerPoolID,
namespace: namespace,
pool: pool,
expires: heartbeatExpiration,
beatPeriod: beatPeriod,
concurrency: concurrency,
stopChan: make(chan struct{}),
Expand Down Expand Up @@ -107,7 +103,6 @@ func (h *workerPoolHeartbeater) heartbeat() {
"host", h.hostname,
"pid", h.pid,
)
conn.Send("EXPIRE", heartbeatKey, h.expires.Seconds())

if err := conn.Flush(); err != nil {
logError("heartbeat", err)
Expand Down
7 changes: 0 additions & 7 deletions heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import (
"time"
)

func TestHeartBeatExpiration(t *testing.T) {
// just to make sure -- heartbeats should not expire before the reaper has had a chance to assess whether or
// not jobs are dead (in the event of a dirty shutdown, for example)
assert.True(t, heartbeatExpiration > deadTime)
assert.True(t, heartbeatExpiration > reapPeriod + reapJitterSecs * time.Second)
}

func TestHeartbeater(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
Expand Down

0 comments on commit 2a383f8

Please sign in to comment.