Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var (
cancelProto = status.FromContextError(context.Canceled).Proto()
deadlineProto = status.FromContextError(context.DeadlineExceeded).Proto()

filterErr = errors.New(filterErrMsg)
filterErr = errors.New(filterErrMsg, j.C("ERR_d5e8f7a9b2c3d4e5"))
)

// IsStoppedErr checks whether err is an ErrStopped
Expand Down Expand Up @@ -60,5 +60,5 @@ func IsFilterErr(err error) bool {
}

func asFilterErr(err error) error {
return errors.Wrap(err, filterErrMsg)
return errors.Wrap(err, filterErrMsg, j.C("ERR_d5e8f7a9b2c3d4e5"))
}
9 changes: 5 additions & 4 deletions filters/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ const (
)

var (
metadataEventFilterErr = errors.New(metadataEventFilterErrMsg)
deserializationErr = errors.New(deserializationErrMsg)
metadataEventFilterErr = errors.New(metadataEventFilterErrMsg, j.C("ERR_c1f2e3d4a5b6c7d8"))
deserializationErr = errors.New(deserializationErrMsg, j.C("ERR_a1b2c3d4e5f6a7b8"))
)

func MetadataEventFilter[T any](ds Deserializer[T], flt DataFilter[T]) (reflex.EventFilter, error) {
Expand All @@ -45,7 +45,7 @@ func IsDeserializationErr(err error) bool {
}

func asDeserializationErr(err error) error {
return errors.Wrap(err, deserializationErrMsg)
return errors.Wrap(err, deserializationErrMsg, j.C("ERR_a1b2c3d4e5f6a7b8"))
}

// IsMetadataEventFilterErr returns true if the error occurred during construction of a MetadataEventFilter.
Expand All @@ -54,5 +54,6 @@ func IsMetadataEventFilterErr(err error) bool {
}

func makeMetadataEventFilterErr(ol ...errors.Option) error {
return errors.New(metadataEventFilterErrMsg, ol...)
opts := append([]errors.Option{j.C("ERR_c1f2e3d4a5b6c7d8")}, ol...)
return errors.New(metadataEventFilterErrMsg, opts...)
}
16 changes: 11 additions & 5 deletions filters/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import (
"testing"

"github.com/luno/jettison/errors"
"github.com/luno/jettison/j"
"github.com/luno/jettison/jtest"
"github.com/stretchr/testify/require"

"github.com/luno/reflex"
)

var (
testBadMetadataErr = errors.New("bad metadata", j.C("ERR_test_bad_metadata"))
testBadFilterErr = errors.New("bad filter", j.C("ERR_test_bad_filter"))
)

func TestMakeMetadataEventFilter(t *testing.T) {
type testCase struct {
name string
Expand Down Expand Up @@ -74,12 +80,12 @@ func TestMetadataEventFilter(t *testing.T) {
name: "Deserializer errors",
e: &reflex.Event{MetaData: m},
ds: func(x *testing.T) Deserializer[string] {
return func(b []byte) (string, error) { require.Equal(x, m, b); return string(b), errors.New("bad metadata") }
return func(b []byte) (string, error) { require.Equal(x, m, b); return string(b), testBadMetadataErr }
},
flt: func(x *testing.T) DataFilter[string] {
return func(s string) (bool, error) { require.Fail(x, "should not be reached"); return true, nil }
},
err: []error{deserializationErr, errors.New("bad metadata")},
err: []error{deserializationErr, testBadMetadataErr},
},
{
name: "Data Filter errors",
Expand All @@ -88,9 +94,9 @@ func TestMetadataEventFilter(t *testing.T) {
return func(b []byte) (string, error) { require.Equal(x, m, b); return string(b), nil }
},
flt: func(x *testing.T) DataFilter[string] {
return func(s string) (bool, error) { require.Equal(x, d, s); return true, errors.New("bad filter") }
return func(s string) (bool, error) { require.Equal(x, d, s); return true, testBadFilterErr }
},
err: []error{errors.New("bad filter")},
err: []error{testBadFilterErr},
},
{
name: "Exclude",
Expand Down Expand Up @@ -159,7 +165,7 @@ func TestIsDeserializationErr(t *testing.T) {
{
name: "constructed deserialization error",
err: errors.New(deserializationErrMsg),
want: true,
want: false,
},
}
for _, tt := range tests {
Expand Down
7 changes: 5 additions & 2 deletions rpatterns/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type batchEvent struct {

const minWait = time.Millisecond * 100

var ErrBatchState = errors.New("batch error state", j.C("ERR_b3053f5f1a3ecd23"))
var (
ErrBatchState = errors.New("batch error state", j.C("ERR_b3053f5f1a3ecd23"))
ErrInvalidBatchConfig = errors.New("batchPeriod or batchLen must be non-zero", j.C("ERR_invalid_batch_config"))
)

// BatchConsumer provides a reflex consumer that buffers events
// and flushes a batch to the consume function when either
Expand Down Expand Up @@ -95,7 +98,7 @@ func (c *BatchConsumer) Stop() error {
// enqueue adds the event to the buffer or returns error if batch needs to be reset.
func (c *BatchConsumer) enqueue(ctx context.Context, e *AckEvent) error {
if c.flushPeriod == 0 && c.flushLen == 0 {
return errors.New("batchPeriod or batchLen must be non-zero")
return ErrInvalidBatchConfig
}

// Add event to batch queue
Expand Down
7 changes: 3 additions & 4 deletions rpatterns/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func TestRunBatchConsumer(t *testing.T) {
}

func TestReset(t *testing.T) {
recvEndErr := errors.New("recv error")
tests := []struct {
name string
batchLen int
Expand Down Expand Up @@ -146,13 +145,13 @@ func TestReset(t *testing.T) {
spec := rpatterns.NewBatchSpec(b.Stream, consumer)
ctx := context.Background()
err := reflex.Run(ctx, spec)
jtest.Assert(t, recvEndErr, err)
jtest.Assert(t, errEvents, err)

events = ItoEList(tt.passEvents...)
b.events = events

err = reflex.Run(ctx, spec)
jtest.Assert(t, recvEndErr, err)
jtest.Assert(t, errEvents, err)
})
}
}
Expand All @@ -171,7 +170,7 @@ func TestInvalidConfig(t *testing.T) {
spec := rpatterns.NewBatchSpec(b.Stream, consumer)
ctx := context.Background()
err := reflex.Run(ctx, spec)
jtest.Assert(t, errors.New("batchPeriod or batchLen must be non-zero"), err)
jtest.Assert(t, rpatterns.ErrInvalidBatchConfig, err)
}

type EventList struct {
Expand Down