Skip to content

Commit

Permalink
* Fixed behavior buffer completing events
Browse files Browse the repository at this point in the history
  • Loading branch information
longbkiter07 committed Dec 5, 2016
1 parent a1b4be9 commit 466db83
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
2 changes: 1 addition & 1 deletion config.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ext {
minSdkVersion : 15,
targetSdkVersion : 23,
versionCode : 25,
versionName : "0.3.2"
versionName : "0.3.3"
]

bintray = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;

/**
Expand All @@ -28,7 +29,7 @@ public class ObservableAdapterManager<D> {

private final PublishSubject<Behavior<D>> mProcessingSubject;

private final PublishSubject<Long> mFinishedSubject;
private final BehaviorSubject<Behavior<D>> mFinishedSubject;

@Nullable private RecyclerView.Adapter mAdapter;

Expand All @@ -38,7 +39,7 @@ public ObservableAdapterManager(@Nullable RecyclerView.Adapter adapter, List<D>
mItems = items;
mDataComparable = dataComparable;
mProcessingSubject = PublishSubject.create();
mFinishedSubject = PublishSubject.create();
mFinishedSubject = BehaviorSubject.create();
mAdapter = adapter;
init();
}
Expand All @@ -63,7 +64,7 @@ private void init() {
.onBackpressureBuffer()
.observeOn(Schedulers.computation())
.concatMap(dBehavior -> processBehaviors(dBehavior).doOnNext(aVoid1 -> {
mFinishedSubject.onNext(dBehavior.createdAt);
mFinishedSubject.onNext(dBehavior);
}))
.subscribe();
}
Expand Down Expand Up @@ -175,23 +176,25 @@ private boolean shouldCallUpdate(D oldItem, D newItem) {
}

private Observable<Void> processBehaviors(Behavior<D> behavior) {
if (behavior.mAction == Action.SET) {
if (mDataComparable != null && mItems.size() <= MAX_SIZE_TO_CALL_DIFF
&& behavior.mItems.size() <= MAX_SIZE_TO_CALL_DIFF) {
return processSetWithDiffCallback(behavior);
return Observable.defer(() -> {
if (behavior.mAction == Action.SET) {
if (mDataComparable != null && mItems.size() <= MAX_SIZE_TO_CALL_DIFF
&& behavior.mItems.size() <= MAX_SIZE_TO_CALL_DIFF) {
return processSetWithDiffCallback(behavior);
} else {
return processSetWithNotify(behavior);
}
} else {
return processSetWithNotify(behavior);
return processSingleOperator(behavior);
}
} else {
return processSingleOperator(behavior);
}
});
}

private Observable<Void> submitBehavior(Behavior<D> behavior) {
return Observable.defer(() -> mFinishedSubject
.doOnSubscribe(() -> mProcessingSubject.onNext(behavior))
.filter(aLong -> aLong >= behavior.createdAt).take(1)
.<Void>map(dBehavior -> null));
return mFinishedSubject.filter(dBehavior -> dBehavior == behavior)
.take(1)
.<Void>map(aLong -> null)
.doOnSubscribe(() -> mProcessingSubject.onNext(behavior));
}

public Observable<Void> add(D item) {
Expand Down Expand Up @@ -262,8 +265,6 @@ private static class Behavior<D> {

final int mDestPos;

final long createdAt;

public Behavior(D item, int pos, Action action) {
this(Arrays.asList(item), pos, action, pos);
}
Expand All @@ -285,7 +286,6 @@ public Behavior(List<? extends D> items, int pos, Action action, int destPos) {
mPos = pos;
mAction = action;
mDestPos = destPos;
createdAt = System.currentTimeMillis();
}

public Behavior(D item, Action action) {
Expand Down

0 comments on commit 466db83

Please sign in to comment.