Skip to content

Relocating many shards at once results in canceled relocations #18739

Closed
@jonaf

Description

@jonaf

Elasticsearch version: 1.7.3 (build hash 05d4530)

JVM version: java-1.8.0-openjdk-1.8.0.77-0.b03.9.amzn1.x86_64 (OpenJDK 8 update 77)

OS version: Linux 4.1.17-22.30.amzn1.x86_64 x86_64 GNU/Linux (Amazon Linux, quite similar to CentOS)

Description of the problem including expected versus actual behavior: In previous versions of Elasticsearch, we would "resize" an Elasticsearch cluster in AWS EC2 by disabling shard allocation, provisioning new EC2 nodes that joined the existing cluster, and using the reroute API to move shards from the old nodes to the new ones. We would allow Elasticsearch to do this at maximum speed by scheduling all shard relocations at once (we run an algorithm to decide which nodes get which shard movements, and then send one, big reroute API call to Elasticsearch). In previous versions (1.4 and earlier) of ES, this worked like a charm, even with ~30,000 shard movements (yes I know -- you can imagine how big the EC2 clusters are). However, when we upgraded to Elasticsearch 1.7.3, we noticed that when we did this, we would schedule, say, 9k shard movements, and Elasticsearch would start relocating shards, but after about an hour, the ES cluster health API would suddenly start reporting that there were only ~800 relocations. Since we knew it was impossible for ES to have moved the other ~8,000 shards in such an incredibly short window, we knew something was up. Upon investigation (or, enabling debug logging for "index.shard" activity), we saw that the shard relocations were canceled. Destination nodes had a log message like this:

[2016-06-03 21:31:01,898][DEBUG][index.shard ] [elasticsearch-36-48] [&shared-banana-bilberry][4] state: [RECOVERING]->[CLOSED], reason [failed recovery]

We also saw some errors that look like this:

[2016-06-03 21:31:01,898][WARN ][indices.cluster ] [elasticsearch-36-48] [[&shared-banana-bilberry][4]] marking and sending shard failed due to [failed recovery]
org.elasticsearch.indices.recovery.RecoveryFailedException: [&shared-banana-bilberry][4]: Recovery failed from [elasticsearch-37-41][5pss4NQJRWifhVst_AaPoQ][elasticsearch-37-41][inet[/10.100.37.41:9300]]{client_bv=false, availability_zone=a, master=false} into [elasticsearch-36-48][SZpn-ZwFTe2qrqk9JdLWdw][elasticsearch-36-48][inet[/10.100.36.48:9300]]{client_bv=false, availability_zone=a, master=false} (no activity after [30m])
at org.elasticsearch.indices.recovery.RecoveriesCollection$RecoveryMonitor.doRun(RecoveriesCollection.java:235)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.ElasticsearchTimeoutException: no activity after [30m]
... 5 more

We also, in some cases, got out of memory exceptions trying to spawn new threads. Those errors took this form:

Caused by: org.elasticsearch.transport.RemoteTransportException: [elasticsearch-40-173][inet[/10.100.40.173:9300]][internal:index/shard/
recovery/start_recovery]
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
at org.elasticsearch.transport.netty.MessageChannelHandler.handleRequest(MessageChannelHandler.java:224)
at org.elasticsearch.transport.netty.MessageChannelHandler.messageReceived(MessageChannelHandler.java:114)
at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)

