Cache with expiration in stdlib? #4033

Closed
@bobymicroby

Hello guys,

I needed an Observable (actually a Single) that can expire a cached upstream Observable after a period of time.

I was unable to bend the stdlib operators to my needs, so I came up with this piece of code:

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;

public class OnSubscribeTTLCache<T> implements Observable.OnSubscribe<T>
{

    private final Observable<T> source;
    private final long ttl;           // time to live, in the same unit as the clock
    private final Func0<Long> clock;  // injectable time source

    // Cached view of the source and the time after which it must be rebuilt.
    private volatile Observable<T> current;
    private volatile long nextExpirationTime;

    public static <T> Observable<T> cache(Observable<T> source, long ttl, Func0<Long> clock)
    {
        return Observable.create(new OnSubscribeTTLCache<>(source, ttl, clock));
    }

    public OnSubscribeTTLCache(Observable<T> source, long ttl, Func0<Long> clock)
    {
        this.ttl = ttl;
        this.source = source;
        this.clock = clock;
        this.current = source;
        // Forces a refresh on the first subscription.
        this.nextExpirationTime = Long.MIN_VALUE;
    }

    @Override
    public void call(Subscriber<? super T> subscriber)
    {
        final long currentTime = clock.call();

        // Expired: replace the cached Observable and push the deadline forward.
        if (currentTime > nextExpirationTime)
        {
            nextExpirationTime = currentTime + ttl;
            current = source.cacheWithInitialCapacity(1);
        }
        current.unsafeSubscribe(subscriber);
    }
}

Can someone have a look and tell me if I am doing it right?
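
One point that may be worth checking: the expiry test and the two volatile writes in call() are separate steps, so two subscribers arriving at the same moment could both see an expired cache and each build its own cached Observable. Below is a minimal sketch of the same expiry logic with the check and the swap done under a lock, written against plain JDK types so it runs without RxJava. The names TtlCache, Entry and get() are hypothetical and not part of any library; Supplier stands in for the code that builds the cached Observable and LongSupplier for the Func0<Long> clock.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper (not an RxJava class): caches the result of `factory`
// and rebuilds it once the clock has moved past the stored deadline.
final class TtlCache<T>
{
    // Immutable pair, so the value and its deadline are always published together.
    private static final class Entry<T>
    {
        final T value;
        final long expiresAt;

        Entry(T value, long expiresAt)
        {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Supplier<T> factory;
    private final long ttl;
    private final LongSupplier clock;
    private final Object lock = new Object();

    private volatile Entry<T> entry; // null until the first get()

    TtlCache(Supplier<T> factory, long ttl, LongSupplier clock)
    {
        this.factory = factory;
        this.ttl = ttl;
        this.clock = clock;
    }

    T get()
    {
        final long now = clock.getAsLong();
        Entry<T> e = entry;
        if (e == null || now > e.expiresAt)
        {
            synchronized (lock)
            {
                // Re-check under the lock: another thread may have refreshed already.
                e = entry;
                if (e == null || now > e.expiresAt)
                {
                    e = new Entry<>(factory.get(), now + ttl);
                    entry = e;
                }
            }
        }
        return e.value;
    }
}
```

Assuming this shape, call() could hold a TtlCache<Observable<T>> whose factory is () -> source.cacheWithInitialCapacity(1) and then do cache.get().unsafeSubscribe(subscriber). Whether the extra lock is needed depends on whether concurrent subscriptions are expected at all.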
