Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ jobs:
with:
java-version: 1.11
- name: Build
run: ./gradlew build
run: ./gradlew ferro-rx:build
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#### ** THIS REPO IS DEPRECATED AND IS NO LONGER BEING ACTIVELY MAINTAINED **

![GitHub branch checks state](https://img.shields.io/github/checks-status/germesdev/ferro/master)
[![](https://jitpack.io/v/germesdev/ferro.svg)](https://jitpack.io/#germesdev/ferro)

Ferro evolved into modules `core-ui`, `core-mvp`, `mvp-dialog`, `mvp-widjet` [Surf Android Standard](https://github.com/surfstudio/SurfAndroidStandard/) repository (Documentation in Russian). It contains modules, which is used for developing Android projects by mobile studio [Surf](http://surfstudio.ru/).
Ferro evolved into modules `core-ui`, `core-mvp`, `mvp-dialog`
, `mvp-widjet` [Surf Android Standard](https://github.com/surfstudio/SurfAndroidStandard/)
repository (Documentation in Russian). It contains modules, which is used for developing Android
projects by mobile studio [Surf](http://surfstudio.ru/).

# Ferro

Expand Down
2 changes: 1 addition & 1 deletion ferro-rx/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

dependencies {
implementation 'io.reactivex:rxjava:1.1.6'
implementation 'io.reactivex.rxjava2:rxjava:2.0.6'
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.agna.ferro.rx;

import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOperator;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.ArrayCompositeDisposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* This operator freezes Completable events (onError, onComplete) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
* <p>
* Completable after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Completable#lift(CompletableOperator)}
* for apply it
*/
public class CompletableOperatorFreeze implements CompletableOperator {

private final Observable<Boolean> freezeSelector;

public CompletableOperatorFreeze(Observable<Boolean> freezeSelector) {
this.freezeSelector = freezeSelector;
}

@Override
public CompletableObserver apply(CompletableObserver child) throws Exception {
return new FreezeObserver(child, freezeSelector);
}

private static final class FreezeObserver implements CompletableObserver {

private final CompletableObserver child;
private final Observable<Boolean> freezeSelector;

private final ArrayCompositeDisposable compositeDisposable = new ArrayCompositeDisposable(2);
private Disposable s;

private boolean frozen = true;
private boolean done = false;
private Throwable error = null;

private FreezeObserver(CompletableObserver child, Observable<Boolean> freezeSelector) {
this.child = child;
this.freezeSelector = freezeSelector;
}

@Override
public void onError(Throwable e) {
if (isFinished()) {
return;
}
synchronized (this) {
error = e;
if (!frozen) {
forceOnError(e);
}
}
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
freezeSelector.subscribe(new Observer<Boolean>() {
@Override
public void onComplete() {
forceOnComplete();
}

@Override
public void onError(Throwable e) {
forceOnError(e);
}

@Override
public void onSubscribe(Disposable s) {
compositeDisposable.setResource(1, s);
}

@Override
public void onNext(Boolean freeze) {
setFrozen(freeze);
}
});

compositeDisposable.setResource(0, s);
child.onSubscribe(compositeDisposable);
}
}

@Override
public void onComplete() {
if (isFinished()) {
return;
}
synchronized (this) {
done = true;
if (!frozen) {
forceOnComplete();
}
}
}

private void forceOnError(Throwable e) {
compositeDisposable.dispose();
child.onError(e);
}

private void forceOnComplete() {
compositeDisposable.dispose();
child.onComplete();
}

private synchronized void setFrozen(boolean frozen) {
this.frozen = frozen;
if (!frozen) {
if (error != null) {
forceOnError(error);
} else if (done) {
forceOnComplete();
}
}
}

private boolean isFinished() {
return done || error != null;
}
}
}
215 changes: 215 additions & 0 deletions ferro-rx/src/main/java/com/agna/ferro/rx/FlowableOperatorFreeze.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package com.agna.ferro.rx;

import io.reactivex.FlowableOperator;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.subscribers.SerializedSubscriber;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;

/**
* This operator freezes Flowable events (onNext, onError, onComplete) when freeze selector emits true,
* and unfreeze it after freeze selector emits false.
* If freeze selector does not emit any elements, all events would be frozen
* If you want reduce num of elements in freeze buffer, you can define replaceFrozenEventPredicate.
* When Observable frozen and source observable emits normal (onNext) event, before it is added to
* the end of buffer, it compare with all already buffered events using replaceFrozenEventPredicate,
* and if replaceFrozenEventPredicate return true, buffered element would be removed.
* <p>
* Flowable after this operator can emit event in different threads
* You should pass this operator in method {@link io.reactivex.Flowable#lift(FlowableOperator)}
* for apply it
*/
public class FlowableOperatorFreeze<T> implements FlowableOperator<T, T> {

private final Observable<Boolean> freezeSelector;
private final BiFunction<T, T, Boolean> replaceFrozenEventPredicate;

public FlowableOperatorFreeze(Observable<Boolean> freezeSelector,
BiFunction<T, T, Boolean> replaceFrozenEventPredicate) {
this.freezeSelector = freezeSelector;
this.replaceFrozenEventPredicate = replaceFrozenEventPredicate;
}

public FlowableOperatorFreeze(Observable<Boolean> freezeSelector) {
this(freezeSelector, new BiFunction<T, T, Boolean>() {
@Override
public Boolean apply(T frozenEvent, T newEvent) {
return false;
}
});
}

@Override
public Subscriber<? super T> apply(Subscriber<? super T> child) throws Exception {
return new FreezeObserver<>(
new SerializedSubscriber<>(child),
replaceFrozenEventPredicate,
freezeSelector);
}

private static final class FreezeObserver<T> implements Subscriber<T>, Subscription {

private final Subscriber<T> child;
private final BiFunction<T, T, Boolean> replaceFrozenEventPredicate;
private final Observable<Boolean> freezeSelector;
private final List<T> frozenEventsBuffer = new LinkedList<>();

private Disposable resourceDisposable = DisposableHelper.DISPOSED;
private Subscription s;

private boolean frozen = true;
private boolean done = false;
private Throwable error = null;

private FreezeObserver(Subscriber<T> child,
BiFunction<T, T, Boolean> replaceFrozenEventPredicate,
Observable<Boolean> freezeSelector) {
this.child = child;
this.replaceFrozenEventPredicate = replaceFrozenEventPredicate;
this.freezeSelector = freezeSelector;
}

@Override
public void onComplete() {
if (isFinished()) {
return;
}
synchronized (this) {
done = true;
if (!frozen) {
forceOnComplete();
}
}
}

@Override
public void onError(Throwable e) {
if (isFinished()) {
return;
}
synchronized (this) {
error = e;
if (!frozen) {
forceOnError(e);
}
}
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
freezeSelector.subscribe(new Observer<Boolean>() {
@Override
public void onComplete() {
forceOnComplete();
}

@Override
public void onError(Throwable e) {
forceOnError(e);
}

@Override
public void onSubscribe(Disposable s) {
resourceDisposable = s;
}

@Override
public void onNext(Boolean freeze) {
setFrozen(freeze);
}
});

child.onSubscribe(this);
}
}

@Override
public void onNext(T event) {
if (isFinished()) {
return;
}
synchronized (this) {
if (frozen) {
bufferEvent(event);
} else {
child.onNext(event);
}
}
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
resourceDisposable.dispose();
}

private void bufferEvent(T event) {
for (ListIterator<T> it = frozenEventsBuffer.listIterator(); it.hasNext(); ) {
T frozenEvent = it.next();
try {
if (replaceFrozenEventPredicate.apply(frozenEvent, event)) {
it.remove();
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
forceOnError(ex);
return;
}
}
frozenEventsBuffer.add(event);
}

private void forceOnComplete() {
resourceDisposable.dispose();
s.cancel();
child.onComplete();
}

private void forceOnError(Throwable e) {
resourceDisposable.dispose();
s.cancel();
child.onError(e);
}

private synchronized void setFrozen(boolean frozen) {
this.frozen = frozen;
if (!frozen) {
emitFrozenEvents();
if (error != null) {
forceOnError(error);
}
if (done) {
forceOnComplete();
}
}
}

private void emitFrozenEvents() {
for (T event : frozenEventsBuffer) {
child.onNext(event);
}
frozenEventsBuffer.clear();
}

private boolean isFinished() {
return done || error != null;
}
}
}
Loading