Skip to content

Commit

Permalink
Merge branch 'release-5.0' into pr/2499
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Aug 12, 2021
2 parents 1d6fb97 + 1098628 commit e842f28
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 361 deletions.
114 changes: 0 additions & 114 deletions pkg/workerpool/context.go

This file was deleted.

237 changes: 0 additions & 237 deletions pkg/workerpool/context_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ type EventHandle interface {
// AddEvent adds an `event` object to the internal queue, so that the `f` used to register the handle can be called.
// Note: events are always processed in the order they are added.
// Unregistering the EventHandle MAY CAUSE EVENT LOSSES. But for an event lost, any event after it is guaranteed to be lost too.
// Cancelling `ctx` here will cancel the on-going or next execution of the event.
AddEvent(ctx context.Context, event interface{}) error

// SetTimer is used to provide a function that is periodic called, as long as the EventHandle has not been unregistered.
// The current implementation uses as the base clock source a ticker whose interval is the const workerPoolDefaultClockSourceInterval.
// DO NOT set an interval less than workerPoolDefaultClockSourceInterval.
// Cancelling `ctx` here will cancel the on-going or next execution of `f`.
SetTimer(ctx context.Context, interval time.Duration, f func(ctx context.Context) error) EventHandle

// Unregister removes the EventHandle from the WorkerPool.
Expand Down
12 changes: 2 additions & 10 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,7 @@ func (h *defaultEventHandle) AddEvent(ctx context.Context, event interface{}) er
task := task{
handle: h,
f: func(ctx1 context.Context) error {
// Here we merge the context passed down from WorkerPool.Run,
// with the context supplied by AddEvent,
// because we want operations to be cancellable by both contexts.
mContext, cancel := MergeContexts(ctx, ctx1)
// this cancels the merged context only.
defer cancel()
return h.f(mContext, event)
return h.f(ctx, event)
},
}

Expand All @@ -154,9 +148,7 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio

h.timerInterval = interval
h.timerHandler = func(ctx1 context.Context) error {
mContext, cancel := MergeContexts(ctx, ctx1)
defer cancel()
return f(mContext)
return f(ctx)
}
// mark the timer handler function as valid
atomic.StoreInt32(&h.hasTimer, 1)
Expand Down
Loading

0 comments on commit e842f28

Please sign in to comment.