Skip to content

Commit

Permalink
Add tests (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacSweers committed Jan 19, 2017
1 parent 66111c5 commit a1029b4
Showing 1 changed file with 113 additions and 13 deletions.
126 changes: 113 additions & 13 deletions src/test/java/io/reactivex/plugins/RxJavaPluginsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,6 @@

package io.reactivex.plugins;

import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.*;
import org.reactivestreams.*;

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
Expand All @@ -47,6 +34,17 @@
import io.reactivex.internal.subscriptions.ScalarSubscription;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import org.junit.*;
import org.reactivestreams.*;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.junit.Assert.*;

public class RxJavaPluginsTest {

Expand Down Expand Up @@ -1890,4 +1888,106 @@ public void accept(final Throwable throwable) throws Exception {
RxJavaPlugins.reset();
}
}

private static void verifyThread(Worker w, Predicate<Thread> threadPredicate) {
try {
try {
final AtomicReference<Thread> value = new AtomicReference<>();
final CountDownLatch cdl = new CountDownLatch(1);

w.schedule(new Runnable() {
@Override
public void run() {
value.set(Thread.currentThread());
cdl.countDown();
}
});

cdl.await();

Thread t = value.get();
assertNotNull(t);
assertTrue(threadPredicate.test(t));

} catch (Exception e) {
fail();
}
} catch (Exception e) {
fail();
} finally {
w.dispose();
}
}

@Test
public void testCreateComputationScheduler() {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("Test");
}
};

Scheduler scheduler = RxJavaPlugins.createComputationScheduler(factory);
verifyThread(scheduler.createWorker(), new Predicate<Thread>() {
@Override
public boolean test(Thread thread) throws Exception {
return "Test".equals(thread.getName());
}
});
}

@Test
public void testCreateIoScheduler() {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("Test");
}
};

Scheduler scheduler = RxJavaPlugins.createIoScheduler(factory);
verifyThread(scheduler.createWorker(), new Predicate<Thread>() {
@Override
public boolean test(Thread thread) throws Exception {
return "Test".equals(thread.getName());
}
});
}

@Test
public void testCreateNewThreadScheduler() {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("Test");
}
};

Scheduler scheduler = RxJavaPlugins.createNewThreadScheduler(factory);
verifyThread(scheduler.createWorker(), new Predicate<Thread>() {
@Override
public boolean test(Thread thread) throws Exception {
return "Test".equals(thread.getName());
}
});
}

@Test
public void testCreateSingleScheduler() {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("Test");
}
};

Scheduler scheduler = RxJavaPlugins.createSingleScheduler(factory);
verifyThread(scheduler.createWorker(), new Predicate<Thread>() {
@Override
public boolean test(Thread thread) throws Exception {
return "Test".equals(thread.getName());
}
});
}
}

0 comments on commit a1029b4

Please sign in to comment.