Skip to content

Commit

Permalink
* Using safe Observable for processing events
Browse files Browse the repository at this point in the history
  • Loading branch information
longbkiter07 committed Dec 14, 2016
1 parent 466db83 commit 9d44bf7
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ public boolean onOptionsItemSelected(MenuItem item) {
mRegularRecyclerViewAdapter.setUserAt(new User("custom_name" + Math.random(), 100, User.Gender.male),
(int) (Math.random() * mRegularRecyclerViewAdapter.getItemCount() - 1));
} else {
mUserRecyclerViewAdapter.getObservableAdapterManager().update((int) (Math.random() * mUserRecyclerViewAdapter.getItemCount() - 1),
new User("custom_name" + Math.random(), 100, User.Gender.male)).subscribe();
mUserRecyclerViewAdapter.getObservableAdapterManager().update(
new User("custom_name" + Math.random(), 100, User.Gender.male),
(int) (Math.random() * mUserRecyclerViewAdapter.getItemCount() - 1)).subscribe();
}
break;
}
Expand Down
2 changes: 1 addition & 1 deletion config.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ext {
minSdkVersion : 15,
targetSdkVersion : 23,
versionCode : 25,
versionName : "0.3.3"
versionName : "0.3.4"
]

bintray = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import android.support.annotation.Nullable;
import android.support.v7.widget.RecyclerView;
import android.util.Log;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -12,6 +13,7 @@
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
Expand All @@ -23,6 +25,8 @@ public class ObservableAdapterManager<D> {

private static final int MAX_SIZE_TO_CALL_DIFF = 512; //ms

private static final String TAG = ObservableAdapterManager.class.getSimpleName();

private final List<D> mItems;

private final DataComparable<D> mDataComparable;
Expand All @@ -48,6 +52,29 @@ public ObservableAdapterManager(List<D> items, DataComparable<D> dataComparable)
this(null, items, dataComparable);
}

private static <T> Observable<T> makeObservableSafe(Observable<T> observable) {
return createSafeObservable(() -> null)
.flatMap(o -> observable);
}

