Skip to content

1.x: new fromAsync to bridge the callback world with the reactive #4179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/main/java/rx/AsyncEmitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx;

import rx.annotations.Experimental;

/**
* Abstraction over a RxJava Subscriber that allows associating
* a resource with it and exposes the current number of downstream
* requested amount.
* <p>
* The onNext, onError and onCompleted methods should be called
* in a sequential manner, just like the Observer's methods. The
* other methods are threadsafe.
*
* @param <T> the value type to emit
*/
@Experimental
public interface AsyncEmitter<T> extends Observer<T> {

/**
* Sets a Subscription on this emitter; any previous Subscription
* or Cancellation will be unsubscribed/cancelled.
* @param s the subscription, null is allowed
*/
void setSubscription(Subscription s);

/**
* Sets a Cancellable on this emitter; any previous Subscription
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellation(Cancellable c);
/**
* The current outstanding request amount.
* <p>This method it threadsafe.
* @return the current outstanding request amount
*/
long requested();

/**
* A functional interface that has a single close method
* that can throw.
*/
interface Cancellable {

/**
* Cancel the action or free a resource.
* @throws Exception on error
*/
void cancel() throws Exception;
}

/**
* Options to handle backpressure in the emitter.
*/
enum BackpressureMode {
NONE,

ERROR,

BUFFER,

DROP,

LATEST
}
}
44 changes: 44 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,50 @@ public static <T> Observable<T> from(T[] array) {
return create(new OnSubscribeFromArray<T>(array));
}

/**
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style,
* generally non-backpressured world.
* <p>
* Example:
* <pre><code>
* Observable.&lt;Event&gt;fromAsync(emitter -&gt; {
* Callback listener = new Callback() {
* &#64;Override
* public void onEvent(Event e) {
* emitter.onNext(e);
* if (e.isLast()) {
* emitter.onCompleted();
* }
* }
*
* &#64;Override
* public void onFailure(Exception e) {
* emitter.onError(e);
* }
* };
*
* AutoCloseable c = api.someMethod(listener);
*
* emitter.setCancellable(c::close);
*
* }, BackpressureMode.BUFFER);
* </code></pre>
* <p>
* You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The
* rest of its methods are threadsafe.
*
* @param asyncEmitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable}
* @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
* @return the new Observable instance
* @see AsyncEmitter
* @see AsyncEmitter.BackpressureMode
* @see AsyncEmitter.Cancellable
*/
@Experimental
public static <T> Observable<T> fromAsync(Action1<AsyncEmitter<T>> asyncEmitter, AsyncEmitter.BackpressureMode backpressure) {
return create(new OnSubscribeFromAsync<T>(asyncEmitter, backpressure));
}

/**
* Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then
* emits the value returned from that function.
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/rx/internal/operators/BackpressureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T o
}

/**
* Adds {@code n} to {@code requested} and returns the value prior to addition once the
* Adds {@code n} (not validated) to {@code requested} and returns the value prior to addition once the
* addition is successful (uses CAS semantics). If overflows then sets
* {@code requested} field to {@code Long.MAX_VALUE}.
*
* @param requested
* atomic long that should be updated
* @param n
* the number of requests to add to the requested count
* the number of requests to add to the requested count, positive (not validated)
* @return requested value just prior to successful addition
*/
public static long getAndAddRequest(AtomicLong requested, long n) {
Expand Down Expand Up @@ -413,4 +413,17 @@ public static long produced(AtomicLong requested, long n) {
}
}
}

/**
* Validates the requested amount and returns true if it is positive.
* @param n the requested amount
* @return true if n is positive
* @throws IllegalArgumentException if n is negative
*/
public static boolean validate(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
return n != 0L;
}
}
Loading