Description
The buffer only gets truncated when a new item is added. So any subscribers will always receive that last emission even if it's older than the time-limit.
'Observable<Long> retrofit = Observable.interval(2L, 2L, TimeUnit.SECONDS).take(10);
ConnectableObservable<Long> connectableRetrofit;
connectableRetrofit = retrofit.replay(1, 5, TimeUnit.SECONDS);
System.out.println("Subscribing A");
connectableRetrofit.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("subscriber! A ========== " + aLong);
}
});
System.out.println("Connecting!");
connectableRetrofit.connect();
Thread.sleep(10000L);
System.out.println("Subscribing B");
connectableRetrofit.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("subscriber! B ========== " + aLong);
}
});
Thread.sleep(30000L);
System.out.println("Subscribing C");
connectableRetrofit.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("subscriber! C ========== " + aLong);
}
});`
Here subscriber C is subbing after 30 secs so you'd expect the buffer to have truncated the last value since the time-limit is 5 secs.
This is the output:
Subscribing A
Connecting!
subscriber! A ========== 0
subscriber! A ========== 1
subscriber! A ========== 2
subscriber! A ========== 3
subscriber! A ========== 4
Subscribing B
subscriber! B ========== 4
subscriber! A ========== 5
subscriber! B ========== 5
subscriber! A ========== 6
subscriber! B ========== 6
subscriber! A ========== 7
subscriber! B ========== 7
subscriber! A ========== 8
subscriber! B ========== 8
subscriber! A ========== 9
subscriber! B ========== 9
Subscribing C
subscriber! C ========== 9