Skip to content

Commit

Permalink
Connectable Observable documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Feb 25, 2020
1 parent 2d8d4aa commit 2a2752b
Show file tree
Hide file tree
Showing 56 changed files with 416 additions and 55 deletions.
110 changes: 109 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,119 @@ observable.Map(transform, rxgo.WithPool(32))

In this example, we create a pool of 32 goroutines that consume items concurrently from the same channel. If the operation is CPU-bound, we can use the `WithCPUPool()` option that creates a pool based on the number of logical CPUs.

### Connectable Observable

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

Let's create a Connectable Observable using `rxgo.WithPublishStrategy`:

```go
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
```

Then, we create two Observers:

```go
observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int) + 1, nil
}).DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
return i.(int) * 2, nil
}).DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
```

If `observable` was not a Connectable Observable, as `DoOnNext` creates an Observer, the source Observable would have begun emitting items. Yet, in the case of a Connectable Observable, we have to call `Connect()`:

```go
observable.Connect()
```

Once `Connect()` is called, the Connectable Observable begin to emit items.

There is another important change with a regular Observable. A Connectable Observable publishes its items. It means, all the Observers receive a copy of the items.

Here is an example with a regular Observable:

```go
// Create a regular Observable
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
observable := rxgo.FromChannel(ch)

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
```

```
First observer: 1
First observer: 2
First observer: 3
```

Now, with a Connectable Observable:

```go
// Create a regular Observable
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})

observable.Connect()
```

```
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3
```

## Documentation

### Assert API

How to use the [assert API](doc/assert.md) to write unit tests while using RxGo
How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.

### Operator Options

Expand Down
35 changes: 35 additions & 0 deletions a_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package rxgo_test

import (
"fmt"
"github.com/reactivex/rxgo/v2"
"testing"
"time"
)

func TestMap(t *testing.T) {
// Create a regular Observable
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
close(ch)
}()
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

// Create the first Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})

// Create the second Observer
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})

observable.Connect()

time.Sleep(500 * time.Millisecond)

}
6 changes: 5 additions & 1 deletion doc/all.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ true

### WithCPUPool

[Detail](options.md#withcpupool)
[Detail](options.md#withcpupool)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/average.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ Output:

### WithCPUPool

[Detail](options.md#withcpupool)
[Detail](options.md#withcpupool)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/backoffretry.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ foo

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/buffer.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/combinelatest.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/concat.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/contains.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ true

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/count.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/create.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/debounce.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ Output: each item emitted by the Observable if not item has been emitted after 2

### WithCPUPool

[Detail](options.md#withcpupool)
[Detail](options.md#withcpupool)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/defaultifempty.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/distinct.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ Output:

### Serialize

[Detail](options.md#serialize)
[Detail](options.md#serialize)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/distinctuntilchanged.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/elementat.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ Output:

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ Output:

### Serialize

[Detail](options.md#serialize)
[Detail](options.md#serialize)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/first.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ true

### WithObservationStrategy

[Detail](options.md#withobservationstrategy)
[Detail](options.md#withobservationstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/firstordefault.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ Output:

### WithObservationStrategy

[Detail](options.md#withobservationstrategy)
[Detail](options.md#withobservationstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/flatmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ Output:

### WithCPUPool

[Detail](options.md#withcpupool)
[Detail](options.md#withcpupool)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/fromchannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ The items are consumed when an Observer subscribes.
```go
ch := make(chan rxgo.Item)
observable := rxgo.FromChannel(ch)
```
```

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
6 changes: 5 additions & 1 deletion doc/groupby.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ item: 8

### WithErrorStrategy

[Detail](options.md#witherrorstrategy)
[Detail](options.md#witherrorstrategy)

### WithPublishStrategy

[Detail](options.md#withpublishstrategy)
Loading

0 comments on commit 2a2752b

Please sign in to comment.