Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(timer): discard events if no eventbus causes failed delivery #239

Merged
merged 4 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions client/pkg/discovery/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ import (
"github.com/linkall-labs/vanus/client/pkg/primitive"
)

type WritableLogsResult struct {
Eventlogs []*record.EventLog
Err error
}

type WritableLogsWatcher struct {
*primitive.Watcher
ch chan []*record.EventLog
ch chan *WritableLogsResult
}

func (w *WritableLogsWatcher) Chan() <-chan []*record.EventLog {
func (w *WritableLogsWatcher) Chan() <-chan *WritableLogsResult {
return w.ch
}

Expand All @@ -45,16 +50,12 @@ func WatchWritableLogs(eventbus *VRN) (*WritableLogsWatcher, error) {
}

// TODO: true watch
ch := make(chan []*record.EventLog, 1)
ch := make(chan *WritableLogsResult, 1)
w := primitive.NewWatcher(30*time.Second, func() {
rs, err := ns.LookupWritableLogs(context.Background(), eventbus)
ifplusor marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// TODO: logging

// FIXME: notify
ch <- nil
} else {
ch <- rs
ch <- &WritableLogsResult{
Eventlogs: rs,
Err: err,
}
}, func() {
close(ch)
Expand Down
53 changes: 43 additions & 10 deletions client/pkg/eventbus/basic_eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (
"encoding/base64"
"encoding/binary"
stderrors "errors"
"strings"
"sync"

// third-party libraries.
ce "github.com/cloudevents/sdk-go/v2"
"github.com/scylladb/go-set/strset"
"google.golang.org/grpc/status"

// this project.
"github.com/linkall-labs/vanus/client/pkg/discovery"
"github.com/linkall-labs/vanus/client/pkg/discovery/record"
"github.com/linkall-labs/vanus/client/pkg/errors"
"github.com/linkall-labs/vanus/client/pkg/eventlog"
"github.com/linkall-labs/vanus/client/pkg/primitive"
Expand All @@ -47,20 +48,18 @@ func newEventBus(cfg *Config) EventBus {
writableLogs: strset.New(),
logWriters: make([]eventlog.LogWriter, 0),
writableMu: sync.RWMutex{},
state: nil,
}

go func() {
ch := w.Chan()
for {
rs, ok := <-ch
re, ok := <-ch
if !ok {
break
}

if rs != nil {
bus.updateWritableLogs(rs)
}

bus.updateWritableLogs(re)
bus.writableWatcher.Wakeup()
}
}()
Expand All @@ -78,6 +77,7 @@ type basicEventBus struct {
writableLogs *strset.Set
logWriters []eventlog.LogWriter
writableMu sync.RWMutex
state error
}

// make sure basicEventBus implements EventBus.
Expand All @@ -104,9 +104,39 @@ func (b *basicEventBus) Writer() (BusWriter, error) {
return w, nil
}

func (b *basicEventBus) updateWritableLogs(ls []*record.EventLog) {
s := strset.NewWithSize(len(ls))
for _, l := range ls {
func (b *basicEventBus) getState() error {
b.writableMu.RLock()
defer b.writableMu.RUnlock()
return b.state
}

func (b *basicEventBus) setState(err error) {
b.writableMu.Lock()
defer b.writableMu.Unlock()
b.state = err
}

func (b *basicEventBus) isNeedUpdate(err error) bool {
if err == nil {
b.setState(nil)
return true
}
sts := status.Convert(err)
// TODO: temporary scheme, wait for error code reconstruction
if strings.Contains(sts.Message(), "RESOURCE_NOT_FOUND") {
b.setState(errors.ErrNotFound)
return true
}
return false
}

func (b *basicEventBus) updateWritableLogs(re *discovery.WritableLogsResult) {
if !b.isNeedUpdate(re.Err) {
return
}

s := strset.NewWithSize(len(re.Eventlogs))
for _, l := range re.Eventlogs {
s.Add(l.VRN)
}

Expand All @@ -119,7 +149,7 @@ func (b *basicEventBus) updateWritableLogs(ls []*record.EventLog) {
removed := strset.Difference(b.writableLogs, s)
added := strset.Difference(s, b.writableLogs)

a := make([]eventlog.LogWriter, 0, len(ls))
a := make([]eventlog.LogWriter, 0, len(re.Eventlogs))
for _, w := range b.logWriters {
if !removed.Has(w.Log().VRN().String()) {
a = append(a, w)
Expand Down Expand Up @@ -207,6 +237,9 @@ func (w *basicBusWriter) Append(ctx context.Context, event *ce.Event) (string, e
func (w *basicBusWriter) pickLogWriter(ctx context.Context, event *ce.Event) (eventlog.LogWriter, error) {
lws := w.ebus.getLogWriters(ctx)
if len(lws) == 0 {
if err := w.ebus.getState(); err != nil {
return nil, err
}
return nil, errors.ErrNotWritable
}

Expand Down
30 changes: 23 additions & 7 deletions internal/timer/timingwheel/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,19 @@ func (b *bucket) connectEventbus(ctx context.Context) error {
return nil
}

func (b *bucket) putEvent(ctx context.Context, tm *timingMsg) error {
func (b *bucket) putEvent(ctx context.Context, tm *timingMsg) (err error) {
defer func() {
if errOfPanic := recover(); errOfPanic != nil {
ifplusor marked this conversation as resolved.
Show resolved Hide resolved
log.Warning(ctx, "panic when put event", map[string]interface{}{
log.KeyError: errOfPanic,
})
err = errors.New("panic when put event")
}
}()
if !b.isLeader() {
return nil
}
_, err := b.eventbusWriter.Append(ctx, tm.getEvent())
_, err = b.eventbusWriter.Append(ctx, tm.getEvent())
if err != nil {
log.Error(ctx, "append event to failed", map[string]interface{}{
log.KeyError: err,
Expand All @@ -353,14 +361,22 @@ func (b *bucket) putEvent(ctx context.Context, tm *timingMsg) error {
"eventbus": b.eventbus,
"expiration": tm.getExpiration().Format(time.RFC3339Nano),
})
return nil
return err
}

func (b *bucket) getEvent(ctx context.Context, number int16) ([]*ce.Event, error) {
func (b *bucket) getEvent(ctx context.Context, number int16) (events []*ce.Event, err error) {
defer func() {
if errOfPanic := recover(); errOfPanic != nil {
log.Warning(ctx, "panic when get event", map[string]interface{}{
log.KeyError: errOfPanic,
})
events = []*ce.Event{}
err = es.ErrOnEnd
}
}()
if !b.isLeader() {
return []*ce.Event{}, es.ErrOnEnd
}
var err error
_, err = b.eventlogReader.Seek(ctx, b.offset, io.SeekStart)
if err != nil {
log.Error(ctx, "seek failed", map[string]interface{}{
Expand All @@ -370,7 +386,7 @@ func (b *bucket) getEvent(ctx context.Context, number int16) ([]*ce.Event, error
return nil, err
}

events, err := b.eventlogReader.Read(ctx, number)
events, err = b.eventlogReader.Read(ctx, number)
if err != nil {
if !errors.Is(err, es.ErrOnEnd) && !errors.Is(ctx.Err(), context.Canceled) {
log.Error(ctx, "read failed", map[string]interface{}{
Expand All @@ -386,7 +402,7 @@ func (b *bucket) getEvent(ctx context.Context, number int16) ([]*ce.Event, error
"offset": b.offset,
"number": number,
})
return events, nil
return events, err
}

func (b *bucket) updateOffsetMeta(ctx context.Context, offset int64) {
Expand Down
15 changes: 15 additions & 0 deletions internal/timer/timingwheel/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ func TestBucket_putEvent(t *testing.T) {
err := bucket.putEvent(ctx, newTimingMsg(ctx, e))
So(err, ShouldBeNil)
})

Convey("test bucket put event panic", func() {
bucket.eventbusWriter = nil
tw.SetLeader(true)
err := bucket.putEvent(ctx, newTimingMsg(ctx, e))
So(err, ShouldNotBeNil)
})
})
}

Expand All @@ -303,6 +310,14 @@ func TestBucket_getEvent(t *testing.T) {
So(err, ShouldNotBeNil)
})

Convey("test bucket get event panic", func() {
bucket.eventlogReader = nil
tw.SetLeader(true)
result, err := bucket.getEvent(ctx, 1)
So(len(result), ShouldEqual, 0)
So(err, ShouldNotBeNil)
})

Convey("test bucket get event with seek error", func() {
tw.SetLeader(true)
events := make([]*ce.Event, 1)
Expand Down
14 changes: 11 additions & 3 deletions internal/timer/timingwheel/timingwheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"sync"
"time"

es "github.com/linkall-labs/vanus/client/pkg/errors"
errcli "github.com/linkall-labs/vanus/client/pkg/errors"
"github.com/linkall-labs/vanus/internal/kv"
"github.com/linkall-labs/vanus/internal/kv/etcd"
timererrors "github.com/linkall-labs/vanus/internal/timer/errors"
Expand Down Expand Up @@ -373,7 +373,7 @@ func (tw *timingWheel) runReceivingStation(ctx context.Context) {
// batch read
events, err := tw.receivingStation.getEvent(ctx, defaultNumberOfEventsRead)
if err != nil {
if !errors.Is(err, es.ErrOnEnd) {
if !errors.Is(err, errcli.ErrOnEnd) {
log.Error(ctx, "get event failed when receiving station running", map[string]interface{}{
log.KeyError: err,
"eventbus": tw.receivingStation.getEventbus(),
Expand Down Expand Up @@ -473,7 +473,7 @@ func (tw *timingWheel) runDistributionStation(ctx context.Context) {
// batch read
events, err := tw.distributionStation.getEvent(ctx, defaultNumberOfEventsRead)
if err != nil {
if !errors.Is(err, es.ErrOnEnd) {
if !errors.Is(err, errcli.ErrOnEnd) {
log.Error(ctx, "get event failed when distribution station running", map[string]interface{}{
log.KeyError: err,
"eventbus": tw.distributionStation.getEventbus(),
Expand Down Expand Up @@ -544,6 +544,14 @@ func (tw *timingWheel) deliver(ctx context.Context, e *ce.Event) error {
defer eventbusWriter.Close()
_, err = eventbusWriter.Append(ctx, e)
if err != nil {
if errors.Is(err, errcli.ErrNotFound) {
log.Warning(ctx, "eventbus not found, discard this event", map[string]interface{}{
log.KeyError: err,
"eventbus": ebName,
"event": e.String(),
})
return nil
}
log.Error(ctx, "append failed", map[string]interface{}{
log.KeyError: err,
"eventbus": ebName,
Expand Down
12 changes: 11 additions & 1 deletion internal/timer/timingwheel/timingwheel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
ce "github.com/cloudevents/sdk-go/v2"
"github.com/golang/mock/gomock"
"github.com/linkall-labs/vanus/client/pkg/discovery/record"
errcli "github.com/linkall-labs/vanus/client/pkg/errors"
"github.com/linkall-labs/vanus/client/pkg/eventbus"
eventlog "github.com/linkall-labs/vanus/client/pkg/eventlog"
"github.com/linkall-labs/vanus/internal/kv"
Expand Down Expand Up @@ -425,7 +426,16 @@ func TestTimingWheel_deliver(t *testing.T) {
So(err, ShouldNotBeNil)
})

Convey("test timingwheel deliver failure with append failed", func() {
Convey("test timingwheel deliver failure with eventbus not found", func() {
stubs := StubFunc(&openBusWriter, mockEventbusWriter, nil)
defer stubs.Reset()
mockEventbusWriter.EXPECT().Append(ctx, gomock.Any()).Times(1).Return("", errcli.ErrNotFound)
mockEventbusWriter.EXPECT().Close().Times(1).Return()
err := tw.deliver(ctx, e)
So(err, ShouldBeNil)
})

Convey("test timingwheel deliver failure with append failed", func() {
stubs := StubFunc(&openBusWriter, mockEventbusWriter, nil)
defer stubs.Reset()
mockEventbusWriter.EXPECT().Append(ctx, gomock.Any()).Times(1).Return("", errors.New("test"))
Expand Down