Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUGFIX: Reap immediately after initialization on startup #58

Merged
merged 2 commits into from
Jul 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
deadTime = 5 * time.Minute
deadTime = 10 * time.Second // 2 x heartbeat
reapPeriod = 10 * time.Minute
reapJitterSecs = 30
requeueKeysPerJob = 4
Expand Down Expand Up @@ -49,13 +49,8 @@ func (r *deadPoolReaper) stop() {
}

func (r *deadPoolReaper) loop() {
// Reap
if err := r.reap(); err != nil {
logError("dead_pool_reaper.reap", err)
}

// Begin reaping periodically
timer := time.NewTimer(r.reapPeriod)
// Reap immediately after we provide some time for initialization
timer := time.NewTimer(r.deadTime)
defer timer.Stop()

for {
Expand All @@ -64,7 +59,7 @@ func (r *deadPoolReaper) loop() {
r.doneStoppingChan <- struct{}{}
return
case <-timer.C:
// Schedule next occurrence with jitter
// Schedule next occurrence periodically with jitter
timer.Reset(r.reapPeriod + time.Duration(rand.Intn(reapJitterSecs))*time.Second)

// Reap
Expand Down
10 changes: 3 additions & 7 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
cleanKeyspace(ns, pool)
// test vars
expectedDeadTime := 5 * time.Millisecond
expiration := 2 * expectedDeadTime

// create a stale job with a heartbeat
conn := pool.Get()
Expand All @@ -272,12 +271,8 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
assert.NoError(t, err)
jobTypes := map[string]*jobType{"job1": nil}
staleHeart := newWorkerPoolHeartbeater(ns, pool, stalePoolID, jobTypes, 1, []string{"id1"})
staleHeart.expires = expiration
staleHeart.start()

// sleep long enough for staleJob to be considered dead
time.Sleep(expectedDeadTime)

// should have 1 stale job and empty job queue
assert.EqualValues(t, 1, listSize(pool, redisKeyJobsInProgress(ns, stalePoolID, job1)))
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
Expand All @@ -287,8 +282,9 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
wp.deadPoolReaper.deadTime = expectedDeadTime
wp.deadPoolReaper.start()
// provide some initialization time
time.Sleep(1 * time.Millisecond)

// sleep long enough for staleJob to be considered dead
time.Sleep(expectedDeadTime * 2)

// now we should have 1 job in queue and no more stale jobs
assert.EqualValues(t, 1, listSize(pool, redisKeyJobs(ns, job1)))
Expand Down
5 changes: 0 additions & 5 deletions heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@ import (
)

const (
// expire the heartbeat key after the reaper has had a chance to assess whether or not the job(s) are dead
heartbeatExpiration = reapPeriod + (reapJitterSecs+1)*time.Second
beatPeriod = 5 * time.Second
)

type workerPoolHeartbeater struct {
workerPoolID string
namespace string // eg, "myapp-work"
pool *redis.Pool
expires time.Duration
beatPeriod time.Duration
concurrency uint
jobNames string
Expand All @@ -37,7 +34,6 @@ func newWorkerPoolHeartbeater(namespace string, pool *redis.Pool, workerPoolID s
workerPoolID: workerPoolID,
namespace: namespace,
pool: pool,
expires: heartbeatExpiration,
beatPeriod: beatPeriod,
concurrency: concurrency,
stopChan: make(chan struct{}),
Expand Down Expand Up @@ -107,7 +103,6 @@ func (h *workerPoolHeartbeater) heartbeat() {
"host", h.hostname,
"pid", h.pid,
)
conn.Send("EXPIRE", heartbeatKey, h.expires.Seconds())

if err := conn.Flush(); err != nil {
logError("heartbeat", err)
Expand Down
7 changes: 0 additions & 7 deletions heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ import (
"time"
)

func TestHeartBeatExpiration(t *testing.T) {
// just to make sure -- heartbeats should not expire before the reaper has had a chance to assess whether or
// not jobs are dead (in the event of a dirty shutdown, for example)
assert.True(t, heartbeatExpiration > deadTime)
assert.True(t, heartbeatExpiration > reapPeriod + reapJitterSecs * time.Second)
}

func TestHeartbeater(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
Expand Down