Skip to content

Commit 237a728

Browse files
doOnRequest
Similar to doOnSubscribe, doOnNext, etc
1 parent b02e572 commit 237a728

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4159,6 +4159,22 @@ public final void onNext(T args) {
41594159

41604160
return lift(new OperatorDoOnEach<T>(observer));
41614161
}
4162+
4163+
/**
4164+
* Modifies the source {@code Observable} so that it invokes the given action when it receives a request for more items.
4165+
* <dl>
4166+
* <dt><b>Scheduler:</b></dt>
4167+
* <dd>{@code doOnRequest} does not operate by default on a particular {@link Scheduler}.</dd>
4168+
* </dl>
4169+
*
4170+
* @param onRequest
4171+
* the action that gets called when an observer requests items from this {@code Observable}
4172+
* @return the source {@code Observable} modified so as to call this Action when appropriate
4173+
*/
4174+
@Beta
4175+
public final Observable<T> doOnRequest(final Action1<Long> onRequest) {
4176+
return lift(new OperatorDoOnRequest<T>(onRequest));
4177+
}
41624178

41634179
/**
41644180
* Modifies the source {@code Observable} so that it invokes the given action when it is subscribed from
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Producer;
20+
import rx.Subscriber;
21+
import rx.functions.Action1;
22+
23+
/**
24+
* This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable.Producer} receives a request.
25+
*
26+
* @param <T>
27+
* The type of the elements in the {@link rx.Observable} that this operator modifies
28+
*/
29+
public class OperatorDoOnRequest<T> implements Operator<T, T> {
30+
31+
private final Action1<Long> request;
32+
33+
public OperatorDoOnRequest(Action1<Long> request) {
34+
this.request = request;
35+
}
36+
37+
@Override
38+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39+
40+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
41+
42+
child.setProducer(new Producer() {
43+
44+
@Override
45+
public void request(long n) {
46+
request.call(n);
47+
parent.requestMore(n);
48+
}
49+
50+
});
51+
52+
return parent;
53+
}
54+
55+
private final class ParentSubscriber<T> extends Subscriber<T> {
56+
private final Subscriber<? super T> child;
57+
58+
private ParentSubscriber(Subscriber<? super T> child) {
59+
this.child = child;
60+
}
61+
62+
private void requestMore(long n) {
63+
request(n);
64+
}
65+
66+
@Override
67+
public void onCompleted() {
68+
child.onCompleted();
69+
}
70+
71+
@Override
72+
public void onError(Throwable e) {
73+
child.onError(e);
74+
}
75+
76+
@Override
77+
public void onNext(T t) {
78+
child.onNext(t);
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)