-
Notifications
You must be signed in to change notification settings - Fork 7.6k
1.x: improve ExecutorScheduler worker unsubscription #3842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Do you have a real case that the order does matter? I think this affects all CompositeSubscription usages. |
I see this as an eager cancellation matter than a race. There is no need for the flag but just check tasks.isUnsubscribed() before and run some cleanup: do {
if (tasks.isUnsubscribed()) {
queue.clear();
return;
}
ScheduledAction sa = queue.poll();
if (tasks.isUnsubscribed()) {
queue.clear();
return;
}
if (!sa.isUnsubscribed()) {
sa.run();
}
} while (wip.decrementAndGet() != 0); |
@zsxwing I'm confusing the issue talking about @akarnokd thanks, that looks good, I'll amend the PR. It's interesting you don't classify it as a bug but I guess we haven't documented our expectations of
The relevant bit I suppose is halts the processing of waiting tasks. |
@davidmoten Gotcha. |
@zsxwing |
8c313d0
to
8781ef9
Compare
@akarnokd Don't you think that second check of I'm also wondering if @Override
public void unsubscribe() {
queue.clear();
tasks.unsubscribe();
} and then @Override
public void run() {
do {
if (tasks.isUnsubscribed()) {
return;
}
ScheduledAction sa = queue.poll();
if (sa == null) {
return;
}
if (!sa.isUnsubscribed()) {
sa.run();
}
} while (wip.decrementAndGet() != 0);
} |
No need for stronger guarantees. That |
8781ef9
to
ebf0e3f
Compare
Thanks @akarnokd . In terms of guarantees I was going to suggest that the javadoc of |
Updated PR, squashed commits. |
👍 |
1 similar comment
👍 |
@davidmoten mind thinking about how to fix |
@zsxwing sure, I'll have a look at I think there is an outstanding race with this PR: if (!sa.isUnsubscribed()) {
sa.run();
} might have to be: if (!tasks.isUnsubscribed()) {
sa.run();
} I'll have a look a bit later today and I'll submit another PR if needed. |
|
Yep I just noticed that myself, ta. |
@zsxwing I think it should look like this: @Override
public void run() {
do {
if (tasks.isUnsubscribed()) {
queue.clear();
return;
}
ScheduledAction sa = queue.poll();
if (sa == null) {
return;
}
if (!sa.isUnsubscribed()) {
if (!tasks.isUnsubscribed()) {
sa.run();
} else {
queue.clear();
return;
}
}
} while (wip.decrementAndGet() != 0);
} |
There is an unsubscribe race condition similar to #3842 in `CachedThreadScheduler.EventLoopWorker` and `EventLoopsScheduler.EventLoopWorker`. Image the following execution order: | Execution Order | thread 1 | thread 2 | | ------------- | ------------- | ------------- | | 1 | | submit task A | | 2 | | submit task B | | 3 | unsubscribe Worker | | | 4 | unsubscribe task A | | | 5 | | task A won't run as it's unsubscribed | | 6 | | run task B | | 7 | unsubscribe task B | | So task B will run but its previous task A will be skipped. This PR adds a check before running an action and moves `workerUnderConcurrentUnsubscribeShouldNotAllowLaterTasksToRunDueToUnsubscriptionRace` to `AbstractSchedulerConcurrencyTests` to test all concurrent schedulers.
I noticed that when a worker built from
Schedulers.from(Executors.newFixedThread(1))
is unsubscribed the use of aCompositeSubscription
to track task subscriptions means that the tasks may be unsubscribed in any old order (CompositeSubscription
holds its subscriptions in aHashSet
). This means that if the worker is given task A and task B then the race can prevent A from running but allow B to run! I've included a unit test in this PR that demos it. Fails every time on my machine on the first loop.This PR is really for discussion about the problem and possible fixes.
I have included a possible fix which is to track overall subscription using a volatile boolean and check that boolean before running any task. If this was considered the way to go some further simplification would take place in the operator (might not need to check individual task subscriptions).
I haven't checked other schedulers for this sort of problem yet.