Skip to content

RxScala: Add retryWhen/repeatWhen methods #1555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,30 @@ class RxScalaDemo extends JUnitSuite {
}.subscribe(s => println(s), e => e.printStackTrace())
}

@Test def retryWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onError(new RuntimeException("always fails"))
}).retryWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
println("delay retry by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
}).toBlocking.foreach(s => println(s))
}

@Test def repeatWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onCompleted()
}).repeatWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
println("delay repeat by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS)).materialize
})
}, NewThreadScheduler()).toBlocking.foreach(s => println(s))
}

@Test def liftExample1(): Unit = {
// Add "No. " in front of each item
val o = List(1, 2, 3).toObservable.lift {
Expand Down
149 changes: 149 additions & 0 deletions language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -3175,6 +3175,94 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.retry(f))
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an
* {@code onError}. An {@code onError} notification from the source will result in the emission of a
* {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code retry} will call
* {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
*
* Example:
*
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* <pre> {@code
* Observable[String]({ subscriber =>
* println("subscribing")
* subscriber.onError(new RuntimeException("always fails"))
* }).retryWhen(attempts => {
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
* println("delay retry by " + i + " second(s)")
* Observable.timer(Duration(i, TimeUnit.SECONDS))
* })
* }).toBlocking.foreach(s => println(s))
* } </pre>
*
* Output is:
*
* <pre> {@code
* subscribing
* delay retry by 1 second(s)
* subscribing
* delay retry by 2 second(s)
* subscribing
* delay retry by 3 second(s)
* subscribing
* } </pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* retry
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f))
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an {@code onError}.
* An onError will emit a {@link Notification} to the observable provided as an argument to the notificationHandler
* func. If the observable returned {@code onCompletes} or {@code onErrors} then retry will call {@code onCompleted}
* or {@code onError} on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* retry
* @param scheduler the Scheduler on which to subscribe to the source Observable
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
}

/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
* <p>
Expand Down Expand Up @@ -3237,6 +3325,67 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.repeat(count, scheduler))
}

/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable, on a particular Scheduler.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param scheduler the Scheduler to emit the items on
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
}

/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* {@code onCompleted}. An {@code onCompleted} notification from the source will result in the emission of
* a {@link Notification} to the Observable provided as an argument to the {@code notificationHandler}
* function. If the Observable returned {@code onCompletes} or {@code onErrors} then {@code repeatWhen} will
* call {@code onCompleted} or {@code onError} on the child subscription. Otherwise, this Observable will
* resubscribe to the source observable.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Notification[Any]]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: rx.Notification[_ <: Any]]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).map({ n => n.asJavaNotification }).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f))
}

/**
* Converts an Observable into a [[BlockingObservable]] (an Observable with blocking operators).
*
Expand Down