Observable.observeOn() has an unbounded buffer and ignores the bufferSize parameter.
Why does Observable.observeOn() exist in the first place? I think schedulers only make sense for Flowable.
This code crashes as expected:
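```kotlin
Flowable.create<Int>({
    // Emit 101 items synchronously, without regard to downstream demand
    for (i in 0..100) {
        it.onNext(i)
    }
}, FlowableEmitter.BackpressureMode.NONE)
    // observeOn(scheduler, delayError = false, bufferSize = 1)
    .observeOn(Schedulers.computation(), false, 1)
    .subscribe {
        // Slow consumer: 100 ms per item
        Thread.sleep(100)
    }
```

This does not crash, but I think it should: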
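```kotlin
Observable.create<Int> {
    // Emit 1001 items synchronously
    for (i in 0..1000) {
        it.onNext(i)
    }
}
    // Same arguments as above, but bufferSize = 1 appears to have no effect here
    .observeOn(Schedulers.computation(), false, 1)
    .subscribe {
        // Slow consumer: 100 ms per item
        Thread.sleep(100)
    }
```

The code above is written in Kotlin.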
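For comparison, here is a minimal sketch of one way to get the bounded behavior I expected from the Observable snippet above: convert the Observable to a Flowable before calling observeOn. This assumes the RxJava 2.x release API (`Observable.toFlowable(BackpressureStrategy)` and `BackpressureStrategy.ERROR`), whose naming differs from the `FlowableEmitter.BackpressureMode` used above. The `CountDownLatch` is only there to keep the JVM alive until the stream terminates. Treat it as an illustration under those assumptions, not as the intended design.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch

fun main() {
    // Used only to block main() until the stream terminates
    val done = CountDownLatch(1)

    Observable.create<Int> { emitter ->
        for (i in 0..1000) {
            emitter.onNext(i)
        }
        emitter.onComplete()
    }
        // Assumption: RxJava 2.x release API. ERROR turns overflow into an
        // error signal instead of buffering without bound.
        .toFlowable(BackpressureStrategy.ERROR)
        // On a Flowable, bufferSize = 1 limits how much observeOn requests upstream
        .observeOn(Schedulers.computation(), false, 1)
        .subscribe(
            { Thread.sleep(100) },           // slow consumer: 100 ms per item
            { e ->
                println("failed: $e")        // terminal error, if the producer outruns demand
                done.countDown()
            },
            { done.countDown() }             // normal completion
        )

    done.await()
}
```

With the slow consumer, the producer should outrun the single-item request almost immediately, so I would expect this to end in the error callback rather than complete normally.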