Closed
Snippet 1:
val o = Observable(1, 2, 3)
println("x0")
o.observeOn(Schedulers.threadPoolForComputation).subscribe(x=>println("a: " + x), t=>println(t), ()=>{println("a: done")})
println("x1")
o.observeOn(Schedulers.threadPoolForComputation).subscribe(x=>println("b: " + x), t=>println(t), ()=>{println("b: done")})
println("x2")
Thread.sleep(6000)
println("x3")
outputs, as expected, this:
x0
a: 1
a: 2
a: 3
a: done
x1
x2
b: 1
b: 2
b: 3
b: done
x3
Snippet 2:
val o = Observable(1, 2, 3).observeOn(Schedulers.threadPoolForComputation)
println("x0")
o.subscribe(x=>println("a: " + x), t=>println(t), ()=>{println("a: done")})
println("x1")
o.subscribe(x=>println("b: " + x), t=>println(t), ()=>{println("b: done")})
println("x2")
Thread.sleep(6000)
println("x3")
sometimes outputs the same as snippet 1, but sometimes only outputs this:
x0
a: 1
a: 2
a: 3
x1
a: done
x2
x3
or this:
x0
x1
a: 1
a: 2
x2
a: 3
a: done
x3
(The order of the x and a lines doesn't matter; the problem is that no b lines are printed at all.)
Snippet 3 (= snippet 2 translated to C#)
var o = new[] { 1, 2, 3 }.ToObservable().ObserveOn(TaskPoolScheduler.Default);
Console.WriteLine("x0");
o.Subscribe(x => Console.WriteLine("a: " + x), t => Console.WriteLine(t), () => Console.WriteLine("a: done"));
Console.WriteLine("x1");
o.Subscribe(x => Console.WriteLine("b: " + x), t => Console.WriteLine(t), () => Console.WriteLine("b: done"));
Console.WriteLine("x2");
Thread.Sleep(6000);
Console.WriteLine("x3");
Console.ReadKey();
always prints something like this:
x0
x1
x2
b: 1
b: 2
b: 3
b: done
a: 1
a: 2
a: 3
a: done
x3
(The order of the a and b lines doesn't matter, but the b lines are always printed.)
So my guess is that observeOn somehow does not properly support multiple subscribers, but I couldn't find anything in the source to support this claim. It would be nice if someone could take a look.
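To rule out timing effects from Thread.sleep, the check in snippet 2 could be made deterministic with a latch. This is only a minimal sketch: it assumes the same RxScala API as the snippets above, and the import path for Schedulers (rx.lang.scala.concurrency) is an assumption that may differ between versions. The names ObserveOnRepro, attach, finished and events are made up for illustration.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import rx.lang.scala.Observable
import rx.lang.scala.concurrency.Schedulers // assumed package; adjust to your RxScala version

object ObserveOnRepro extends App {
  // Same shared observable as in snippet 2.
  val o = Observable(1, 2, 3).observeOn(Schedulers.threadPoolForComputation)

  // One count per subscriber, released when that subscriber completes or fails.
  val finished = new CountDownLatch(2)
  // Thread-safe log of everything both subscribers receive.
  val events = new ConcurrentLinkedQueue[String]()

  // Hypothetical helper: subscribes to o and records events under the given name.
  def attach(name: String): Unit = {
    o.subscribe(
      x => events.add(name + ": " + x),
      t => { events.add(name + ": error " + t); finished.countDown() },
      () => { events.add(name + ": done"); finished.countDown() }
    )
  }

  attach("a")
  attach("b")

  // Wait for both subscribers to terminate instead of sleeping for a fixed time.
  val bothFinished = finished.await(6, TimeUnit.SECONDS)
  println("both finished: " + bothFinished)
  println(events.toArray.mkString(", "))
}
```

If the second subscriber really is dropped, bothFinished should come back false and the event log should contain no b entries, which would make the failure easy to assert in a unit test.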