private static <T> Observable<T> createSafeObservable(Func0<T> func) {
return Observable.create(subscriber -> {
if (!subscriber.isUnsubscribed()) {
try {
T result = func.call();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(result);
subscriber.onCompleted();
}
} catch (Exception e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
});
}

private void unsubscribe() {
if (mProcessingSubscription != null && !mProcessingSubscription.isUnsubscribed()) {
mProcessingSubscription.unsubscribe();
Expand All @@ -63,18 +90,23 @@ private void init() {
mProcessingSubscription = mProcessingSubject
.onBackpressureBuffer()
.observeOn(Schedulers.computation())
.concatMap(dBehavior -> processBehaviors(dBehavior).doOnNext(aVoid1 -> {
mFinishedSubject.onNext(dBehavior);
}))
.subscribe();
.concatMap(dBehavior -> processBehaviors(dBehavior)
.doOnNext(aVoid1 -> {
mFinishedSubject.onNext(dBehavior);
}))
.subscribe(aVoid -> {
Log.w(TAG, "completed event");
}, throwable -> {
Log.w(TAG, throwable.getMessage(), throwable);
});
}

public void attachTo(RecyclerView.Adapter adapter) {
mAdapter = adapter;
}

private Observable<Void> processSetWithDiffCallback(Behavior<D> behavior) {
return ObservableDiffCallback.calculate(mDataComparable, new ArrayList<>(mItems), behavior.mItems)
return makeObservableSafe(ObservableDiffCallback.calculate(mDataComparable, new ArrayList<>(mItems), behavior.mItems)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(diffResult -> {
mItems.clear();
Expand All @@ -83,23 +115,23 @@ private Observable<Void> processSetWithDiffCallback(Behavior<D> behavior) {
diffResult.dispatchUpdatesTo(mAdapter);
}
})
.<Void>map(diffResult -> null);
.<Void>map(diffResult -> null));
}

private Observable<Void> processSetWithNotify(Behavior<D> behavior) {
return Observable.defer(() -> {
return makeObservableSafe(ObservableAdapterManager.<Void>createSafeObservable(() -> {
mItems.clear();
mItems.addAll(behavior.mItems);
if (mAdapter != null) {
mAdapter.notifyDataSetChanged();
}
return Observable.<Void>just(null);
return null;
})
.subscribeOn(AndroidSchedulers.mainThread());
.subscribeOn(AndroidSchedulers.mainThread()));
}

private Observable<Void> processSingleOperator(Behavior<D> behavior) {
return Observable.defer(() -> {
return ObservableAdapterManager.<Void>createSafeObservable(() -> {
switch (behavior.mAction) {
case ADD:
int size = behavior.mItems.size();
Expand Down Expand Up @@ -165,7 +197,7 @@ private Observable<Void> processSingleOperator(Behavior<D> behavior) {
}
break;
}
return Observable.<Void>just(null);
return null;
}).subscribeOn(AndroidSchedulers.mainThread());
}

Expand All @@ -176,7 +208,7 @@ private boolean shouldCallUpdate(D oldItem, D newItem) {
}

private Observable<Void> processBehaviors(Behavior<D> behavior) {
return Observable.defer(() -> {
return makeObservableSafe(Observable.defer(() -> {
if (behavior.mAction == Action.SET) {
if (mDataComparable != null && mItems.size() <= MAX_SIZE_TO_CALL_DIFF
&& behavior.mItems.size() <= MAX_SIZE_TO_CALL_DIFF) {
Expand All @@ -187,7 +219,7 @@ private Observable<Void> processBehaviors(Behavior<D> behavior) {
} else {
return processSingleOperator(behavior);
}
});
}));
}

private Observable<Void> submitBehavior(Behavior<D> behavior) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.android.plugins.RxAndroidPlugins;
import rx.android.plugins.RxAndroidSchedulersHook;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -40,6 +44,47 @@ public Scheduler getMainThreadScheduler() {
});
}

@Test
public void testScheduler() throws Exception {
Scheduler scheduler = Schedulers.newThread();
Subscription subscription = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
.doOnNext(aLong -> System.out.println("onNext 1:" + aLong))
.map(aLong -> {
for (int i = 0; i < 100000000; i++) {
aLong += i;
aLong ^= i;
}
return aLong;
})
.doOnNext(aLong -> System.out.println("onNext 2:" + aLong + "-" + System.currentTimeMillis() / 1000))
.subscribeOn(scheduler)
.doOnTerminate(() -> System.out.println("terminated"))
.doOnUnsubscribe(() -> System.out.println("unsubscribed-" + System.currentTimeMillis() / 1000))
.doOnCompleted(() -> System.out.println("completed-" + System.currentTimeMillis() / 1000))
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long aLong) {
if (!isUnsubscribed()) {
System.out.println("onNext event 1:" + aLong);
}
System.out.println("onNext event 2:" + aLong);
}
});
Thread.sleep(3200);
subscription.unsubscribe();
Thread.sleep(2000);
}

@Test(timeout = 5000)
public void testQueueingEvent() throws Exception {
ObservableAdapterManager<TestData> observableAdapterManager = new ObservableAdapterManager<TestData>(null, new ArrayList<>(),
Expand Down Expand Up @@ -92,6 +137,48 @@ public boolean areItemsTheSame(TestData oldData, TestData newData) {
assertThat(testSubscriber.getOnNextEvents().size(), equalTo(15));
}

@Test(timeout = 5000)
public void testClearData() throws Exception {
ObservableAdapterManager<TestData> observableAdapterManager = new ObservableAdapterManager<TestData>(null, new ArrayList<>(),
new DataComparable<TestData>() {
@Override
public boolean areContentsTheSame(TestData oldData, TestData newData) {
return oldData.name.equals(newData.name);
}

@Override
public boolean areItemsTheSame(TestData oldData, TestData newData) {
return oldData.id.equals(newData.id);
}
});
PublishSubject<List<TestData>> subject = PublishSubject.create();
TestSubscriber<Void> testSubscriber = new TestSubscriber<>();
subject
.flatMap(testData -> observableAdapterManager.setItems(testData))
.doOnNext(aVoid -> System.out.println("onNext event"))
.doOnUnsubscribe(() -> System.out.println("onUnsubscribed"))
.subscribe(testSubscriber);
subject.onNext(generateTestData(100));
subject.onNext(generateTestData(0));
subject.onNext(generateTestData(101));
subject.onNext(generateTestData(0));
subject.onNext(generateTestData(102));
subject.onNext(generateTestData(0));
subject.onNext(generateTestData(200));
subject.onNext(generateTestData(0));
subject.onNext(generateTestData(201));
subject.onNext(generateTestData(200));
subject.onNext(generateTestData(0));
subject.onNext(generateTestData(201));
subject.onNext(generateTestData(200));
subject.onNext(generateTestData(0));
subject.onNext(generateTestData(201));
Thread.sleep(15);
observableAdapterManager.clearEvents();
Thread.sleep(2000);
System.out.println("event count:" + testSubscriber.getValueCount());
}

private static class TestData {

String id;
Expand Down

0 comments on commit 9d44bf7

Please sign in to comment.