As it turns out, the root reason for this is because Elasticsearch spawns a new RecoveryMonitor thread (in the "generic" thread pool, which is a cached (unbounded!) thread pool) for every relocating shard, in order to monitor its state (see: https://github.com/elastic/elasticsearch/blob/1.7/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java#L68).

Previous versions of ES didn't have this monitoring feature, and so we could schedule tens of thousands of relocations, send the request off to Elasticsearch, and just wait for it to complete.

As it stands now, once we send this reroute API call off to Elasticsearch, ES will cancel the majority of the shards in the request and we'll eventually see that, from our application's perspective, we've relocated all the shards. We then re-run the allocation algorithm as a final verification step, and see that there are still thousands (or tens of thousands) of shards remaining on the old EC2 nodes, which must be relocated, so we send off the reroute API again, and so on. This works, but every time we do it, we run the risk of getting out of memory exceptions.

In a nutshell, I'm convinced that Elasticsearch documentation and code should reject reroute API calls and put a maximum on the number of relocating shards equivalent to whatever Elasticsearch can reasonably handle. This way, I can send a reroute API call with 30,000 shards in it, and Elasticsearch will give me a response that says, "OK, I can move these 10,000 shards, but these other 20,000 shards cannot be moved yet -- you'll have to send them in a later batch after the first 10,000 are done." This way I can make my application my responsive to the maximum capabilities of Elasticsearch and just send the maximum batch size the current Elasticsearch cluster can handle. It's important for this kind of feedback to come from Elasticsearch because otherwise it's incredibly difficult to decide how many relocations I can send to the reroute API at once, since it will vary by the performance characteristics (size, number of cores, disk performance, and settings) of the Elasticsearch cluster itself.

One other idea might be to have a separate thread pool for recovery, and put a cap on it based on the number of cores or something, and that way I can just ask Elasticsearch the maximum relocations that I can give it at once, and I can batch up my reroute API calls to send batches of the most efficient size. This, of course, still doesn't help with the fact that sending the maximum batch size may ultimately result in some or many of the queued up relocating shards to timeout after 30 minutes.

Another idea might be instead of timing out due to "inactivity," timing out if there is some error condition -- I believe the activity of the shard itself is less relevant than, for example, whether the node sending the shard is still part of the cluster.

Lastly, I think that it would be very helpful if the log messages were a little bit more clear about what's happening. The log message on the destination nodes look like this, and it's somewhat confusing at first glance, but what's actually happening is the node created a local shard so that the copy could occur, then RecoveryMonitor threw an error since there was no activity for 30 minutes, and then the node has to actually delete the local shard to cancel the relocation. This series of events is not made clear by these log messages, even though that's exactly what they mean is happening:

[2016-06-04 00:04:15,301][DEBUG][index.shard ] [elasticsearch-36-48] [&shared-cloudberry-cherry][5] state: [CREATED]
[2016-06-04 00:04:15,301][DEBUG][index.shard ] [elasticsearch-36-48] [&shared-cloudberry-cherry][5] scheduling optimizer / merger every 1s
[2016-06-04 00:04:15,301][DEBUG][index.shard ] [elasticsearch-36-48] [&shared-cloudberry-cherry][5] state: [CREATED]->[RECOVERING], reason [from [elasticsearch-47-212][7A2hpTC1T-Sh428skXDbcQ][elasticsearch-47-212][inet[/10.100.47.212:9300]]{client_bv=false, availability_zone=c, master=false}]

followed by

[2016-06-04 04:26:14,984][DEBUG][indices.recovery ] [elasticsearch-36-48] [&shared-cloudberry-cherry][5] recovery canceled (reason: [shard closed])
[2016-06-04 04:26:14,984][DEBUG][index.shard ] [elasticsearch-36-48] [&shared-cloudberry-cherry][5] state: [RECOVERING]->[CLOSED], reason [removing shard (not allocated)]

I think, at least, the "reason" could be clarified that in fact, all of this is happening due to the lack of shard activity. Although the shard activity error does appear, it appears out-of-order in the logs, and so it's not clear that this "reason" is actually a direct result of the warning:

[2016-06-04 01:04:15,324][WARN ][indices.cluster ] [elasticsearch-36-48] [[&shared-cherry-strawberry][3]] marking and sending shard failed due to [failed recovery]
org.elasticsearch.indices.recovery.RecoveryFailedException: [&shared-cherry-strawberry][3]: Recovery failed from [elasticsearch-36-34][ogK7RcvLTXqxI-d7jetmKQ][elasticsearch-36-34][inet[/10.100.36.34:9300]]{client_bv=false, availability_zone=a, master=false} into [elasticsearch-36-48][SZpn-ZwFTe2qrqk9JdLWdw][elasticsearch-36-48][inet[/10.100.36.48:9300]]{client_bv=false, availability_zone=a, master=false} (no activity after [30m])
at org.elasticsearch.indices.recovery.RecoveriesCollection$RecoveryMonitor.doRun(RecoveriesCollection.java:235)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Metadata

Metadata

Assignees

No one assigned

    Labels

    :Distributed Indexing/DistributedA catch all label for anything in the Distributed Indexing Area. Please avoid if you can.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions