
Add new Threadpool for Cluster Coordination Activities #83576


Merged

Conversation

original-brownbear
Contributor

Using the generic pool for all the activities surrounding cluster coordination
seems broken. They all eventually get blocked on the same mutex in the `Coordinator`
and run effectively sequentially.
Particularly in clusters with larger node counts this could lead to situations where
lots of generic threads are needlessly spun up only for the purpose of waiting on the mutex.
Since we at times also lock on the mutex in the coordinator on transport threads, it is
particularly unfortunate when there are lots of generic threads waiting on it.

=> fix this by using a single-threaded pool for coordination work.
This also allows inspecting the queue size for these tasks for debugging purposes instead of having them hidden in the generic pool's queue.

@original-brownbear original-brownbear added :Distributed Coordination/Cluster Coordination Cluster formation and cluster state publication, including cluster membership and fault detection. team-discuss v8.2.0 labels Feb 7, 2022
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. label Feb 7, 2022
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner
Contributor

Since we at times also lock on the mutex in the coordinator on transport threads

That sounds like a bug; we do nontrivial IO (writing the cluster state to disk) under this mutex. Let's fix this.

I'm +1 on the general idea here, my thinking being that the CoordinatorTests already run all this stuff on a single thread, which should give us confidence that there are no lurking deadlocks. We'd need to spend some time checking that we really do run all this stuff in those tests. I also worry about removing concurrency from the code paths that don't acquire the mutex today.

@@ -219,6 +220,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5), false)
);
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
builders.put(Names.COORDINATION, new ScalingExecutorBuilder(Names.COORDINATION, 1, 1, TimeValue.timeValueSeconds(30), false));
Member


It would be nice to not introduce a new scaling thread pool that doesn't handle rejections after shutdown. Given the limited area where this thread pool is used, I think we could handle rejection everywhere and set the rejectAfterShutdown flag to true here.

Contributor Author


++ I made it a fixed pool of size 1 now. And fixed the one spot where this could have failed that I found.
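
For context, a minimal sketch of what that builder registration could look like, mirroring the FixedExecutorBuilder call for FORCE_MERGE shown in the diff above (the queue size of -1 and the trailing flag are copied from that line as an assumption, not taken from the merged change):

```java
// Hypothetical sketch only: a fixed pool with exactly one thread and an unbounded queue,
// so coordination tasks line up behind each other instead of spinning up extra GENERIC
// threads that would just wait on the Coordinator mutex.
builders.put(Names.COORDINATION, new FixedExecutorBuilder(settings, Names.COORDINATION, 1, -1, false));
```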

@original-brownbear
Contributor Author

Jenkins run elasticsearch-ci/part-2 (unrelated + known)

@original-brownbear
Contributor Author

@DaveCTurner

I also worry about removing concurrency from the code paths that don't acquire the mutex today.

I looked into this when coming up with this PR. The spots I found were:

  • Lag detector mostly doesn't block but sometimes blocks on the coordinator mutex. If it doesn't block, it's super fast though. I figured it's not critical, so it's not worth optimizing by running it on generic; certainly no point in spinning up an extra thread to run the lag detector IMO.
  • Some connecting and disconnecting happens outside the mutex. This stuff is super fast because the triggers for these events generally fork to generic already anyway, so moving it all onto this one thread is fine IMO and shouldn't introduce much if any latency. Plus, there's no guarantee that GENERIC will actually get scheduled faster.

I should also add that the motivation for this PR is that some of the transport requests that fork to generic are logging slow warnings. I can't for the life of me come up with a reason why they would be so slow, but I think there's a certain chance that it's the unfortunate combination of an extremely contended generic pool queue and just handling many of these requests at times.

That sounds like a bug, we do nontrivial IO (writing the cluster state to disk) under this mutex. Let's fix this.

Sorry, this turned out to be (sorta) fake news. We actually only run into this in tests because of the way we schedule some callbacks on shutdown. In production operation I couldn't see any contention here, so nothing to worry about. The contention on this lock, however, is pretty extreme in some scenarios, and running up tons of waiting GENERIC threads on a non-fair mutex seems broken.

@DaveCTurner
Contributor

DaveCTurner commented Feb 8, 2022

Great, thanks for checking. Yes the lag detector should be non-critical.

Some connecting and disconnecting happens outside the mutex.

If we're not already connected then opening a connection will do its stuff on the transport worker and ultimately complete the listener on the GENERIC threadpool. Looks a bit tricky to fix because of how multiple listeners can subscribe to a single connection attempt, but it'd be really helpful to know for sure that we're always on the COORDINATION thread. Not sure it makes sense to always fork, given that mostly we'll already be connected so it's a no-op, but is there a good way to get back to the right thread here only if needed?

Also yikes we acquire the mutex on the master update thread when reconfiguring (I opened #83682)

unfortunate combination of an extremely contended generic pool queue

Just trying to understand this a bit more. AIUI this PR mostly removes contention at the consumer side of the threadpool queue? I suppose it'll fix contention between enqueuing coordinator and non-coordinator activities but is that a problem? Maybe appending to a nonempty queue is cheaper than hunting for (or creating) a spare executor when the queue is empty? Still very surprising if it takes ≥5s to complete.

@original-brownbear
Contributor Author

Not sure it makes sense always to fork, given how mostly we'll already be connected so it's a no-op, but is there a good way to get back to the right thread here only if needed?

Not that I'm aware of. We had similar situations before where it would be nice to "know if we have to fork" but we just don't have the primitive for that. We could probably add it, but I'm not sure it's absolutely necessary.
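
Just to illustrate the idea, a hypothetical primitive along those lines might look like the sketch below; the helper and the thread-name check are assumptions for illustration, not existing Elasticsearch API:

```java
import java.util.concurrent.ExecutorService;

// Hypothetical helper, not an existing Elasticsearch primitive: run the task inline if the
// current thread already appears to belong to the coordination pool (guessed from the thread
// name here), otherwise fork onto that pool. This is the "fork only if needed" idea from above.
final class CoordinationThreads {
    static void executeOrRun(ExecutorService coordinationExecutor, Runnable task) {
        if (Thread.currentThread().getName().contains("[coordination]")) {
            task.run();                           // already on the right pool, no handoff needed
        } else {
            coordinationExecutor.execute(task);   // fork once onto the coordination pool
        }
    }
}
```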

I suppose it'll fix contention between enqueuing coordinator and non-coordinator activities but is that a problem?

Yea exactly, this is what this aims at. Whether this is a problem I can't say, but it's pretty much the last explanation for some issues that I can think of. What I have as far as clues go is this:

  • Profiling showed contention on enqueuing to the generic pool (in fact this is the first time ever that I've seen Futex* methods get hot enough to show up in the async profiler). I fixed one big thing that was contending the pool on data nodes in Fix PendingReplicationActions Submitting lots of NOOP Tasks to GENERIC #82092 (we were submitting endless noop tasks to GENERIC).
  • I recently learnt from a user issue that creating threads can be quite slow in cloud environments that see CPU contention. We were creating too many generic threads from too many parallel recoveries, and it was causing endless delays (as in seconds) in scheduling on GENERIC even though other pools ran fine (or at least better).
  • The checker inbound handlers are slow somewhere on some nodes. The only place they can be slow, I think, is scheduling on generic; I can't think of anything else in those code paths that could be slow. The only alternative explanation I can come up with would be memory pressure => GC pauses, but I don't see that in the logs in all cases.

Maybe appending to a nonempty queue is cheaper than hunting for (or creating) a spare executor when the queue is empty?

Hunting should be cheaper than appending with the ExecutorScalingQueue, which is just a linked transfer queue. Hunting calls tryTransfer, which is faster than transfer even with timeout 0. Creating a thread is obviously more expensive. I could see a combination of slowness in creating threads, taking turns with contention on an empty pool where tons of generic threads are contending on it, maybe causing long waits randomly. The pool is 128 in size at least; running into 128 threads on a 4-CPU master node while it GCs and/or does IO, I could see it randomly blocking for a long time.
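
To make the tryTransfer-versus-append distinction concrete, here is a standalone illustration using a plain LinkedTransferQueue; the real ExecutorScalingQueue wraps this class with more logic, so this is only a sketch of the two paths being compared:

```java
import java.util.concurrent.LinkedTransferQueue;

class QueueHandoffSketch {
    public static void main(String[] args) {
        LinkedTransferQueue<Runnable> queue = new LinkedTransferQueue<>();
        Runnable task = () -> System.out.println("coordination work");

        // "Hunting for a spare executor": hand the task straight to an already-waiting
        // consumer thread; returns false immediately if no thread is currently waiting.
        boolean handedOff = queue.tryTransfer(task);

        // "Appending to the queue": no consumer was waiting, so enqueue the task instead.
        // In the real scaling pool this is roughly where a new thread may be created.
        if (handedOff == false) {
            queue.offer(task);
        }
    }
}
```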

I figured we have a single mutex that all of this stuff runs under. There's absolutely no reason to spin up extra threads to wait on a mutex in parallel, and there's almost nothing running outside the mutex in the spots I changed. This change saves CPU + memory overall, and saves time on transport threads in particular by never forcing a thread creation from the transport thread itself for these tasks. It also seems to save a little time in tests: at least profiling with YourKit claims that many seconds of CPU time (not wall time) go into acquiring that mutex in the server internal cluster tests.

@DaveCTurner
Contributor

No further comments really, I'm good to proceed, but I'd like to see if anyone raises anything we've not already covered in a team discussion too.

If we do make this single-threaded then I think we may as well follow up by trying to remove the mutex entirely. Obviously that would need #83682 and the point about opening connections forking to GENERIC and a bunch of other things too.

Contributor

@DaveCTurner DaveCTurner left a comment


LGTM

Contributor

@henningandersen henningandersen left a comment


LGTM.


@Override
public void onRejection(Exception e) {
logger.debug("threadpool was shut down", e);
Contributor


I am not sure I follow this; is that not identical to the default ThreadedRunnable behavior (used by schedule), except without checking that the threadpool was indeed shut down?

Contributor Author


Yea the problem was that inside ThreadedRunnable.run we just call:

                executor.execute(runnable);

If this is called with an AbstractRunnable then onRejection will be called on it instead of the rejection being thrown, so the handling in ThreadedRunnable doesn't come into play => we need to manually catch this case here.
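
As a standalone illustration of that dispatch (simplified names, not the actual ThreadedRunnable / EsThreadPoolExecutor code), the routing looks roughly like this:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Simplified sketch of the behaviour described above: if the submitted task can take a
// rejection callback (like AbstractRunnable), the rejection is delivered to that callback
// instead of being thrown, so any try/catch around execute() never sees it.
final class RejectionRoutingSketch {
    abstract static class CallbackRunnable implements Runnable {
        abstract void onRejection(Exception e);
    }

    static void submit(Executor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            if (task instanceof CallbackRunnable callbackTask) {
                callbackTask.onRejection(e);   // rejection handled by the task itself
            } else {
                throw e;                       // plain runnables still see the exception
            }
        }
    }
}
```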

Contributor


ah, thanks, I have been here before 🙂

@original-brownbear
Contributor Author

Thanks all!

@original-brownbear original-brownbear merged commit acdd148 into elastic:master Feb 22, 2022
@original-brownbear original-brownbear deleted the coordinating-thread branch February 22, 2022 13:33
probakowski pushed a commit to probakowski/elasticsearch that referenced this pull request Feb 23, 2022
Using the generic pool for all the activities surrounding cluster coordination
seems broken. They all eventually get blocked on the same mutex in the `Coordinator`
and run effectively sequentially.
Particularly in clusters with larger node counts this could lead to situations where
lots of generic threads are needlessly spun up only for the purpose of waiting on the mutex.
Since we at times also lock on the mutex in the coordinator on transport threads, it is
particularly unfortunate when there are lots of generic threads waiting on it.

=> fix this by using a single-threaded pool for coordination work.
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Mar 1, 2022
In elastic#83576 we moved cluster coordination activities to a threadpool which
rejects actions on shutdown. elastic#84483 is a test failure caused by a
missing rejection handler, which this commit addresses.

Closes elastic#84483
DaveCTurner added a commit that referenced this pull request Mar 1, 2022
In #83576 we moved cluster coordination activities to a threadpool which
rejects actions on shutdown. #84483 is a test failure caused by a
missing rejection handler, which this commit addresses.

Closes #84483
@boicehuang
Contributor

@DaveCTurner @original-brownbear I wonder if we can move the FOLLOWER_CHECK_ACTION and LEADER_CHECK_ACTION
handlers into the cluster coordination thread pool? They are part of cluster coordination work.

@DaveCTurner
Contributor

It's deliberate that those actions' request handlers run on SAME because there's almost never any need to fork off the transport worker thread. In the rare cases where there is a need to fork we already use the CLUSTER_COORDINATION threadpool.
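
Conceptually (this is not the actual handler registration or FollowerChecker code, just a sketch of the trade-off being described):

```java
import java.util.concurrent.ExecutorService;

// Sketch only: handle the check inline on the calling transport worker (the "SAME" behaviour),
// and only hand off to the cluster coordination executor in the rare case that heavier work
// is actually needed.
final class CheckHandlerSketch {
    static void onCheck(boolean needsHeavyWork, Runnable quickResponse, Runnable heavyWork,
                        ExecutorService clusterCoordinationExecutor) {
        if (needsHeavyWork) {
            clusterCoordinationExecutor.execute(heavyWork);   // rare: fork off the transport thread
        } else {
            quickResponse.run();                              // common case: respond on SAME
        }
    }
}
```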

DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Apr 28, 2022
In elastic#83576 we made it so that forking off the transport thread when
handling a follower check might be rejected if we're shutting down. In
that case we don't pass the rejection exception back out and instead
rely on the transport connection's close to notify the caller. We also
manipulate the transport channel directly which misses some exception
cases. This commit adjusts the implementation to fix these things.
elasticsearchmachine pushed a commit that referenced this pull request May 6, 2022
In #83576 we made it so that forking off the transport thread when
handling a follower check might be rejected if we're shutting down. In
that case we don't pass the rejection exception back out and instead
rely on the transport connection's close to notify the caller. We also
manipulate the transport channel directly which misses some exception
cases. This commit adjusts the implementation to fix these things.