Hello guys,
I needed an Observable (actually a Single) that can expire a cached upstream Observable after a period of time.
I was unable to bend the operators from the stdlib to my needs, so I came up with this piece of code:
public class OnSubscribeTTLCache<T> implements Observable.OnSubscribe<T>
{
    private final Observable<T> source;
    private final long ttl;
    private final Func0<Long> clock;

    private volatile Observable<T> current;
    private volatile long nextExpirationTime;

    public static <T> Observable<T> cache(Observable<T> source, long ttl, Func0<Long> clock)
    {
        return Observable.create(new OnSubscribeTTLCache<>(source, ttl, clock));
    }

    public OnSubscribeTTLCache(Observable<T> source, long ttl, Func0<Long> clock)
    {
        this.ttl = ttl;
        this.source = source;
        this.clock = clock;
        this.current = source;
        this.nextExpirationTime = Long.MIN_VALUE;
    }

    @Override
    public void call(Subscriber<? super T> subscriber)
    {
        final long currentTime = clock.call();
        if (currentTime > nextExpirationTime)
        {
            nextExpirationTime = currentTime + ttl;
            current = source.cacheWithInitialCapacity(1);
        }
        current.unsafeSubscribe(subscriber);
    }
}
Can someone have a look and tell me if I am doing it right?
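To make the intended expiry behaviour easier to discuss, here is a minimal plain-Java sketch of the same check-and-refresh logic without RxJava. The names `TtlCache`, `source`, and `clock` are hypothetical and only for illustration. It assumes a single cached value and uses `synchronized` to keep the check and the refresh atomic, which is a different choice from the two `volatile` fields in the class above.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical plain-Java analogue of OnSubscribeTTLCache: caches one value
// and reloads it from the source once the TTL has elapsed.
final class TtlCache<T> implements Supplier<T>
{
    private final Supplier<T> source;  // stands in for the upstream Observable
    private final long ttl;            // same unit as the clock
    private final LongSupplier clock;  // injectable so tests can control time

    private T current;
    private long nextExpirationTime = Long.MIN_VALUE;

    TtlCache(Supplier<T> source, long ttl, LongSupplier clock)
    {
        this.source = source;
        this.ttl = ttl;
        this.clock = clock;
    }

    @Override
    public synchronized T get()
    {
        final long currentTime = clock.getAsLong();
        // Same condition as in call(): refresh only once the clock has
        // moved strictly past the stored expiration time.
        if (currentTime > nextExpirationTime)
        {
            nextExpirationTime = currentTime + ttl;
            current = source.get();
        }
        return current;
    }
}
```

With a TTL of 10 and a clock starting at 0, the first `get()` loads from the source, a call at time 10 still returns the cached value, and a call at time 11 reloads it. That is the behaviour I expect from the Observable version as well.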