Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Add scheduler creation factories #5002

Merged
merged 12 commits into from
Jan 25, 2017

Conversation

ZacSweers
Copy link
Contributor

Resolves #4993

This is a pretty vanilla copy from RxJava 1's implementation. Note that I had to tune NewThread scheduler to not be a singleton to support this.

We had talked about borrowing from project reactor's APIs for different overloads, let me know if you think we should add more fine-grained controls through these.

Resolves ReactiveX#4993

This is a pretty vanilla copy from RxJava 1's implementation. Note that I had to tune NewThread scheduler to not be a singleton to support this.

We had talked about borrowing from project reactor's APIs for different overloads, let me know if you think we should add more fine-grained controls through these.
@ZacSweers
Copy link
Contributor Author

One other thing I'm worried about - is Schedulers the best place for this? RxJava 1 had them in its RxJavaHooks class. My concern is that having these alongside the normal io()/computation()/etc factories will make them look overly inviting for use and accidental abuse.

@@ -56,6 +54,9 @@
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason why this was moved to the end?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It requires WORKER_THREAD_FACTORY to be initialized now since it's a parameter

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

* system properties for configuring new thread creation. Cannot be null.
*/
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = ObjectHelper.requireNonNull(threadFactory, "threadFactory was null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either this is unnecessary or you forgot to do this with the other new constructors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, meant to remove it from all these constructors and just do it in the factory methods, accidentally missed this one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -180,6 +181,90 @@ public static Scheduler from(Executor executor) {
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}.
* @return the created Scheduler instance
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add @since 2.0.5 - experimental as well for every new method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newComputation() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we'd want to allow non-parameterized instances to be created.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you expand on that a bit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for creating yet another computation/io/newThread scheduler with the default settings? I thought you wanted to override the ThreadFactory.

*/
@Experimental
public static Scheduler newComputation(ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory == null"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usual NPE message is threadFactory is null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, will update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akarnokd
Copy link
Member

One other thing I'm worried about - is Schedulers the best place for this? RxJava 1 had them in its RxJavaHooks class. My concern is that having these alongside the normal io()/computation()/etc factories will make them look overly inviting for use and accidental abuse.

Another problem is that people will try to override the scheduler-init with these method calls for RxJavaPlugins but referencing Schedulers will init the default schedulers regardless.

Indeed, RxJavaPlugins sounds like a better place for the methods, but keep only the ThreadFactory variants.

@ZacSweers
Copy link
Contributor Author

Heading to bed right now but will update the PR in the morning with requested changes. Also let me know if there's anywhere I should put tests, as I wasn't sure what the right place would be.

@ZacSweers
Copy link
Contributor Author

Indeed, RxJavaPlugins sounds like a better place for the methods, but keep only the ThreadFactory variants.

Sounds good to me, will do that in my updates tomorrow

@akarnokd
Copy link
Member

Yes, tests that verify the custom thread factory actually worked by checking a custom thread name for each case.

@akarnokd akarnokd added this to the 2.1 milestone Jan 18, 2017
@codecov-io
Copy link

Current coverage is 95.46% (diff: 79.48%)

Merging #5002 into 2.x will increase coverage by 0.02%

@@                2.x      #5002   diff @@
==========================================
  Files           592        592          
  Lines         37989      38009    +20   
  Methods           0          0          
  Messages          0          0          
  Branches       5772       5772          
==========================================
+ Hits          36257      36284    +27   
+ Misses          764        761     -3   
+ Partials        968        964     -4   

Powered by Codecov. Last update 9c34eb1...2ae2414

@ZacSweers
Copy link
Contributor Author

ZacSweers commented Jan 19, 2017

Indeed, RxJavaPlugins sounds like a better place for the methods

8b4d461

but keep only the ThreadFactory variants.

8009333

Also tweaked the naming a bit. newNewThread was a little weird (went with create), and added Scheduler suffix since we're not in the Schedulers class anymore.

Yes, tests that verify the custom thread factory actually worked by checking a custom thread name for each case.

I tried setting this up matching some of the cdl-based approaches in the plugin tests but wan't able to get it working (just hangs). Pushed what I had in a1029b4, any insight?

@akarnokd akarnokd changed the title Add scheduler creation factories 2.x: Add scheduler creation factories Jan 19, 2017
@ZacSweers
Copy link
Contributor Author

Heading on vacation for a couple weeks, but let me know what you think of getting the tests to work. I should have some time here and there to update the PR

@akarnokd
Copy link
Member

I've already told you: use the runnable in the factory method on the Thread constructor.

@ZacSweers
Copy link
Contributor Author

use the runnable in the factory method on the Thread constructor.

Aren't I just testing the test ThreadFactory implementations then, and not that the schedulers are hooked up properly to power the scheduler?


private static void verifyThread(Worker w, Predicate<Thread> threadPredicate) {
try {
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove double try

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static void verifyThread(Worker w, Predicate<Thread> threadPredicate) {
try {
try {
final AtomicReference<Thread> value = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change to AtomicBoolean and just run the predicate inside the worker?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then you don't even need to supply the Thread to the Predicate actually. It can verify whatever it wants.

@JakeWharton
Copy link
Contributor

What's the problem with the tests? They seem to be verifying that the supplied factory was used.

@JakeWharton
Copy link
Contributor

Oh you didn't pass the supplied Runnable when calling new Thread() inside the factory.

@ZacSweers
Copy link
Contributor Author

Oh you didn't pass the supplied Runnable when calling new Thread() inside the factory.

Ah! This is what I was missing, thanks for pointing that out. I've updated tests (which should now all be passing), and also made them a bit more robust (full integration with schedulers) to simulate a more real world use case.

@ZacSweers
Copy link
Contributor Author

Failing test looks like a flake...

@ZacSweers
Copy link
Contributor Author

Wait no it's not. Repro'd locally, will fix

@ZacSweers
Copy link
Contributor Author

Fixed, I think. I added manual shutdowns of schedulers after they're done. Only IO had this lifecycle issue over tests, but I did the shutdown in all to be safe.

@ZacSweers
Copy link
Contributor Author

Ok this time it seems actually flaky, as the same commit passed in a different travis job against my fork - https://travis-ci.org/hzsweers/RxJava/builds/195074173

}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need some rewording emphasizing that the there is always a new Scheduler instance, unlike with the non-new method call.

@@ -56,6 +54,9 @@
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority);

NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

* @return the created Scheduler instance
*/
@Experimental
public static Scheduler newComputation() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for creating yet another computation/io/newThread scheduler with the default settings? I thought you wanted to override the ThreadFactory.

ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("Test");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't allow executing tasks because you ignore r.

return "Test".equals(thread.getName());
}
});
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to shut down the scheduler (in a finally block) to avoid leaking threads.

@@ -927,6 +929,58 @@ public static Completable onAssembly(Completable source) {
}

/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add notes to all of these methods something like this:

Note that the returned Scheduler must be shut down manually if the ThreadFactory doesn't create a daemon thread, otherwise the JVM may not quit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "Note that this takes precedence..." may be misleading. The number of threads is still based on the system configuration. I'd remove this sentence entirely

@akarnokd akarnokd merged commit f53e029 into ReactiveX:2.x Jan 25, 2017
@ZacSweers ZacSweers deleted the schedulerFactories branch January 25, 2017 08:23
@ZacSweers
Copy link
Contributor Author

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants