Skip to content

1.x: add groupBy overload with evictingMapFactory #3931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ apply plugin: 'nebula.rxjava-project'
dependencies {
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile 'com.google.guava:guava:19.0'

perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.11.3'
Expand Down
59 changes: 59 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6356,6 +6356,65 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single
* {@link Subscriber} during its lifetime and if this {@code Subscriber} unsubscribes before the
* source terminates, the next emission by the source having the same key will trigger a new
* {@code GroupedObservable} emission.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
* @param elementSelector
* a function that extracts the return element for each item
* @param evictingMapFactory
* a function that given an eviction action returns a {@link Map} instance that will be used to assign
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously).
* This can be used to limit the size of the map by evicting keys by maximum size or access time for
* instance. Here's an example using Guava's {@code CacheBuilder} from v19.0:
* <pre>
* {@code
* Func1<Action1<K>, Map<K, Object>> mapFactory
* = action -> CacheBuilder.newBuilder()
* .maximumSize(1000)
* .expireAfterAccess(12, TimeUnit.HOURS)
* .removalListener(notification -> action.call(notification.getKey()))
* .<K, Object> build().asMap();
* }
* </pre>
*
* @param <K>
* the key type
* @param <R>
* the element type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @throws NullPointerException
* if {@code evictingMapFactory} is null
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
@Experimental
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector,
final Func1<? super T, ? extends R> elementSelector, final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
if (evictingMapFactory == null) {
throw new NullPointerException("evictingMapFactory cannot be null");
}
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector, evictingMapFactory));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single
Expand Down
79 changes: 70 additions & 9 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import rx.*;
import rx.Observable.*;
import rx.exceptions.Exceptions;
import rx.functions.*;
import rx.internal.producers.ProducerArbiter;
import rx.internal.util.*;
import rx.observables.GroupedObservable;
import rx.plugins.RxJavaHooks;
import rx.observers.Subscribers;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -46,35 +48,50 @@ public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservabl
final Func1<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Func1<Action1<K>, Map<K, Object>> mapFactory; //nullable

@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector) {
this(keySelector, (Func1)UtilityFunctions.<T>identity(), RxRingBuffer.SIZE, false);
this(keySelector, (Func1)UtilityFunctions.<T>identity(), RxRingBuffer.SIZE, false, null);
}

public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector) {
this(keySelector, valueSelector, RxRingBuffer.SIZE, false);
this(keySelector, valueSelector, RxRingBuffer.SIZE, false, null);
}

public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, Func1<Action1<K>, Map<K, Object>> mapFactory) {
this(keySelector, valueSelector, RxRingBuffer.SIZE, false, mapFactory);
}

public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError, Func1<Action1<K>, Map<K, Object>> mapFactory) {
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.mapFactory = mapFactory;
}

@Override
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> t) {
final GroupBySubscriber<T, K, V> parent = new GroupBySubscriber<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError);
public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, V>> child) {
final GroupBySubscriber<T, K, V> parent;
try {
parent = new GroupBySubscriber<T, K, V>(child, keySelector, valueSelector, bufferSize, delayError, mapFactory);
} catch (Throwable ex) {
//Can reach here because mapFactory.call() may throw in constructor of GroupBySubscriber
Exceptions.throwOrReport(ex, child);
Subscriber<? super T> parent2 = Subscribers.empty();
parent2.unsubscribe();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add Subscribers.unsubscribed()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, separate PR I'd suggest.

return parent2;
}

t.add(Subscriptions.create(new Action0() {
child.add(Subscriptions.create(new Action0() {
@Override
public void call() {
parent.cancel();
}
}));

t.setProducer(parent.producer);
child.setProducer(parent.producer);

return parent;
}
Expand All @@ -101,6 +118,7 @@ public static final class GroupBySubscriber<T, K, V>
final Map<Object, GroupedUnicast<K, V>> groups;
final Queue<GroupedObservable<K, V>> queue;
final GroupByProducer producer;
final Queue<K> evictedKeys;

static final Object NULL_KEY = new Object();

Expand All @@ -117,13 +135,14 @@ public static final class GroupBySubscriber<T, K, V>

final AtomicInteger wip;

public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
Func1<Action1<K>, Map<K, Object>> mapFactory) {
this.actual = actual;
this.keySelector = keySelector;
this.valueSelector = valueSelector;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
this.queue = new ConcurrentLinkedQueue<GroupedObservable<K, V>>();
this.s = new ProducerArbiter();
this.s.request(bufferSize);
Expand All @@ -132,6 +151,32 @@ public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Fun
this.requested = new AtomicLong();
this.groupCount = new AtomicInteger(1);
this.wip = new AtomicInteger();
if (mapFactory == null) {
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
this.evictedKeys = null;
} else {
this.evictedKeys = new ConcurrentLinkedQueue<K>();
this.groups = createMap(mapFactory, new EvictionAction<K>(evictedKeys));
}
}

static class EvictionAction<K> implements Action1<K> {

final Queue<K> evictedKeys;

EvictionAction(Queue<K> evictedKeys) {
this.evictedKeys = evictedKeys;
}

@Override
public void call(K key) {
evictedKeys.offer(key);
}
}

@SuppressWarnings("unchecked")
private Map<Object, GroupedUnicast<K, V>> createMap(Func1<Action1<K>, Map<K, Object>> mapFactory, Action1<K> evictionAction) {
return (Map<Object, GroupedUnicast<K,V>>)(Map<Object, ?>) mapFactory.call(evictionAction);
}

@Override
Expand Down Expand Up @@ -187,6 +232,16 @@ public void onNext(T t) {
}

group.onNext(v);

if (evictedKeys != null) {
K evictedKey;
while ((evictedKey = evictedKeys.poll()) != null) {
GroupedUnicast<K, V> g = groups.get(evictedKey);
if (g != null) {
g.onComplete();
}
}
}

if (notNew) {
s.request(1);
Expand Down Expand Up @@ -215,6 +270,9 @@ public void onCompleted() {
e.onComplete();
}
groups.clear();
if (evictedKeys != null) {
evictedKeys.clear();
}

done = true;
groupCount.decrementAndGet();
Expand Down Expand Up @@ -308,6 +366,9 @@ void errorAll(Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q, Throwab
q.clear();
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
groups.clear();
if (evictedKeys != null) {
evictedKeys.clear();
}

for (GroupedUnicast<K, V> e : list) {
e.onError(ex);
Expand Down
Loading