Skip to content

Commit

Permalink
refactor(timer): optimize code logic to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
hwjiangkai committed Sep 19, 2022
1 parent ca600ab commit 1bb2361
Show file tree
Hide file tree
Showing 4 changed files with 372 additions and 173 deletions.
108 changes: 71 additions & 37 deletions internal/timer/timingwheel/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func newTimingMsg(ctx context.Context, e *ce.Event) *timingMsg {
}

func (tm *timingMsg) hasExpired() bool {
return time.Now().After(tm.expiration)
return !time.Now().Before(tm.expiration)
}

func (tm *timingMsg) getExpiration() time.Time {
Expand Down Expand Up @@ -114,10 +114,13 @@ type bucket struct {

timingwheel *timingWheel
element *list.Element

waitingForReady func(ctx context.Context, events []*ce.Event)
eventHandler func(ctx context.Context, event *ce.Event)
}

func newBucket(tw *timingWheel, element *list.Element, tick time.Duration, ebName string, layer, slot int64) *bucket {
return &bucket{
b := &bucket{
config: tw.config,
tick: tick,
layer: layer,
Expand All @@ -130,6 +133,15 @@ func newBucket(tw *timingWheel, element *list.Element, tick time.Duration, ebNam
timingwheel: tw,
element: element,
}

if layer == 1 {
b.waitingForReady = b.waitingForExpired
b.eventHandler = b.pushToDistributionStation
} else {
b.waitingForReady = b.waitingForFlow
b.eventHandler = b.pushToPrevTimingWheel
}
return b
}

func (b *bucket) start(ctx context.Context) error {
Expand Down Expand Up @@ -210,32 +222,11 @@ func (b *bucket) run(ctx context.Context) {
glimitC <- struct{}{}
go func(ctx context.Context, e *ce.Event) {
defer wg.Done()
tm := newTimingMsg(ctx, e)
waitCtx, cancel := context.WithCancel(ctx)
wait.Until(func() {
if b.layer == 1 || tm.hasExpired() {
if b.timingwheel.getDistributionStation().push(ctx, tm) == nil {
cancel()
} else {
log.Warning(ctx, "push event to distribution station failed, retry until it succeed", map[string]interface{}{
"eventbus": b.eventbus,
"event": e.String(),
})
}
} else {
if b.getTimingWheelElement().prev().push(ctx, tm, true) {
cancel()
} else {
log.Warning(ctx, "push event to prev timingwheel failed, retry until it succeed", map[string]interface{}{
"eventbus": b.eventbus,
"event": e.String(),
})
}
}
}, b.config.Tick/defaultCheckWaitingPeriodRatio, waitCtx.Done())
b.eventHandler(ctx, e)
<-glimitC
}(ctx, event)
}

// asynchronously update offset after the same batch of events are successfully written.
offsetC <- waitGroup{
wg: &wg,
Expand All @@ -247,32 +238,75 @@ func (b *bucket) run(ctx context.Context) {
}()
}

func (b *bucket) waitingForReady(ctx context.Context, events []*ce.Event) {
func (b *bucket) pushToDistributionStation(ctx context.Context, e *ce.Event) {
tm := newTimingMsg(ctx, e)
waitCtx, cancel := context.WithCancel(ctx)
wait.Until(func() {
if b.timingwheel.getDistributionStation().push(ctx, tm) {
cancel()
} else {
log.Warning(ctx, "push event to distribution station failed, retry until it succeed", map[string]interface{}{
"eventbus": b.eventbus,
"event": e.String(),
})
}
}, b.config.Tick/defaultCheckWaitingPeriodRatio, waitCtx.Done())
}

func (b *bucket) pushToPrevTimingWheel(ctx context.Context, e *ce.Event) {
var handler func(ctx context.Context, tm *timingMsg) bool
tm := newTimingMsg(ctx, e)
if tm.hasExpired() {
handler = b.timingwheel.getDistributionStation().push
} else {
handler = b.getTimingWheelElement().prev().flow
}
waitCtx, cancel := context.WithCancel(ctx)
wait.Until(func() {
if handler(ctx, tm) {
cancel()
} else {
log.Warning(ctx, "push event failed, retry until it succeed", map[string]interface{}{
"eventbus": b.eventbus,
"event": e.String(),
})
}
}, b.config.Tick/defaultCheckWaitingPeriodRatio, waitCtx.Done())
}

func (b *bucket) waitingForExpired(ctx context.Context, events []*ce.Event) {
tm := newTimingMsg(ctx, events[0])
blockCtx, blockCancel := context.WithCancel(ctx)
wait.Until(func() {
if tm.hasExpired() {
if b.isReadyToDeliver(tm) {
blockCancel()
}
if b.layer > 1 && b.isReadyToFlow(tm) {
log.Debug(ctx, "the bucket is ready to flow", map[string]interface{}{
"eventbus": b.eventbus,
"offset": b.offset,
"layer": b.layer,
})
}, b.tick/defaultFrequentCheckWaitingPeriodRatio, blockCtx.Done())
}

func (b *bucket) isReadyToDeliver(tm *timingMsg) bool {
startTimeOfBucket := tm.getExpiration().UnixNano() - (tm.getExpiration().UnixNano() % b.tick.Nanoseconds())
return time.Now().UnixNano() >= startTimeOfBucket
}

func (b *bucket) waitingForFlow(ctx context.Context, events []*ce.Event) {
tm := newTimingMsg(ctx, events[0])
blockCtx, blockCancel := context.WithCancel(ctx)
wait.Until(func() {
if b.isReadyToFlow(tm) {
blockCancel()
}
}, b.tick/defaultFrequentCheckWaitingPeriodRatio, blockCtx.Done())
}

func (b *bucket) isReadyToFlow(tm *timingMsg) bool {
startTimeOfBucket := tm.getExpiration().UnixNano() - (tm.getExpiration().UnixNano() % b.tick.Nanoseconds())
advanceTimeOfFlow := defaultNumberOfTickLoadsInAdvance * b.getTimingWheelElement().prev().tick
return time.Now().UTC().Add(advanceTimeOfFlow).UnixNano() > startTimeOfBucket
advanceTimeOfFlow := defaultNumberOfTickFlowInAdvance * b.getTimingWheelElement().prev().tick
return time.Now().Add(advanceTimeOfFlow).UnixNano() >= startTimeOfBucket
}

func (b *bucket) push(ctx context.Context, tm *timingMsg) error {
return b.putEvent(ctx, tm)
func (b *bucket) push(ctx context.Context, tm *timingMsg) bool {
return b.putEvent(ctx, tm) == nil
}

func (b *bucket) isExistEventbus(ctx context.Context) bool {
Expand Down
44 changes: 39 additions & 5 deletions internal/timer/timingwheel/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestBucket_run(t *testing.T) {
}
}

Convey("test bucket run with get event failed", func() {
Convey("get event failed", func() {
mockEventlogReader.EXPECT().Seek(gomock.Any(), gomock.Any(), io.SeekStart).AnyTimes().Return(int64(0), nil)
mockEventlogReader.EXPECT().Read(gomock.Any(), gomock.Any()).AnyTimes().Return(events, errors.New("test"))
go func() {
Expand All @@ -159,7 +159,7 @@ func TestBucket_run(t *testing.T) {
bucket.wg.Wait()
})

Convey("test bucket run with push failed", func() {
Convey("push failed", func() {
mockEventlogReader.EXPECT().Seek(gomock.Any(), gomock.Any(), io.SeekStart).AnyTimes().Return(int64(0), nil)
mockEventlogReader.EXPECT().Read(gomock.Any(), gomock.Any()).AnyTimes().Return(events, nil)
mockEventbusWriter.EXPECT().Append(gomock.Any(), gomock.Any()).AnyTimes().Return("", errors.New("test"))
Expand All @@ -172,9 +172,11 @@ func TestBucket_run(t *testing.T) {
bucket.wg.Wait()
})

Convey("test bucket run with high layer push failed", func() {
Convey("flow failed", func() {
events[0] = event(1000)
bucket.layer = 2
bucket.waitingForReady = bucket.waitingForFlow
bucket.eventHandler = bucket.pushToPrevTimingWheel
bucket.element = tw.twList.Front().Next()
mockEventlogReader.EXPECT().Seek(gomock.Any(), gomock.Any(), io.SeekStart).AnyTimes().Return(int64(0), nil)
mockEventlogReader.EXPECT().Read(gomock.Any(), gomock.Any()).AnyTimes().Return(events, nil)
Expand All @@ -188,7 +190,25 @@ func TestBucket_run(t *testing.T) {
bucket.wg.Wait()
})

Convey("test bucket run success", func() {
Convey("flow success", func() {
events[0] = event(1000)
bucket.layer = 2
bucket.waitingForReady = bucket.waitingForFlow
bucket.eventHandler = bucket.pushToPrevTimingWheel
bucket.element = tw.twList.Front().Next()
mockEventlogReader.EXPECT().Seek(gomock.Any(), gomock.Any(), io.SeekStart).AnyTimes().Return(int64(0), nil)
mockEventlogReader.EXPECT().Read(gomock.Any(), gomock.Any()).AnyTimes().Return(events, nil)
mockEventbusWriter.EXPECT().Append(gomock.Any(), gomock.Any()).AnyTimes().Return("", nil)
mockStoreCli.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
bucket.run(ctx)
bucket.wg.Wait()
})

Convey("push success", func() {
mockEventlogReader.EXPECT().Seek(gomock.Any(), gomock.Any(), io.SeekStart).AnyTimes().Return(int64(0), nil)
mockEventlogReader.EXPECT().Read(gomock.Any(), gomock.Any()).AnyTimes().Return(events, nil)
mockEventbusWriter.EXPECT().Append(gomock.Any(), gomock.Any()).AnyTimes().Return("", nil)
Expand All @@ -214,11 +234,25 @@ func TestBucket_createEventBus(t *testing.T) {
mockEventbusCtrlCli := ctrlpb.NewMockEventBusControllerClient(mockCtrl)
bucket.client = mockEventbusCtrlCli

Convey("test bucket create with eventbus exist", func() {
Convey("eventbus has exist", func() {
mockEventbusCtrlCli.EXPECT().GetEventBus(gomock.Any(), gomock.Any()).Times(1).Return(nil, nil)
err := bucket.createEventbus(ctx)
So(err, ShouldBeNil)
})

Convey("create failed", func() {
mockEventbusCtrlCli.EXPECT().GetEventBus(gomock.Any(), gomock.Any()).Times(1).Return(nil, errors.New("test"))
mockEventbusCtrlCli.EXPECT().CreateEventBus(gomock.Any(), gomock.Any()).Times(1).Return(nil, errors.New("test"))
err := bucket.createEventbus(ctx)
So(err, ShouldNotBeNil)
})

Convey("create success", func() {
mockEventbusCtrlCli.EXPECT().GetEventBus(gomock.Any(), gomock.Any()).Times(1).Return(nil, errors.New("test"))
mockEventbusCtrlCli.EXPECT().CreateEventBus(gomock.Any(), gomock.Any()).Times(1).Return(nil, nil)
err := bucket.createEventbus(ctx)
So(err, ShouldBeNil)
})
})
}

Expand Down
Loading

0 comments on commit 1bb2361

Please sign in to comment.