@@ -11,6 +11,7 @@ import com.alexdeww.reactiveviewmodel.core.property.State
1111import io.reactivex.rxjava3.core.*
1212import io.reactivex.rxjava3.functions.Consumer
1313import io.reactivex.rxjava3.functions.Function
14+ import java.util.concurrent.atomic.AtomicBoolean
1415
1516typealias OnLiveDataAction <T > = (data: T ) -> Unit
1617
@@ -70,10 +71,65 @@ fun Completable.bindProgress(progressConsumer: Consumer<Boolean>): Completable =
7071 .doOnSubscribe { progressConsumer.accept(true ) }
7172 .doFinally { progressConsumer.accept(false ) }
7273
73- fun <T : Any > Single<T>.bindProgressAny (progressConsumer : Consumer <Boolean >): Single <T > = this
74- .doOnSubscribe { progressConsumer.accept(true ) }
75- .doOnSuccess { progressConsumer.accept(false ) }
76- .doOnError { progressConsumer.accept(false ) }
74+ private open class FinallyConsumerWrapper <T : Any >(
75+ private val consumer : Consumer <T >
76+ ) {
77+ private val isFinally = AtomicBoolean (false )
78+
79+ fun reset (value : T ) {
80+ isFinally.set(false )
81+ consumer.accept(value)
82+ }
83+
84+ fun finally (value : T ) {
85+ if (isFinally.compareAndSet(false , true )) consumer.accept(value)
86+ }
87+ }
88+
89+ private class ProgressConsumerWrapper (
90+ consumer : Consumer <Boolean >
91+ ) : FinallyConsumerWrapper<Boolean>(consumer) {
92+ fun begin () = reset(true )
93+ fun end () = finally (false )
94+ }
95+
96+ fun <T : Any > Observable<T>.bindProgressAny (progressConsumer : Consumer <Boolean >): Observable <T > {
97+ val consumerWrapper = ProgressConsumerWrapper (progressConsumer)
98+ return this
99+ .doOnSubscribe { consumerWrapper.begin() }
100+ .doOnNext { consumerWrapper.end() }
101+ .doOnComplete { consumerWrapper.end() }
102+ .doOnError { consumerWrapper.end() }
103+ .doOnDispose { consumerWrapper.end() }
104+ }
105+
106+ fun <T : Any > Single<T>.bindProgressAny (progressConsumer : Consumer <Boolean >): Single <T > {
107+ val consumerWrapper = ProgressConsumerWrapper (progressConsumer)
108+ return this
109+ .doOnSubscribe { consumerWrapper.begin() }
110+ .doOnSuccess { consumerWrapper.end() }
111+ .doOnError { consumerWrapper.end() }
112+ .doOnDispose { consumerWrapper.end() }
113+ }
114+
115+ fun <T : Any > Maybe<T>.bindProgressAny (progressConsumer : Consumer <Boolean >): Maybe <T > {
116+ val consumerWrapper = ProgressConsumerWrapper (progressConsumer)
117+ return this
118+ .doOnSubscribe { consumerWrapper.begin() }
119+ .doOnSuccess { consumerWrapper.end() }
120+ .doOnComplete { consumerWrapper.end() }
121+ .doOnError { consumerWrapper.end() }
122+ .doOnDispose { consumerWrapper.end() }
123+ }
124+
125+ fun Completable.bindProgressAny (progressConsumer : Consumer <Boolean >): Completable {
126+ val consumerWrapper = ProgressConsumerWrapper (progressConsumer)
127+ return this
128+ .doOnSubscribe { consumerWrapper.begin() }
129+ .doOnComplete { consumerWrapper.end() }
130+ .doOnError { consumerWrapper.end() }
131+ .doOnDispose { consumerWrapper.end() }
132+ }
77133
78134/* *
79135 * Returns the [Observable] that emits items when active, and buffers them when [idle][isIdle].
@@ -86,7 +142,7 @@ fun <T : Any> Observable<T>.bufferWhileIdle(
86142 bufferSize : Int? = null
87143): Observable <T > {
88144 val itemsObservable = this
89- .withLatestFrom(isIdle, { t: T , idle: Boolean -> t to idle })
145+ .withLatestFrom(isIdle) { t: T , idle: Boolean -> t to idle }
90146 .publish()
91147 .refCount(2 )
92148
0 commit comments