Skip to content

Commit

Permalink
Replace time.After with timers
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Sep 25, 2024
1 parent 11c3626 commit a64f23c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
7 changes: 6 additions & 1 deletion common/cache/metricsScopeCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/metrics"
)

Expand All @@ -47,6 +48,7 @@ type (
cache atomic.Value
closeCh chan struct{}
flushDuration time.Duration
timeSource clock.TimeSource
}
)

Expand All @@ -59,16 +61,19 @@ func NewDomainMetricsScopeCache() DomainMetricsScopeCache {
},
closeCh: make(chan struct{}),
flushDuration: flushBufferedMetricsScopeDuration,
timeSource: clock.NewRealTimeSource(),
}

mc.cache.Store(make(metricsScopeMap))
return mc
}

func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.Duration) {
ticker := c.timeSource.NewTicker(flushDuration)
defer ticker.Stop()
for {
select {
case <-time.After(flushDuration):
case <-ticker.Chan():
c.buffer.Lock()
if len(c.buffer.bufferMap) > 0 {
scopeMap := make(metricsScopeMap)
Expand Down
2 changes: 1 addition & 1 deletion service/history/engine/engineimpl/query_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *historyEngineImpl) QueryWorkflow(
}
deadline := time.Now().Add(queryFirstDecisionTaskWaitTime)
for mutableStateResp.GetPreviousStartedEventID() <= 0 && time.Now().Before(deadline) {
<-time.After(queryFirstDecisionTaskCheckInterval)
time.Sleep(queryFirstDecisionTaskCheckInterval)
mutableStateResp, err = e.getMutableState(ctx, request.GetDomainUUID(), execution)
if err != nil {
return nil, err
Expand Down
13 changes: 12 additions & 1 deletion service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ func (t *timerQueueProcessor) completeTimerLoop() {
completeTimer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
defer completeTimer.Stop()

// Create a retryTimer once, and reset it as needed
retryTimer := time.NewTimer(0)
defer retryTimer.Stop()

Check warning on line 406 in service/history/queue/timer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/timer_queue_processor.go#L405-L406

Added lines #L405 - L406 were not covered by tests
// Stop it immediately because we don't want it to fire initially
if !retryTimer.Stop() {
<-retryTimer.C

Check warning on line 409 in service/history/queue/timer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/timer_queue_processor.go#L408-L409

Added lines #L408 - L409 were not covered by tests
}
for {
select {
case <-t.shutdownChan:
Expand All @@ -425,11 +432,15 @@ func (t *timerQueueProcessor) completeTimerLoop() {
return
}

// Reset the retryTimer for the delay between attempts
// TODO: the first retry has 0 backoff, revisit it to see if it's expected
retryDuration := time.Duration(attempt*100) * time.Millisecond
retryTimer.Reset(retryDuration)

Check warning on line 438 in service/history/queue/timer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/timer_queue_processor.go#L437-L438

Added lines #L437 - L438 were not covered by tests
select {
case <-t.shutdownChan:
t.drain()
return
case <-time.After(time.Duration(attempt*100) * time.Millisecond):
case <-retryTimer.C:

Check warning on line 443 in service/history/queue/timer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/timer_queue_processor.go#L443

Added line #L443 was not covered by tests
// do nothing. retry loop will continue
}
}
Expand Down
15 changes: 13 additions & 2 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,14 @@ func (t *transferQueueProcessor) completeTransferLoop() {
completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
defer completeTimer.Stop()

// Create a retryTimer once, and reset it as needed
retryTimer := time.NewTimer(0)
defer retryTimer.Stop()
// Stop it immediately because we don't want it to fire initially
if !retryTimer.Stop() {
<-retryTimer.C

Check warning on line 371 in service/history/queue/transfer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/transfer_queue_processor.go#L371

Added line #L371 was not covered by tests
}

for {
select {
case <-t.shutdownChan:
Expand All @@ -387,12 +395,15 @@ func (t *transferQueueProcessor) completeTransferLoop() {
t.Stop()
return
}

// Reset the retryTimer for the delay between attempts
// TODO: the first retry has 0 backoff, revisit it to see if it's expected
retryDuration := time.Duration(attempt*100) * time.Millisecond
retryTimer.Reset(retryDuration)

Check warning on line 401 in service/history/queue/transfer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/transfer_queue_processor.go#L400-L401

Added lines #L400 - L401 were not covered by tests
select {
case <-t.shutdownChan:
t.drain()
return
case <-time.After(time.Duration(attempt*100) * time.Millisecond):
case <-retryTimer.C:

Check warning on line 406 in service/history/queue/transfer_queue_processor.go

View check run for this annotation

Codecov / codecov/patch

service/history/queue/transfer_queue_processor.go#L406

Added line #L406 was not covered by tests
// do nothing. retry loop will continue
}
}
Expand Down
7 changes: 6 additions & 1 deletion service/worker/scanner/tasklist/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -59,6 +60,7 @@ type (
maxTasksPerJobFn dynamicconfig.IntPropertyFn
cleanOrphans dynamicconfig.BoolPropertyFn
pollInterval time.Duration
timeSource clock.TimeSource

// stopC is used to signal the scavenger to stop
stopC chan struct{}
Expand Down Expand Up @@ -178,6 +180,7 @@ func NewScavenger(
pollInterval: pollInterval,
maxTasksPerJobFn: maxTasksPerJobFn,
getOrphanTasksPageSizeFn: getOrphanTasksPageSize,
timeSource: clock.NewRealTimeSource(),
}
}

Expand Down Expand Up @@ -257,9 +260,11 @@ func (s *Scavenger) process(taskListInfo *p.TaskListInfo) executor.TaskStatus {

func (s *Scavenger) awaitExecutor() {
outstanding := s.executor.TaskCount()
ticker := s.timeSource.NewTicker(s.pollInterval)
defer ticker.Stop()
for outstanding > 0 {
select {
case <-time.After(s.pollInterval):
case <-ticker.Chan():
outstanding = s.executor.TaskCount()
s.scope.UpdateGauge(metrics.TaskListOutstandingCount, float64(outstanding))
case <-s.stopC:
Expand Down

0 comments on commit a64f23c

Please sign in to comment.