diff --git a/README.md b/README.md index 63401798..4c0a9120 100644 --- a/README.md +++ b/README.md @@ -318,6 +318,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent * [Run](doc/run.md) — create an Observer without consuming the emitted items * [Send](doc/send.md) — send the Observable items in a specific channel * [Serialize](doc/serialize.md) — force an Observable to make serialized calls and to be well-behaved +* [Timestamp](doc/timestamp.md) — attach a timestamp to each item emitted by an Observable ### Conditional and Boolean Operators * [All](doc/all.md) — determine whether all items emitted by an Observable meet some criteria diff --git a/doc/timestamp.md b/doc/timestamp.md new file mode 100644 index 00000000..601f555c --- /dev/null +++ b/doc/timestamp.md @@ -0,0 +1,40 @@ +# Timestamp Operator + +## Overview + +Determine whether all items emitted by an Observable meet some criteria. + +![](http://reactivex.io/documentation/operators/images/timestamp.c.png) + +## Example + +```go +observe := rxgo.Just([]interface{}{1, 2, 3}).Timestamp().Observe() +var timestampItem rxgo.TimestampItem +timestampItem = (<-observe).V.(rxgo.TimestampItem) +fmt.Println(timestampItem) +``` + +Output: + +``` +{2020-02-23 15:26:02.231197 +0000 UTC 1} +``` + +## Options + +### WithBufferedChannel + +[Detail](options.md#withbufferedchannel) + +### WithContext + +[Detail](options.md#withcontext) + +### WithObservationStrategy + +[Detail](options.md#withobservationstrategy) + +### WithErrorStrategy + +[Detail](options.md#witherrorstrategy) \ No newline at end of file diff --git a/item.go b/item.go index 98b7d634..70d31ca5 100644 --- a/item.go +++ b/item.go @@ -3,6 +3,7 @@ package rxgo import ( "context" "reflect" + "time" ) type ( @@ -12,6 +13,12 @@ type ( E error } + // TimestampItem attach a timestamp to an item. + TimestampItem struct { + Timestamp time.Time + V interface{} + } + // CloseChannelStrategy indicates a strategy on whether to close a channel. CloseChannelStrategy uint32 ) diff --git a/observable.go b/observable.go index 3be1a677..16b5b9f0 100644 --- a/observable.go +++ b/observable.go @@ -70,6 +70,7 @@ type Observable interface { TakeLast(nth uint, opts ...Option) Observable TakeUntil(apply Predicate, opts ...Option) Observable TakeWhile(apply Predicate, opts ...Option) Observable + Timestamp(opts ...Option) Observable ToMap(keySelector Func, opts ...Option) Single ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error) diff --git a/observable_operator.go b/observable_operator.go index 6d8a3e96..6790ae75 100644 --- a/observable_operator.go +++ b/observable_operator.go @@ -2333,6 +2333,33 @@ func (op *takeWhileOperator) end(_ context.Context, _ chan<- Item) { func (op *takeWhileOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) { } +// Timestamp attaches a timestamp to each item emitted by an Observable indicating when it was emitted. +func (o *ObservableImpl) Timestamp(opts ...Option) Observable { + return observable(o, func() operator { + return ×tampOperator{} + }, true, false, opts...) +} + +type timestampOperator struct { +} + +func (op *timestampOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { + dst <- Of(TimestampItem{ + Timestamp: time.Now().UTC(), + V: item.V, + }) +} + +func (op *timestampOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { + defaultErrorFuncOperator(ctx, item, dst, operatorOptions) +} + +func (op *timestampOperator) end(ctx context.Context, dst chan<- Item) { +} + +func (op *timestampOperator) gatherNext(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) { +} + // ToMap convert the sequence of items emitted by an Observable // into a map keyed by a specified key function. // Cannot be run in parallel. diff --git a/observable_operator_test.go b/observable_operator_test.go index b5b338e5..d7f4ea10 100644 --- a/observable_operator_test.go +++ b/observable_operator_test.go @@ -1289,6 +1289,23 @@ func Test_Observable_TakeWhile(t *testing.T) { Assert(context.Background(), t, obs, HasItems(1, 2)) } +func Test_Observable_Timestamp(t *testing.T) { + observe := testObservable(1, 2, 3).Timestamp().Observe() + v := (<-observe).V.(TimestampItem) + assert.Equal(t, 1, v.V) + v = (<-observe).V.(TimestampItem) + assert.Equal(t, 2, v.V) + v = (<-observe).V.(TimestampItem) + assert.Equal(t, 3, v.V) +} + +func Test_Observable_Error(t *testing.T) { + observe := testObservable(1, errFoo).Timestamp().Observe() + v := (<-observe).V.(TimestampItem) + assert.Equal(t, 1, v.V) + assert.True(t, (<-observe).Error()) +} + func Test_Observable_ToMap(t *testing.T) { obs := testObservable(3, 4, 5, true, false).ToMap(func(_ context.Context, i interface{}) (interface{}, error) { switch v := i.(type) {