-
Notifications
You must be signed in to change notification settings - Fork 46
Open
Labels
Description
It might be interesting to be able to convert a Stream to an IObservable<'T>, i.e.
module Stream =
let toObservable (stream:Stream<'T>) : System.IObservable<'T> =
{ new System.IObservable<'T> with
member __.Subscribe(observer:System.IObserver<'T>) =
let (Stream streamf) = stream
let (bulk, _) = streamf (fun value -> observer.OnNext value; true)
bulk ()
observer.OnComplete()
{ new System.IDisposable with member __.Dispose() = ()}
}
let obs = [|1..10|] |> Stream.ofArray |> Stream.toObservable
obs.Subscribe(fun x -> printfn "%d" x)