Skip to content

Latest commit

 

History

History
95 lines (61 loc) · 1.87 KB

options.md

File metadata and controls

95 lines (61 loc) · 1.87 KB

Operator Options

Most of the operators accept a list of functional options to impact the Observable behaviour.

As an example:

observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
	return i.(int) * 10, nil
}, rxgo.WithContext(),
   rxgo.WithCPUPool(),
   rxgo.WithBufferedChannel(1))

WithBufferedChannel

Configure the capacity of the output channel.

rxgo.WithBufferedChannel(1) // Create a buffered channel with a 1 capacity

WithContext

Allows passing a context. The Observable will listen to its done signal to close itself.

rxgo.WithContext(ctx)

WithObservationStrategy

  • Lazy (default): consume when an Observer starts to subscribe.
rxgo.WithObservation(rxgo.Lazy)
  • Eager: consumer when the Observable is created:
rxgo.WithObservation(rxgo.Eager)

WithErrorStrategy

  • StopOnError (default): stop processing if the Observable produces an error.
rxgo.WithErrorStrategy(rxgo.StopOnError)
  • ContinueOnError: continue processing items if the Observable produces an error.
rxgo.WithErrorStrategy(rxgo.ContinueOnError)

This strategy is propagated to the parent(s) Observable(s).

WithPool

Convert the operator in a parallel operator and specify the number of concurrent goroutines.

rxgo.WithPool(8) // Creates a pool of 8 goroutines

WithCPUPool

Convert the operator in a parallel operator and specify the number of concurrent goroutines as runtime.NumCPU().

rxgo.WithCPUPool()

Serialize

Force an Observable to produce items sequentially.

rxgo.Serialize()

This option should be used in coordination with rxgo.WithPool(n) or rxgo.WithCPUPool().

WithPublishStrategy

Create a Connectable Observable.

rxgo.WithPublishStrategy()

This option is propagated to the parent(s) Observable(s).