
[ML] MlDailyMaintenanceService can block network threads #31683

@droberts195

Description


When ML is enabled, MlDailyMaintenanceService runs once a day on the master node, at some point during the two hours after midnight.
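The scheduling described above can be pictured with a small sketch. It assumes the start time is picked uniformly at random within a two-hour window after the next midnight in the node's time zone. The class and method names are hypothetical, and the real service may compute its delay differently.

```java
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Random;

public class MaintenanceScheduleSketch {

    // Assumed width of the window after midnight in which maintenance may start.
    static final Duration WINDOW = Duration.ofHours(2);

    // Returns the delay from `now` until a random instant within WINDOW
    // after the next midnight in `now`'s time zone.
    static Duration delayUntilNextRun(ZonedDateTime now, Random random) {
        ZonedDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay(now.getZone());
        // nextDouble() is in [0, 1), so the offset is strictly less than WINDOW.
        long offsetMillis = (long) (random.nextDouble() * WINDOW.toMillis());
        return Duration.between(now, nextMidnight.plus(Duration.ofMillis(offsetMillis)));
    }

    public static void main(String[] args) {
        ZonedDateTime now = ZonedDateTime.of(2018, 6, 29, 15, 30, 0, 0, ZoneId.of("UTC"));
        Duration delay = delayUntilNextRun(now, new Random(42));
        // From 15:30 the delay is at least 8h30m (midnight) and under 10h30m (02:00).
        System.out.println(delay);
    }
}
```

Randomising the start within the window is one way to avoid many clusters hitting shared infrastructure at exactly midnight; whether that is the service's actual motivation is an assumption here.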

We have seen a hot threads dump where the findUnusedStateDocs part of the daily maintenance is blocking a Netty thread:

    0.0% (0s out of 500ms) cpu usage by thread 'elasticsearch[a_master_node][transport_client_boss][T#2]'
     10/10 snapshots sharing following 88 elements
       sun.misc.Unsafe.park(Native Method)
       java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
       java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
       java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
       java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
       org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:251)
       org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:94)
       org.elasticsearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:50)
       org.elasticsearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:34)
       org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator.initScroll(BatchedDocumentsIterator.java:103)
       org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator.next(BatchedDocumentsIterator.java:80)
       org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover.findUnusedStateDocs(UnusedStateRemover.java:67)
       org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover.remove(UnusedStateRemover.java:51)
       org.elasticsearch.xpack.ml.action.TransportDeleteExpiredDataAction.deleteExpiredData(TransportDeleteExpiredDataAction.java:71)
       org.elasticsearch.xpack.ml.action.TransportDeleteExpiredDataAction.lambda$deleteExpiredData$1(TransportDeleteExpiredDataAction.java:72)
       org.elasticsearch.xpack.ml.action.TransportDeleteExpiredDataAction$$Lambda$3975/994332408.accept(Unknown Source)
       org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
       org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemover.removeData(AbstractExpiredJobDataRemover.java:49)
       org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemover.lambda$removeData$0(AbstractExpiredJobDataRemover.java:59)
       org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemover$$Lambda$3979/1077479085.accept(Unknown Source)
       org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
       org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover.deleteModelSnapshots(ExpiredModelSnapshotsRemover.java:110)
       org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover.access$000(ExpiredModelSnapshotsRemover.java:42)
       org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover$2.onResponse(ExpiredModelSnapshotsRemover.java:120)
       org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover$2.onResponse(ExpiredModelSnapshotsRemover.java:116)
       org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
       org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:85)
       org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:81)
       org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
       org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction$1.onResponse(TransportDeleteModelSnapshotAction.java:89)
       org.elasticsearch.xpack.ml.action.TransportDeleteModelSnapshotAction$1.onResponse(TransportDeleteModelSnapshotAction.java:81)
       org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter.lambda$deleteModelSnapshots$1(JobDataDeleter.java:67)
       org.elasticsearch.xpack.core.ml.job.persistence.JobDataDeleter$$Lambda$3985/792193087.accept(Unknown Source)
       org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:60)
       org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
       org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:85)
       org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:81)
       org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
       org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.finishHim(TransportBulkAction.java:379)
       org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onResponse(TransportBulkAction.java:360)
       org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onResponse(TransportBulkAction.java:349)
       org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:85)
       org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:81)
       org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
       org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.finishOnSuccess(TransportReplicationAction.java:921)
       org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleResponse(TransportReplicationAction.java:840)
       org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleResponse(TransportReplicationAction.java:826)
       org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1088)
       org.elasticsearch.transport.TcpTransport$2.doRun(TcpTransport.java:1484)
       org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
       org.elasticsearch.common.util.concurrent.EsExecutors$1.execute(EsExecutors.java:135)
       org.elasticsearch.transport.TcpTransport.handleResponse(TcpTransport.java:1476)
       org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1432)
       org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:64)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
       io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
       io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
       io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413)
       io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
       io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
       io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1336)
       io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1127)
       io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1162)
       io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
       io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
       io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
       io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
       io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
       io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
       io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
       io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
       io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545)
       io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
       io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
       io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
       java.lang.Thread.run(Thread.java:748)
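Read bottom-up, the trace shows Netty delivering a bulk response on `transport_client_boss`. The listener chain then continues on that same thread: model snapshot deletion, then ExpiredModelSnapshotsRemover, then TransportDeleteExpiredDataAction, then UnusedStateRemover. It stays there until BatchedDocumentsIterator.initScroll blocks in actionGet(). The sketch below illustrates that mechanism using only java.util.concurrent: a callback runs on whichever thread delivers the response. The executor, thread name, and `sendRequest` method are hypothetical stand-ins, not Elasticsearch classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class CallbackThreadSketch {

    // Hypothetical name standing in for a Netty transport thread.
    static final String NETWORK_THREAD = "transport_worker_sketch";

    // Simulated transport: the response is delivered, and the listener
    // invoked, on the single network thread.
    static void sendRequest(ExecutorService network, Consumer<String> listener) {
        network.execute(() -> listener.accept("bulk response"));
    }

    // Returns the name of the thread on which the listener body actually ran.
    static String threadThatRunsListener() {
        ExecutorService network =
                Executors.newSingleThreadExecutor(r -> new Thread(r, NETWORK_THREAD));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            sendRequest(network, response -> {
                // A synchronous wait here, like actionGet() in initScroll, would
                // stall this network thread and every channel it serves.
                ranOn.complete(Thread.currentThread().getName());
            });
            return ranOn.join();
        } finally {
            network.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "listener ran on: transport_worker_sketch"
        System.out.println("listener ran on: " + threadThatRunsListener());
    }
}
```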

MlDailyMaintenanceService is supposed to run in the ml_utility threadpool. We need to audit which thread each stage of its work currently runs on, then ensure that every stage really does run in the ml_utility threadpool.
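One way to make a stage run in a dedicated pool is to wrap its listener so the body is handed to that pool instead of running on the delivering thread. The sketch below assumes plain java.util.concurrent executors as stand-ins for a network thread and for ml_utility. `forkTo` and the thread names are hypothetical. Inside Elasticsearch the equivalent would presumably submit to the node's ThreadPool executor for ml_utility, but this is not that code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class ForkToUtilityPoolSketch {

    // Hypothetical thread names standing in for a Netty thread and the ml_utility pool.
    static final String NETWORK_THREAD = "transport_worker_sketch";
    static final String UTILITY_THREAD = "ml_utility_sketch";

    // Wraps a listener so its body is handed to `executor` instead of
    // running on whichever thread delivers the response.
    static <T> Consumer<T> forkTo(ExecutorService executor, Consumer<T> listener) {
        return value -> executor.execute(() -> listener.accept(value));
    }

    static String threadThatRunsForkedListener() {
        ExecutorService network = Executors.newSingleThreadExecutor(r -> new Thread(r, NETWORK_THREAD));
        ExecutorService utility = Executors.newSingleThreadExecutor(r -> new Thread(r, UTILITY_THREAD));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            Consumer<String> listener = forkTo(utility, response -> {
                // Blocking work, such as a synchronous scroll, no longer stalls a network thread.
                ranOn.complete(Thread.currentThread().getName());
            });
            // The network thread only hands the response off and returns immediately.
            network.execute(() -> listener.accept("bulk response"));
            return ranOn.join();
        } finally {
            network.shutdown();
            utility.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "listener ran on: ml_utility_sketch"
        System.out.println("listener ran on: " + threadThatRunsForkedListener());
    }
}
```

Forking once at the start of the maintenance run is not enough on its own. The trace shows the chain resuming on a network thread after an asynchronous bulk response. So each stage that does blocking work after an async call would need to fork again, or the blocking call would need to become asynchronous.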

Additionally, there must be a flaw in the way MlDailyMaintenanceService is tested, because the assertions that actions don't run on network threads are not being triggered.
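One hypothesis worth checking, not a confirmed diagnosis: the tests may replace the client with a double that invokes listeners synchronously on the test thread. In that case no network thread ever appears, so a thread-name check has nothing to catch. A plain Java `assert` is also skipped unless the JVM runs with `-ea`. The sketch below uses a hypothetical name-based check to show the difference. The marker list is illustrative: `transport_client_boss` comes from the trace above and `transport_worker` is an assumption. Elasticsearch has its own checks, and this is not that code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class NetworkThreadCheckSketch {

    // Illustrative markers only; Elasticsearch keeps its own list of transport thread names.
    static final String[] NETWORK_THREAD_MARKERS = {"transport_client_boss", "transport_worker"};

    static boolean isNetworkThread(Thread thread) {
        String name = thread.getName();
        for (String marker : NETWORK_THREAD_MARKERS) {
            if (name.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    // Throws whenever it runs, unlike a Java `assert`, which is skipped
    // unless the JVM is started with -ea.
    static void checkNotNetworkThread(String reason) {
        if (isNetworkThread(Thread.currentThread())) {
            throw new IllegalStateException("must not run on a network thread: " + reason);
        }
    }

    // Runs the check on a new thread with the given name; returns true if it fired.
    static boolean checkFiresOn(String threadName) {
        AtomicBoolean fired = new AtomicBoolean(false);
        Thread thread = new Thread(() -> {
            try {
                checkNotNetworkThread("daily maintenance");
            } catch (IllegalStateException e) {
                fired.set(true);
            }
        }, threadName);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return fired.get();
    }

    public static void main(String[] args) {
        // A double that calls listeners on the test thread never trips the check: prints false.
        System.out.println("test thread: " + checkFiresOn("test-thread"));
        // Delivery on a transport-named thread does: prints true.
        System.out.println("network thread: "
                + checkFiresOn("elasticsearch[a_master_node][transport_client_boss][T#2]"));
    }
}
```

If this hypothesis holds, a test double that delivers responses on a separately named thread would let the existing assertions fire.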
