Skip to content

Commit

Permalink
Contributors section (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah authored Apr 6, 2021
1 parent d395954 commit 5aa0097
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Reactive Extensions for the Go Language

[ReactiveX](http://reactivex.io/), or Rx for short, is an API for programming with Observable streams. This is the official ReactiveX API for the Go language.

ReactiveX is a new, alternative way of asynchronous programming to callbacks, promises and deferred. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an [Observable](http://reactivex.io/documentation/contract.html).
ReactiveX is a new, alternative way of asynchronous programming to callbacks, promises, and deferred. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an [Observable](http://reactivex.io/documentation/contract.html).

An operator is a function that defines an Observable, how and when it should emit data. The list of operators covered is available [here](README.md#supported-operators-in-rxgo).

Expand All @@ -19,10 +19,10 @@ The RxGo implementation is based on the concept of [pipelines](https://blog.gola

![](doc/rx.png)

Let's see at a concrete example with each box being an operator:
* We create a static Observable based on a fixed list of items using `Just` operator.
* We define a transformation function (convert a circle into a square) using `Map` operator.
* We filter each yellow square using `Filter` operator.
Let's see a concrete example with each box being an operator:
* We create a static Observable based on a fixed list of items using the `Just` operator.
* We define a transformation function (convert a circle into a square) using the `Map` operator.
* We filter each yellow square using the `Filter` operator.

In this example, the final items are sent in a channel, available to a consumer. There are many ways to consume or to produce data using RxGo. Publishing the results in a channel is only one of them.

Expand All @@ -49,9 +49,9 @@ item := <-ch
fmt.Println(item.V)
```

The `Just` operator creates an Observable from a static list of items. `Of(value)` creates an item from a given value. If we want to create an item from an error, we have to use `Error(err)`. This is a difference with the v1 that was accepting directly a value or an error without having to wrap it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.
The `Just` operator creates an Observable from a static list of items. `Of(value)` creates an item from a given value. If we want to create an item from an error, we have to use `Error(err)`. This is a difference with the v1 that was accepting a value or an error directly without having to wrap it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.

By the way, the `Just` operator uses currying as a syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.
By the way, the `Just` operator uses currying as syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.

Once the Observable is created, we can observe it using `Observe()`. By default, an Observable is lazy in the sense that it emits items only once a subscription is made. `Observe()` returns a `<-chan rxgo.Item`.

Expand All @@ -69,7 +69,7 @@ fmt.Println(item.V)

`item.Error()` returns a boolean indicating whether an item contains an error. Then, we use either `item.E` to get the error or `item.V` to get the value.

By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g. `OnError`, `Retry`, etc.)
By default, an Observable is stopped once an error is produced. However, there are special operators to deal with errors (e.g., `OnError`, `Retry`, etc.)

It is also possible to consume items using callbacks:

Expand All @@ -83,7 +83,7 @@ observable.ForEach(func(v interface{}) {
})
```

In this example, we passed 3 functions:
In this example, we passed three functions:
* A `NextFunc` triggered when a value item is emitted.
* An `ErrFunc` triggered when an error item is emitted.
* A `CompletedFunc` triggered once the Observable is completed.
Expand All @@ -106,7 +106,7 @@ type Customer struct {
}
```

We create an producer that will emit `Customer`s to a given `chan rxgo.Item` and create an Observable from it:
We create a producer that will emit `Customer`s to a given `chan rxgo.Item` and create an Observable from it:
```go
// Create the input channel
ch := make(chan rxgo.Item)
Expand All @@ -119,10 +119,10 @@ observable := rxgo.FromChannel(ch)

Then, we need to perform the two following operations:
* Filter the customers whose age is below 18.
* Enrich each customer with a tax number. Retrieving a tax number is done for example by an IO-bound function doing an external REST call.
* Enrich each customer with a tax number. Retrieving a tax number is done, for example, by an IO-bound function doing an external REST call.

As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines.
Yet, for some reason, all the `Customer` items need to be produced sequentially based on its `ID`.
Yet, let's imagine that all the `Customer` items need to be produced sequentially based on its `ID`.

```go
observable.
Expand Down Expand Up @@ -162,7 +162,7 @@ for customer := range observable.Observe() {

## Observable Types

### Hot vs Cold Observables
### Hot vs. Cold Observables

In the Rx world, there is a distinction between cold and hot Observables. When the data is produced by the Observable itself, it is a cold Observable. When the data is produced outside the Observable, it is a hot Observable. Usually, when we don't want to create a producer over and over again, we favour a hot Observable.

Expand Down Expand Up @@ -199,7 +199,7 @@ The result of this execution is:
2
```

It means, the first Observer already consumed all items. And nothing left for others.
It means the first Observer already consumed all items. And nothing left for others.
Though this behavior can be altered with [Connectable](#connectable-observable) Observables.
The main point here is the goroutine produced those items.

Expand Down Expand Up @@ -234,17 +234,17 @@ Now, the result is:
2
```

In the case of a cold observable, the stream was created independently for every observer.
In the case of a cold observable, the stream was created independently for every Observer.

Again, **hot** vs **cold** Observables are not about how you consume items, it's about where data is produced.
Good example for hot Observable are price ticks from a trading exchange.
And if you teach an Observable to fetch products from a database, then yield them one by one, you will create the **cold** Observable.

### Backpressure

There is another operator called `FromEventSource` that creates an Observable from a channel. The difference between `FromChannel` operator is that as soon as the Observable is created, it starts to emit items regardless if there is an Observer or not. Hence, the items emitted by an Observable without Observer(s) are lost (whilst they are buffered with `FromChannel` operator).
There is another operator called `FromEventSource` that creates an Observable from a channel. The difference between `FromChannel` operator is that as soon as the Observable is created, it starts to emit items regardless if there is an Observer or not. Hence, the items emitted by an Observable without Observer(s) are lost (while they are buffered with `FromChannel` operator).

A use case with `FromEventSource` operator is for example telemetry. We may not be interested in all the data produced from the very beginning of a stream. Only the data since we started to observe it.
A use case with `FromEventSource` operator is, for example, telemetry. We may not be interested in all the data produced from the very beginning of a stream—only the data since we started to observe it.

Once we start observing an Observable created with `FromEventSource`, we can configure the backpressure strategy. By default, it is blocking (there is a guaranteed delivery for the items emitted after we observe it). We can override this strategy this way:

Expand All @@ -262,17 +262,17 @@ observable.Map(transform, rxgo.WithBufferedChannel(42))

Each operator has an `opts ...Option` parameter allowing to pass such options.

### Lazy vs Eager Observation
### Lazy vs. Eager Observation

The default observation strategy is lazy. It means the items emitted by an Observable are processed by an operator once we start observing it. We can change this behaviour this way:
The default observation strategy is lazy. It means an operator processes the items emitted by an Observable once we start observing it. We can change this behaviour this way:

```go
observable := rxgo.FromChannel(ch).Map(transform, rxgo.WithObservationStrategy(rxgo.Eager))
```

In this case, the `Map` operator is triggered whenever an item is produced even without any Observer.
In this case, the `Map` operator is triggered whenever an item is produced, even without any Observer.

### Sequential vs Parallel Operators
### Sequential vs. Parallel Operators

By default, each operator is sequential. One operator being one goroutine instance. We can override it using the following option:

Expand Down Expand Up @@ -321,9 +321,9 @@ If `observable` was not a Connectable Observable, as `DoOnNext` creates an Obser
observable.Connect()
```

Once `Connect()` is called, the Connectable Observable begin to emit items.
Once `Connect()` is called, the Connectable Observable begins to emit items.

There is another important change with a regular Observable. A Connectable Observable publishes its items. It means, all the Observers receive a copy of the items.
There is another important change with a regular Observable. A Connectable Observable publishes its items. It means all the Observers receive a copy of the items.

Here is an example with a regular Observable:

Expand Down Expand Up @@ -398,7 +398,7 @@ Second observer: 2
Second observer: 3
```

### Observable, Single and Optional Single
### Observable, Single, and Optional Single

An Iterable is an object that can be observed using `Observe(opts ...Option) <-chan Item`.

Expand All @@ -420,8 +420,8 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
[Operator options](doc/options.md)

### Creating Observables
* [Create](doc/create.md) — create an Observable from scratch by calling observer methods programmatically
* [Defer](doc/defer.md) — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
* [Create](doc/create.md) — create an Observable from scratch by calling Observer methods programmatically
* [Defer](doc/defer.md) — do not create the Observable until the Observer subscribes, and create a fresh Observable for each Observer
* [Empty](doc/empty.md)/[Never](doc/never.md)/[Thrown](doc/thrown.md) — create Observables that have very precise and limited behaviour
* [FromChannel](doc/fromchannel.md) — create an Observable based on a lazy channel
* [FromEventSource](doc/fromeventsource.md) — create an Observable based on an eager channel
Expand Down Expand Up @@ -449,8 +449,8 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
* [Distinct](doc/distinct.md)/[DistinctUntilChanged](doc/distinctuntilchanged.md) — suppress duplicate items emitted by an Observable
* [ElementAt](doc/elementat.md) — emit only item n emitted by an Observable
* [Filter](doc/filter.md) — emit only those items from an Observable that pass a predicate test
* [Find](doc/find.md) — emit the first item passing a predicate then complete
* [First](doc/first.md)/[FirstOrDefault](doc/firstordefault.md) — emit only the first item or the first item that meets a condition, from an Observable
* [Find](doc/find.md) — emit the first item passing a predicate, then complete
* [First](doc/first.md)/[FirstOrDefault](doc/firstordefault.md) — emit only the first item or the first item that meets a condition from an Observable
* [IgnoreElements](doc/ignoreelements.md) — do not emit any items from an Observable but mirror its termination notification
* [Last](doc/last.md)/[LastOrDefault](doc/lastordefault.md) — emit only the last item emitted by an Observable
* [Sample](doc/sample.md) — emit the most recent item emitted by an Observable within periodic time intervals
Expand Down Expand Up @@ -502,10 +502,16 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
* [Errors](doc/errors.md) — return all the errors thrown by an observable
* [ToMap](doc/tomap.md)/[ToMapWithValueSelector](doc/tomapwithvalueselector.md)/[ToSlice](doc/toslice.md) — convert an Observable into another object or data structure

## Contributions
## Contributing

All contributions are very welcome! Be sure you check out the [contributing guidelines](CONTRIBUTING.md) first. Also, if you publish a post about RxGo, please let us know. We would be glad to include it in the [External Resources](#external-resources) section.

Thanks to all the people who already contributed to RxGo!

<a href="https://github.com/ReactiveX/RxGo/graphs/contributors">
<img src="https://contrib.rocks/image?repo=ReactiveX/RxGo" />
</a>

## External Resources

* [Announcing RxGo v2](https://teivah.medium.com/introducing-rxgo-v2-e7e369faa99a)
Expand Down

0 comments on commit 5aa0097

Please sign in to comment.