Skip to content

Reactivate core tests and combineLatest #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ subprojects {
}
}

project(':rxjava-core') {
sourceSets.test.java.srcDir 'src/test/java'
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func2;

/**
Expand Down Expand Up @@ -187,14 +186,12 @@ public void testPeriodicScheduling() throws Exception {
final CountDownLatch latch = new CountDownLatch(4);

final Action0 innerAction = mock(Action0.class);
final Action0 unsubscribe = mock(Action0.class);
final Func0<Subscription> action = new Func0<Subscription>() {
final Action0 action = new Action0() {
@Override
public Subscription call() {
public void call() {
try {
innerAction.call();
assertTrue(SwingUtilities.isEventDispatchThread());
return Subscriptions.create(unsubscribe);
} finally {
latch.countDown();
}
Expand All @@ -210,7 +207,6 @@ public Subscription call() {
sub.unsubscribe();
waitForEmptyEventQueue();
verify(innerAction, times(4)).call();
verify(unsubscribe, times(4)).call();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import javax.swing.AbstractButton;

import rx.Observable;
import static rx.Observable.filter;
import rx.swing.sources.AbstractButtonSource;
import rx.swing.sources.ComponentEventSource;
import rx.swing.sources.KeyEventSource;
Expand Down Expand Up @@ -68,7 +67,7 @@ public static Observable<KeyEvent> fromKeyEvents(Component component) {
* @return Observable of key events.
*/
public static Observable<KeyEvent> fromKeyEvents(Component component, final Set<Integer> keyCodes) {
return filter(fromKeyEvents(component), new Func1<KeyEvent, Boolean>() {
return fromKeyEvents(component).filter(new Func1<KeyEvent, Boolean>() {
@Override
public Boolean call(KeyEvent event) {
return keyCodes.contains(event.getKeyCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void call() {
* @see SwingObservable.fromKeyEvents(Component, Set)
*/
public static Observable<Set<Integer>> currentlyPressedKeysOf(Component component) {
return Observable.<KeyEvent, Set<Integer>>scan(fromKeyEventsOf(component), new HashSet<Integer>(), new Func2<Set<Integer>, KeyEvent, Set<Integer>>() {
return fromKeyEventsOf(component).<Set<Integer>>scan(new HashSet<Integer>(), new Func2<Set<Integer>, KeyEvent, Set<Integer>>() {
@Override
public Set<Integer> call(Set<Integer> pressedKeys, KeyEvent event) {
Set<Integer> afterEvent = new HashSet<Integer>(pressedKeys);
Expand Down
38 changes: 33 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
*/
package rx;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -27,14 +23,14 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


import rx.concurrency.Schedulers;
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -1085,6 +1081,38 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
}

/**
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
* @param w0
* The first source observable.
* @param w1
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
}

/**
* Creates an Observable which produces buffers of collected values.
*
Expand Down