Skip to content

Commit

Permalink
Timestamp operator
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 23, 2020
1 parent 14a4579 commit 5b6d45d
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent
* [Run](doc/run.md) — create an Observer without consuming the emitted items
* [Send](doc/send.md) — send the Observable items in a specific channel
* [Serialize](doc/serialize.md) — force an Observable to make serialized calls and to be well-behaved
* [Timestamp](doc/timestamp.md) — attach a timestamp to each item emitted by an Observable

### Conditional and Boolean Operators
* [All](doc/all.md) — determine whether all items emitted by an Observable meet some criteria
Expand Down
40 changes: 40 additions & 0 deletions doc/timestamp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Timestamp Operator

## Overview

Determine whether all items emitted by an Observable meet some criteria.

![](http://reactivex.io/documentation/operators/images/timestamp.c.png)

## Example

```go
observe := rxgo.Just([]interface{}{1, 2, 3}).Timestamp().Observe()
var timestampItem rxgo.TimestampItem
timestampItem = (<-observe).V.(rxgo.TimestampItem)
fmt.Println(timestampItem)
```

Output:

```
{2020-02-23 15:26:02.231197 +0000 UTC 1}
```

## Options

### WithBufferedChannel

[Detail](options.md#withbufferedchannel)

### WithContext

[Detail](options.md#withcontext)

### WithObservationStrategy

[Detail](options.md#withobservationstrategy)

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
7 changes: 7 additions & 0 deletions item.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rxgo
import (
"context"
"reflect"
"time"
)

type (
Expand All @@ -12,6 +13,12 @@ type (
E error
}

// TimestampItem attach a timestamp to an item.
TimestampItem struct {
Timestamp time.Time
V interface{}
}

// CloseChannelStrategy indicates a strategy on whether to close a channel.
CloseChannelStrategy uint32
)
Expand Down
1 change: 1 addition & 0 deletions observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Observable interface {
TakeLast(nth uint, opts ...Option) Observable
TakeUntil(apply Predicate, opts ...Option) Observable
TakeWhile(apply Predicate, opts ...Option) Observable
Timestamp(opts ...Option) Observable
ToMap(keySelector Func, opts ...Option) Single
ToMapWithValueSelector(keySelector, valueSelector Func, opts ...Option) Single
ToSlice(initialCapacity int, opts ...Option) ([]interface{}, error)
Expand Down
27 changes: 27 additions & 0 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,33 @@ func (op *takeWhileOperator) end(_ context.Context, _ chan<- Item) {
func (op *takeWhileOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
}

// Timestamp attaches a timestamp to each item emitted by an Observable indicating when it was emitted.
func (o *ObservableImpl) Timestamp(opts ...Option) Observable {
return observable(o, func() operator {
return &timestampOperator{}
}, true, false, opts...)
}

type timestampOperator struct {
}

func (op *timestampOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
dst <- Of(TimestampItem{
Timestamp: time.Now().UTC(),
V: item.V,
})
}

func (op *timestampOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
}

func (op *timestampOperator) end(ctx context.Context, dst chan<- Item) {
}

func (op *timestampOperator) gatherNext(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
}

// ToMap convert the sequence of items emitted by an Observable
// into a map keyed by a specified key function.
// Cannot be run in parallel.
Expand Down
17 changes: 17 additions & 0 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,23 @@ func Test_Observable_TakeWhile(t *testing.T) {
Assert(context.Background(), t, obs, HasItems(1, 2))
}

func Test_Observable_Timestamp(t *testing.T) {
observe := testObservable(1, 2, 3).Timestamp().Observe()
v := (<-observe).V.(TimestampItem)
assert.Equal(t, 1, v.V)
v = (<-observe).V.(TimestampItem)
assert.Equal(t, 2, v.V)
v = (<-observe).V.(TimestampItem)
assert.Equal(t, 3, v.V)
}

func Test_Observable_Error(t *testing.T) {
observe := testObservable(1, errFoo).Timestamp().Observe()
v := (<-observe).V.(TimestampItem)
assert.Equal(t, 1, v.V)
assert.True(t, (<-observe).Error())
}

func Test_Observable_ToMap(t *testing.T) {
obs := testObservable(3, 4, 5, true, false).ToMap(func(_ context.Context, i interface{}) (interface{}, error) {
switch v := i.(type) {
Expand Down

0 comments on commit 5b6d45d

Please sign in to comment.