Skip to content

Commit 48653fc

Browse files
committed
Merge pull request #76 from dpsm/feature/cursor
Adding support to create self closing Observables from Cursors
2 parents 0dfd189 + d700bfa commit 48653fc

File tree

3 files changed

+142
-11
lines changed

3 files changed

+142
-11
lines changed

rxandroid/src/main/java/rx/android/content/ContentObservable.java

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import android.content.Intent;
77
import android.content.IntentFilter;
88
import android.content.SharedPreferences;
9+
import android.database.Cursor;
910
import android.os.Build;
1011
import android.os.Handler;
1112

@@ -138,6 +139,15 @@ public static Observable<String> fromSharedPreferencesChanges(SharedPreferences
138139
return Observable.create(new OnSubscribeSharedPreferenceChange(sharedPreferences));
139140
}
140141

142+
/**
143+
* Create Observable that emits the specified {@link android.database.Cursor} for each available position
144+
* of the cursor moving to the next position before each call and closing the cursor whether the
145+
* Observable completes or an error occurs.
146+
*/
147+
public static Observable<Cursor> fromCursor(final Cursor cursor) {
148+
return Observable.create(new OnSubscribeCursor(cursor));
149+
}
150+
141151
private ContentObservable() {
142152
}
143153
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package rx.android.content;
15+
16+
import android.database.Cursor;
17+
18+
import rx.Observable;
19+
import rx.Subscriber;
20+
21+
/**
22+
* Emits a {@link android.database.Cursor} for every available position.
23+
*/
24+
final class OnSubscribeCursor implements Observable.OnSubscribe<Cursor> {
25+
26+
private final Cursor cursor;
27+
28+
OnSubscribeCursor(final Cursor cursor) {
29+
this.cursor = cursor;
30+
}
31+
32+
@Override
33+
public void call(final Subscriber<? super Cursor> subscriber) {
34+
try {
35+
while (!subscriber.isUnsubscribed() && cursor.moveToNext()) {
36+
subscriber.onNext(cursor);
37+
}
38+
} finally {
39+
if (!cursor.isClosed()) {
40+
cursor.close();
41+
}
42+
}
43+
subscriber.onCompleted();
44+
}
45+
46+
}

rxandroid/src/test/java/rx/android/content/ContentObservableTest.java

+86-11
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,43 @@
1313
*/
1414
package rx.android.content;
1515

16-
import static org.mockito.Mockito.verify;
16+
import android.app.Activity;
17+
import android.app.Fragment;
18+
import android.database.Cursor;
19+
import android.support.v4.app.FragmentActivity;
1720

1821
import org.junit.Before;
1922
import org.junit.Test;
2023
import org.junit.runner.RunWith;
24+
import org.mockito.Matchers;
2125
import org.mockito.Mock;
2226
import org.mockito.MockitoAnnotations;
2327
import org.robolectric.Robolectric;
2428
import org.robolectric.RobolectricTestRunner;
2529
import org.robolectric.annotation.Config;
2630

27-
import rx.Observable;
28-
import rx.Observer;
29-
import rx.android.content.ContentObservable;
30-
import rx.android.TestUtil;
31-
import rx.observers.TestObserver;
32-
33-
import android.app.Activity;
34-
import android.app.Fragment;
35-
import android.support.v4.app.FragmentActivity;
36-
3731
import java.util.concurrent.Callable;
3832
import java.util.concurrent.CountDownLatch;
3933
import java.util.concurrent.ExecutionException;
4034
import java.util.concurrent.Executors;
4135
import java.util.concurrent.Future;
4236
import java.util.concurrent.TimeUnit;
4337

38+
import rx.Observable;
39+
import rx.Observer;
40+
import rx.Subscriber;
41+
import rx.android.TestUtil;
42+
import rx.observers.TestObserver;
43+
import rx.observers.TestSubscriber;
44+
45+
import static org.mockito.Mockito.doThrow;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.never;
48+
import static org.mockito.Mockito.spy;
49+
import static org.mockito.Mockito.times;
50+
import static org.mockito.Mockito.verify;
51+
import static org.mockito.Mockito.when;
52+
4453

4554
@RunWith(RobolectricTestRunner.class)
4655
@Config(manifest = Config.NONE)
@@ -157,4 +166,70 @@ public void bindActivityToSourceFromDifferentThread() throws InterruptedExceptio
157166
}
158167

159168

169+
public void givenCursorWhenFromCursorInvokedThenObservableCallsOnNextWhileHasNext() {
170+
final Subscriber<Cursor> subscriber = spy(new TestSubscriber<Cursor>());
171+
final Cursor cursor = mock(Cursor.class);
172+
173+
when(cursor.isAfterLast()).thenReturn(false, false, true);
174+
when(cursor.moveToNext()).thenReturn(true, true, false);
175+
when(cursor.getCount()).thenReturn(2);
176+
177+
Observable<Cursor> observable = ContentObservable.fromCursor(cursor);
178+
observable.subscribe(subscriber);
179+
180+
verify(subscriber, times(2)).onNext(cursor);
181+
verify(subscriber, never()).onError(Matchers.any(Throwable.class));
182+
verify(subscriber).onCompleted();
183+
}
184+
185+
@Test
186+
public void givenEmptyCursorWhenFromCursorInvokedThenObservableCompletesWithoutCallingOnNext() {
187+
final Subscriber<Cursor> subscriber = spy(new TestSubscriber<Cursor>());
188+
final Cursor cursor = mock(Cursor.class);
189+
190+
Observable<Cursor> observable = ContentObservable.fromCursor(cursor);
191+
observable.subscribe(subscriber);
192+
193+
verify(subscriber, never()).onNext(cursor);
194+
verify(subscriber, never()).onError(Matchers.any(Throwable.class));
195+
verify(subscriber).onCompleted();
196+
}
197+
198+
@Test
199+
public void givenCursorWhenFromCursorCalledThenEmitsAndClosesCursorAfterCompletion() {
200+
final Subscriber<Cursor> subscriber = spy(new TestSubscriber<Cursor>());
201+
final Cursor cursor = mock(Cursor.class);
202+
203+
when(cursor.isAfterLast()).thenReturn(false, true);
204+
when(cursor.moveToNext()).thenReturn(true, false);
205+
when(cursor.getCount()).thenReturn(1);
206+
207+
Observable<Cursor> observable = ContentObservable.fromCursor(cursor);
208+
observable.subscribe(subscriber);
209+
210+
verify(subscriber, never()).onError(Matchers.any(Throwable.class));
211+
verify(subscriber).onNext(cursor);
212+
verify(cursor).close();
213+
verify(subscriber).onCompleted();
214+
}
215+
216+
@Test
217+
public void givenCursorWhenFromCursorCalledThenEmitsAndClosesCursorAfterError() {
218+
final Subscriber<Cursor> subscriber = spy(new TestSubscriber<Cursor>());
219+
final Cursor cursor = mock(Cursor.class);
220+
final RuntimeException throwable = mock(RuntimeException.class);
221+
doThrow(throwable).when(subscriber).onNext(cursor);
222+
223+
when(cursor.isAfterLast()).thenReturn(false, true);
224+
when(cursor.moveToNext()).thenReturn(true, false);
225+
when(cursor.getCount()).thenReturn(1);
226+
227+
Observable<Cursor> observable = ContentObservable.fromCursor(cursor);
228+
observable.subscribe(subscriber);
229+
230+
verify(subscriber, never()).onCompleted();
231+
verify(subscriber).onNext(cursor);
232+
verify(subscriber).onError(throwable);
233+
verify(cursor).close();
234+
}
160235
}

0 commit comments

Comments
 (0)