1.x: improve ExecutorScheduler worker unsubscription #3842


Merged
merged 1 commit into from
Apr 8, 2016


davidmoten
Collaborator

I noticed that when a worker built from Schedulers.from(Executors.newFixedThreadPool(1)) is unsubscribed, the use of a CompositeSubscription to track task subscriptions means that the tasks may be unsubscribed in arbitrary order (CompositeSubscription holds its subscriptions in a HashSet). This means that if the worker is given task A and then task B, the race can prevent A from running but allow B to run! I've included a unit test in this PR that demonstrates it. It fails every time on my machine on the first loop.

This PR is really for discussion about the problem and possible fixes.

I have included a possible fix, which is to track the overall subscription using a volatile boolean and check that boolean before running any task. If this were considered the way to go, some further simplification would take place in the operator (it might not need to check individual task subscriptions).

I haven't checked other schedulers for this sort of problem yet.
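
To make the interleaving concrete, here is a minimal single-threaded replay of it. This is only a sketch under stated assumptions: `UnsubscribeOrderSketch`, `Task` and `replay` are hypothetical names, plain booleans stand in for the real subscriptions, and the steps of the two threads are written out in one fixed order rather than actually racing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: replays one possible interleaving by hand.
class UnsubscribeOrderSketch {
    static final class Task {
        final String name;
        volatile boolean unsubscribed;
        Task(String name) { this.name = name; }
    }

    /** Returns the names of the tasks that ran in the replayed interleaving. */
    static List<String> replay(boolean checkWorkerFlag) {
        Task a = new Task("A");
        Task b = new Task("B");
        List<String> ran = new ArrayList<>();

        // Unsubscribing thread: the worker is marked unsubscribed, then the
        // per-task unsubscription starts and has only reached A so far.
        boolean workerUnsubscribed = true;
        a.unsubscribed = true;

        // Executor thread: drains the queue while B is still subscribed.
        for (Task t : new Task[] {a, b}) {
            if (checkWorkerFlag && workerUnsubscribed) {
                break; // proposed fix: consult the worker-level flag first
            }
            if (!t.unsubscribed) {
                ran.add(t.name);
            }
        }

        // Unsubscribing thread: finally reaches B, too late without the flag check.
        b.unsubscribed = true;
        return ran;
    }

    public static void main(String[] args) {
        System.out.println("per-task check only: " + replay(false));
        System.out.println("with worker flag:    " + replay(true));
    }
}
```

Without the worker-level check, B runs although A was skipped; with it, neither runs.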

@zsxwing
Member

zsxwing commented Apr 7, 2016

Do you have a real case where the order matters? I think this affects all CompositeSubscription usages.

@akarnokd
Member

akarnokd commented Apr 7, 2016

I see this as a matter of eager cancellation rather than a race. There is no need for the flag; just check tasks.isUnsubscribed() before running and do some cleanup:

do {
    if (tasks.isUnsubscribed()) {
        queue.clear();
        return;
    }

    ScheduledAction sa = queue.poll();

    if (tasks.isUnsubscribed()) {
        queue.clear();
        return;
    }
    if (!sa.isUnsubscribed()) {
        sa.run();
    }
} while (wip.decrementAndGet() != 0);

@davidmoten
Collaborator Author

@zsxwing I'm confusing the issue talking about CompositeSubscription, it's really about subscription management in ExecutorScheduler. I don't have a problem with CompositeSubscription in general.

@akarnokd thanks, that looks good, I'll amend the PR. It's interesting you don't classify it as a bug but I guess we haven't documented our expectations of Schedulers in this area. Do you think Scheduler.unsubscribe() should offer stronger guarantees, similar to the guarantee offered by ExecutorService.shutdownNow()?

Attempts to stop all actively executing tasks, halts the
processing of waiting tasks, and returns a list of the tasks
that were awaiting execution.

The relevant bit, I suppose, is "halts the processing of waiting tasks".
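
For reference, the quoted `shutdownNow()` behaviour can be observed with a small sketch using only `java.util.concurrent`. The class and method names are hypothetical; the pool size of 1 mirrors the executor mentioned at the top of this thread.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that shutdownNow() hands back waiting tasks unrun.
class ShutdownNowSketch {
    /** Returns how many queued tasks shutdownNow() returned without running them. */
    static int waitingTasksReturned() {
        ExecutorService exec = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        try {
            // Task A occupies the single thread until it is interrupted.
            exec.execute(() -> {
                started.countDown();
                try {
                    never.await();
                } catch (InterruptedException e) {
                    // interrupted by shutdownNow(); just finish
                }
            });
            started.await(); // make sure A has left the queue

            // Tasks B and C can only wait behind A.
            exec.execute(() -> { });
            exec.execute(() -> { });

            List<Runnable> waiting = exec.shutdownNow();
            exec.awaitTermination(5, TimeUnit.SECONDS);
            return waiting.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("waiting tasks returned: " + waitingTasksReturned());
    }
}
```

Here the two tasks queued behind A are returned rather than run, which is the kind of guarantee being asked about for workers.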

@zsxwing
Member

zsxwing commented Apr 7, 2016

@davidmoten Gotcha. EventLoopWorker has the same issue. You can try Schedulers.computation() or Schedulers.io() to reproduce it in your test.

@davidmoten
Collaborator Author

@zsxwing computation() and io() both fail too, thanks.

@davidmoten davidmoten force-pushed the scheduler-unsub-order branch from 8c313d0 to 8781ef9 Compare April 8, 2016 04:14
@davidmoten
Collaborator Author

@akarnokd Don't you think that the second check of tasks.isUnsubscribed() in your snippet is overkill? Can I remove it?

I'm also wondering if unsubscribe() can look like this:

    @Override
    public void unsubscribe() {
        queue.clear();
        tasks.unsubscribe();
    }

and then run() would be:

        @Override
        public void run() {
            do {
                if (tasks.isUnsubscribed()) {
                    return;
                }

                ScheduledAction sa = queue.poll();
                if (sa == null) {
                    return;
                }
                if (!sa.isUnsubscribed()) {
                    sa.run();
                }
            } while (wip.decrementAndGet() != 0);
        }

@akarnokd
Member

akarnokd commented Apr 8, 2016

No need for stronger guarantees.

That unsubscribe you are suggesting delays the tasks.unsubscribe() and the queue could still be in use; swap the two lines. In addition, you'd still need a clear in the drain loop because it could be still scheduled with a non-empty queue.
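
A rough sketch of why the order of the two lines matters, assuming a `schedule()` that rejects work once the worker is unsubscribed. Everything here is a hypothetical stand-in: an `AtomicBoolean` plays the role of `tasks`, strings play the role of actions, and the concurrent `schedule()` call is placed by hand between the two steps.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model, not RxJava code: compares the two orderings of unsubscribe().
class UnsubscribeStepOrderSketch {
    final Queue<String> queue = new ConcurrentLinkedQueue<>();
    final AtomicBoolean unsubscribed = new AtomicBoolean();

    /** Mimics schedule(): rejects work once the worker is unsubscribed. */
    boolean schedule(String task) {
        if (unsubscribed.get()) {
            return false;
        }
        queue.offer(task);
        return true;
    }

    /** Returns how many tasks are left in the queue after unsubscribing. */
    static int leftoverAfter(boolean clearFirst) {
        UnsubscribeStepOrderSketch w = new UnsubscribeStepOrderSketch();
        w.schedule("A");
        if (clearFirst) {
            w.queue.clear();
            w.schedule("B");          // lands between the two steps and is accepted
            w.unsubscribed.set(true);
        } else {
            w.unsubscribed.set(true);
            w.schedule("B");          // rejected: the worker is already unsubscribed
            w.queue.clear();
        }
        return w.queue.size();
    }

    public static void main(String[] args) {
        System.out.println("clear first: " + leftoverAfter(true) + " task(s) left");
        System.out.println("flag first:  " + leftoverAfter(false) + " task(s) left");
    }
}
```

Even with the flag set first, a real concurrent `schedule()` could pass its check just before the flag flips and enqueue afterwards, which is presumably why the drain loop still needs its own `queue.clear()`.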

@davidmoten davidmoten force-pushed the scheduler-unsub-order branch from 8781ef9 to ebf0e3f Compare April 8, 2016 07:09
@davidmoten davidmoten changed the title from "1.x: fix ExecutorScheduler worker unsubscription race" to "1.x: improve ExecutorScheduler worker unsubscription" Apr 8, 2016
@davidmoten
Collaborator Author

Thanks @akarnokd . In terms of guarantees I was going to suggest that the javadoc of Scheduler at the class level be supplemented with a statement similar to that of ExecutorService.shutdownNow() once all schedulers had been enhanced like this PR.

@davidmoten
Collaborator Author

Updated PR, squashed commits.

@akarnokd
Member

akarnokd commented Apr 8, 2016

👍

@zsxwing
Member

zsxwing commented Apr 8, 2016

👍

@zsxwing zsxwing merged commit 31dc74a into ReactiveX:1.x Apr 8, 2016
@zsxwing
Member

zsxwing commented Apr 8, 2016

@davidmoten mind thinking about how to fix EventLoopWorker? :)

@davidmoten
Collaborator Author

@zsxwing sure, I'll have a look at EventLoopWorker

I think there is an outstanding race with this PR:

if (!sa.isUnsubscribed()) {
    sa.run();
}

might have to be:

if (!tasks.isUnsubscribed()) {
    sa.run();
}

I'll have a look a bit later today and I'll submit another PR if needed.

@zsxwing
Member

zsxwing commented Apr 8, 2016

if (!sa.isUnsubscribed()) { is necessary. sa may be unsubscribed by the user.

@davidmoten
Collaborator Author

Yep I just noticed that myself, ta.

@davidmoten
Collaborator Author

@zsxwing I think it should look like this:

        @Override
        public void run() {
            do {
                if (tasks.isUnsubscribed()) {
                    queue.clear();
                    return;
                }
                ScheduledAction sa = queue.poll();
                if (sa == null) {
                    return;
                }
                if (!sa.isUnsubscribed()) {
                    if (!tasks.isUnsubscribed()) {
                        sa.run();
                    } else {
                        queue.clear();
                        return;
                    }
                }
            } while (wip.decrementAndGet() != 0);
        }
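
The loop above can be exercised outside RxJava with a small stand-alone model. This is a sketch only: `DrainLoopSketch`, `Task`, `Worker` and `enqueue` are hypothetical names, an `AtomicBoolean` replaces the `CompositeSubscription`, and the drain is invoked directly on the calling thread instead of through an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the worker; it mirrors the loop above but is not RxJava code.
class DrainLoopSketch {
    static final class Task implements Runnable {
        final Runnable body;
        final AtomicBoolean unsubscribed = new AtomicBoolean();
        Task(Runnable body) { this.body = body; }
        boolean isUnsubscribed() { return unsubscribed.get(); }
        void unsubscribe() { unsubscribed.set(true); }
        @Override public void run() { body.run(); }
    }

    static final class Worker implements Runnable {
        final Queue<Task> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger();
        final AtomicBoolean tasks = new AtomicBoolean(); // stand-in for tasks.isUnsubscribed()

        Task enqueue(Runnable body) {
            Task t = new Task(body);
            queue.offer(t);
            wip.getAndIncrement();
            return t;
        }

        void unsubscribe() {
            tasks.set(true); // flag first, then clear
            queue.clear();
        }

        @Override public void run() {
            do {
                if (tasks.get()) { queue.clear(); return; }
                Task sa = queue.poll();
                if (sa == null) { return; }
                if (!sa.isUnsubscribed()) {        // the user may cancel a single task
                    if (!tasks.get()) {
                        sa.run();
                    } else {                       // the worker was cancelled meanwhile
                        queue.clear();
                        return;
                    }
                }
            } while (wip.decrementAndGet() != 0);
        }
    }

    /** A runs, B is cancelled by the user, C cancels the worker, so D never runs. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Worker w = new Worker();
        w.enqueue(() -> log.add("A"));
        Task b = w.enqueue(() -> log.add("B"));
        w.enqueue(() -> { log.add("C"); w.unsubscribe(); });
        w.enqueue(() -> log.add("D"));
        b.unsubscribe();
        w.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model both checks do distinct work: `sa.isUnsubscribed()` skips B, and the worker-level check stops D once C has unsubscribed the worker.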

zsxwing added a commit that referenced this pull request Apr 20, 2016
There is an unsubscribe race condition similar to #3842 in `CachedThreadScheduler.EventLoopWorker` and `EventLoopsScheduler.EventLoopWorker`. Imagine the following execution order:

| Execution Order  | thread 1 | thread 2 |
| ------------- | ------------- | ------------- |
| 1 |  | submit task A |
| 2 |  | submit task B |
| 3 | unsubscribe Worker  |  |
| 4 | unsubscribe task A |   |
| 5 | | task A won't run as it's unsubscribed |
| 6 | | run task B |
| 7 | unsubscribe task B |  |

So task B will run but its previous task A will be skipped.

This PR adds a check before running an action and moves `workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace` to `AbstractSchedulerConcurrencyTests` to test all concurrent schedulers.