W-16941297: Scatter Gather timeout exception #14192
Conversation
…tractForkJoinStrategyFactory
- timeoutScheduler = schedulerService.cpuLightScheduler(SchedulerConfig.config()
+ timeoutScheduler = schedulerService.ioScheduler(SchedulerConfig.config()
I would need a good justification for this change.
The tasks being submitted to that scheduler are better defined by "CPU light" rather than "I/O intensive". If this change is needed for something to work, we definitely need to understand why, because initially it doesn't make sense.
Would having an underlying db connection count as I/O? We were able to notice that `cpuLightScheduler` doesn't work when Scatter Gather contains a lot of routes.
I don't think that matters. What matters is what you are doing in the tasks that you submit to the scheduler, for example whether the tasks require sleeping or blocking on I/O a lot.
In this case I think the problem is that you are submitting just too many tasks, beyond the estimated capacity for the pool type.
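To illustrate the capacity point, here is a self-contained sketch using plain `java.util.concurrent` (not Mule's actual `SchedulerService`): a pool sized to the core count, which is how CPU-light pools are typically sized, stalls once it receives more blocking tasks than it has threads, which is what many simultaneously timed-out routes amount to.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolSaturationDemo {
  public static void main(String[] args) throws InterruptedException {
    int cores = Runtime.getRuntime().availableProcessors();
    // "CPU light"-style sizing: one thread per core
    ExecutorService cpuLightLike = Executors.newFixedThreadPool(cores);

    // Submit far more blocking tasks than threads: tasks beyond the first
    // `cores` just queue up, so the last ones finish long after their
    // nominal sleep time.
    for (int i = 0; i < cores * 10; i++) {
      final int id = i;
      cpuLightLike.submit(() -> {
        try {
          Thread.sleep(500); // stands in for a blocked route (e.g. SELECT SLEEP(n))
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        System.out.println("task " + id + " done on " + Thread.currentThread().getName());
      });
    }
    cpuLightLike.shutdown();
    cpuLightLike.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```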
That's possible. I've been scaling the SG with nested SGs, which may not really be the actual customer scenario. Given that the bound depends on the number of cores, it's possible my laptop wasn't able to handle that many CPU-bound tasks.
core/src/main/java/org/mule/runtime/core/internal/event/AbstractEventContext.java
@@ -146,7 +146,7 @@ private void handleTimeoutExceptionIfPresent(Scheduler timeoutScheduler,
      EventContext context = pair.getFirst().getContext();
      if (context instanceof AbstractEventContext) {
        ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
-           .submit(() -> ctx.error(error.get().getCause())));
+           .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
Despite the flow working fine, there were failures in the log:
ERROR 2025-01-28 17:26:29,170 [pool-5-thread-2] [processor: ; event: ] org.mule.runtime.core.privileged.processor.MessageProcessors: Uncaught exception in childContextResponseHandler
java.lang.ClassCastException: class java.util.concurrent.TimeoutException cannot be cast to class org.mule.runtime.core.privileged.exception.MessagingException (java.util.concurrent.TimeoutException is in module java.base of loader 'bootstrap'; org.mule.runtime.core.privileged.exception.MessagingException is in module org.mule.runtime.core@4.9.0-20241025 of loader jdk.internal.loader.Loader @10cc327a)
at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.privileged.processor.MessageProcessors.lambda$childContextResponseHandler$14(MessageProcessors.java:582) ~[mule-core-4.9.0-20241025.jar:?]
at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.internal.event.AbstractEventContext.signalConsumerSilently(AbstractEventContext.java:310) ~[?:?]
at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.internal.event.AbstractEventContext.receiveResponse(AbstractEventContext.java:210) ~[?:?]
at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.internal.event.AbstractEventContext.error(AbstractEventContext.java:189) ~[?:?]
at org.mule.runtime.core.components@4.9.0-20241025/org.mule.runtime.core.internal.routing.forkjoin.AbstractForkJoinStrategyFactory.lambda$handleTimeoutExceptionIfPresent$6(AbstractForkJoinStrategyFactory.java:173) ~[?:?]
Therefore, creating a `MessagingException` instance to wrap the cause.
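A minimal, self-contained illustration of the failure mode (using a stand-in type, not Mule's real class): a handler that casts the propagated error to `MessagingException` fails when handed a raw `TimeoutException`, which matches the `ClassCastException` in the log above and is why the fix wraps the cause first.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class CastFailureDemo {
  // Stand-in for org.mule.runtime.core.privileged.exception.MessagingException
  static class MessagingException extends Exception {
    MessagingException(Throwable cause) { super(cause); }
  }

  public static void main(String[] args) {
    Consumer<Throwable> handler = t -> {
      // Blind downcast, as childContextResponseHandler effectively does:
      // throws ClassCastException when t is a raw TimeoutException
      MessagingException me = (MessagingException) t;
      System.out.println("handled: " + me.getCause());
    };

    try {
      handler.accept(new TimeoutException("route timed out")); // reproduces the logged error
    } catch (ClassCastException e) {
      System.out.println("uncaught cast failure: " + e.getMessage());
    }

    // The fix: wrap the cause in a MessagingException before propagating it
    handler.accept(new MessagingException(new TimeoutException("route timed out")));
  }
}
```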
  @Override
  public Publisher<CoreEvent> decorateTimeoutPublisher(Publisher<CoreEvent> timeoutPublisher) {
    // When the timeout happens, the subscription to the original publisher is cancelled, so the inner MessageProcessorChains
    // never finishes and the child contexts are never completed, hence we have to complete them manually on timeout
Suggested change:
- // never finishes and the child contexts are never completed, hence we have to complete them manually on timeout
+ // never finish and the child contexts are never completed, hence we have to complete them manually on timeout
    ((AbstractEventContext) eventContext).forEachChild(allContexts::push);

    while (!allContexts.isEmpty()) {
      BaseEventContext ctx = allContexts.pop();
It may be worth clarifying why we iterate in reverse order in this part
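A self-contained sketch of the push/pop pattern above: `forEachChild` visits contexts before their descendants, so pushing onto a `Deque` and then popping walks them in reverse, deepest first. The rationale assumed here, per the review question, is that child contexts should be completed before their ancestors.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class ReverseCompletionDemo {
  public static void main(String[] args) {
    Deque<String> allContexts = new ArrayDeque<>();

    // Stand-in for ((AbstractEventContext) eventContext).forEachChild(allContexts::push),
    // visiting shallower contexts before deeper ones
    List.of("child-1", "child-1.1", "child-1.1.1").forEach(allContexts::push);

    while (!allContexts.isEmpty()) {
      String ctx = allContexts.pop(); // deepest context comes off first
      System.out.println("completing " + ctx);
    }
    // Prints: child-1.1.1, then child-1.1, then child-1
  }
}
```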
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com> (cherry picked from commit b4b8679)
Ticket
W-16941297
Cause
When Scatter Gather times out, the db connections opened by its routes stay open. The cleanup code that the connector registers for when the event context is completed is never invoked. Event contexts need to complete in order for their cleanup jobs to be called.
When a timeout happens in a Reactor chain, the original Publisher is cancelled and the subscription switches to a new Publisher (the fallback publisher of the timeout). The original Publisher in this context contains the assembled inner chain, so its cancellation prevents the AbstractPipeline from completing the context after all its processors.
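A minimal Reactor sketch of this cancellation behavior (assumes reactor-core on the classpath; this is not Mule code): on timeout the subscription to the original publisher is cancelled and switches to the fallback, so completion hooks on the original chain never fire.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class TimeoutCancellationDemo {
  public static void main(String[] args) {
    Mono<String> slowRoute = Mono.just("route result")
        .delayElement(Duration.ofSeconds(5))
        // never runs on timeout: the original chain is cancelled, not completed
        .doOnSuccess(v -> System.out.println("completed normally"))
        // runs on timeout: cancellation propagates up the original chain
        .doOnCancel(() -> System.out.println("original publisher cancelled"));

    slowRoute
        .timeout(Duration.ofMillis(100), Mono.just("fallback on timeout"))
        .doOnNext(System.out::println)
        .block();
  }
}
```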
Other ideas
`StreamingGhostBuster` was deemed to have handled this, since its intention is to clean up unclosed streams and their related `CursorStreamProvider` objects. However, in this situation the reference is a strong reference, so `StreamingGhostBuster` wouldn't work.
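A sketch of why a ghost-buster style cleanup can't help here, assuming it relies on weak references and garbage collection to detect abandoned providers: while a strong reference to the provider exists, the weak reference never clears, so the cleanup path is never triggered. (GC timing is not guaranteed, hence the hedged output comments.)

```java
import java.lang.ref.WeakReference;

public class StrongReferenceDemo {
  public static void main(String[] args) throws InterruptedException {
    Object provider = new Object(); // stands in for the CursorStreamProvider
    WeakReference<Object> ref = new WeakReference<>(provider);

    System.gc();
    Thread.sleep(100);
    // Still strongly reachable: the weak reference is not cleared
    System.out.println("strong ref held, cleared? " + (ref.get() == null)); // false

    provider = null; // drop the strong reference
    System.gc();
    Thread.sleep(100);
    // Now eligible for collection, so cleanup could kick in
    System.out.println("strong ref dropped, cleared? " + (ref.get() == null)); // likely true
  }
}
```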
Fix
Calling `.error(...)` without the Scheduler pool caused the Scatter Gather to wait until the longest `SLEEP(n)` completes. To help alleviate this behavior, this change utilizes the `timeoutScheduler`, making the Scatter Gather time out as expected and presenting the composite routing exception messages to the user instantly while the `SELECT SLEEP(n)` queries continue to execute. Once those complete, the `.error(...)` method is called and submitted to the `timeoutScheduler`.
The `timeoutScheduler` that's created as a `cpuLightScheduler` is incapable of handling nested Scatter Gathers or a large number of routes. This fix was tested with 70 routes, with almost all of them timing out. Changing this to an `ioScheduler` was capable of handling this scaling issue.
Test Coverage
Tests under `org/mule/runtime/core/internal/routing/forkjoin` that have timeout events being raised.