Support IObservable<'T> #4

@ptrelford

It might be useful to be able to convert a Stream<'T> to an IObservable<'T>, for example:

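module Stream =
  // Converts a stream to an observable that pushes every element on Subscribe.
  let toObservable (stream:Stream<'T>) : System.IObservable<'T> =
     { new System.IObservable<'T> with 
        member __.Subscribe(observer:System.IObserver<'T>) =
           let (Stream streamf) = stream
           // Forward each value to the observer.
           let (bulk, _) = streamf (fun value -> observer.OnNext value; true) 
           bulk ()
           // Signal the end of the sequence.
           observer.OnCompleted()
           // Iteration has already finished, so there is nothing to dispose.
           { new System.IDisposable with member __.Dispose() = ()}
     }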

let obs = [|1..10|] |> Stream.ofArray |> Stream.toObservable
// Subscribe returns an IDisposable; discard it explicitly.
obs.Subscribe(fun x -> printfn "%d" x) |> ignore
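
The proposal above depends on the library's internal Stream representation, so here is a minimal, self-contained sketch of the same push-on-subscribe pattern that can be run in F# Interactive. It is an illustration only: a plain seq<'T> stands in for Stream<'T>, and the names toObservable and received are hypothetical, not part of the library.

```fsharp
open System

// Hypothetical stand-in for Stream.toObservable: the source is a plain seq<'T>
// and every element is pushed synchronously when an observer subscribes.
let toObservable (source: seq<'T>) : IObservable<'T> =
    { new IObservable<'T> with
        member __.Subscribe(observer: IObserver<'T>) =
            for value in source do
                observer.OnNext value
            // Signal the end of the sequence once all values are pushed.
            observer.OnCompleted()
            // Nothing is left running, so disposal is a no-op.
            { new IDisposable with member __.Dispose() = () } }

// Collect the pushed values to check that they arrive in order.
let received = ResizeArray<int>()

let subscription =
    [| 1 .. 10 |]
    |> toObservable
    |> Observable.subscribe (fun x -> received.Add x)

subscription.Dispose()
printfn "%d values received" received.Count
```

Because every value is pushed inside Subscribe, the call only returns once the whole source has been consumed, and the returned IDisposable cannot cancel anything. A real implementation might want to check a cancellation flag between elements.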
