
W-16941297: Scatter Gather timeout exception #14192

Merged: 27 commits merged into master on Feb 13, 2025

Conversation

anandkarandikar (Contributor) commented Jan 31, 2025

Ticket

W-16941297

Cause

When Scatter Gather times out, the routes that opened db connections leave those connections open. The cleanup code that the connector registers for when the event context completes is never invoked; event contexts need to complete in order for their cleanup jobs to be called.
When a timeout happens in a Reactor chain, the original Publisher is cancelled and the subscription switches to a new Publisher (the timeout's fallback publisher). The original Publisher in this context contains the assembled inner chain, so its cancellation prevents the AbstractPipeline from completing the context after all of its processors.
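
As a side note, here is a minimal standalone Reactor sketch (plain reactor-core, not Mule runtime code) of the cancellation behaviour described above: once the timeout fires, the subscription to the original publisher is cancelled and switches to the fallback, so anything that only runs on the original publisher's normal completion never happens.

import java.time.Duration;
import reactor.core.publisher.Mono;

public class TimeoutCancellationSketch {

  public static void main(String[] args) {
    // Stands in for the assembled inner chain of a route.
    Mono<String> original = Mono.delay(Duration.ofSeconds(5))
        .map(tick -> "route result")
        // Fires when the timeout cancels the subscription; completion-only logic is skipped.
        .doOnCancel(() -> System.out.println("original cancelled"))
        .doFinally(signal -> System.out.println("original terminated with: " + signal)); // prints CANCEL

    // On timeout the subscription switches to the fallback publisher.
    Mono<String> withTimeout = original.timeout(Duration.ofMillis(100), Mono.just("timeout fallback"));

    System.out.println(withTimeout.block()); // prints "timeout fallback" after ~100 ms
  }
}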

Other ideas

StreamingGhostBuster was considered as a way to handle this, since its intention is to clean up unclosed streams and their related CursorStreamProvider objects. However, in this situation the reference is a strong reference, so StreamingGhostBuster doesn't help.

Fix

  • When an item is emitted from the timeout publisher, we take care of completing the child context with an error (a condensed sketch of this follows this list).
  • The completion needs to be propagated recursively because there can be nested chains too.
  • Added a kill switch in case this breaks something.
  • Calling .error(...) without the Scheduler pool caused the Scatter Gather to wait until the longest SLEEP(n) completed.
  • To alleviate this behavior, this change uses timeoutScheduler, making the Scatter Gather time out as expected and present the composite routing exception messages to the user immediately, while the SELECT SLEEP(n) queries continue to execute. Once those complete, the .error(...) call is submitted to timeoutScheduler.
  • The timeoutScheduler created as a cpuLightScheduler is incapable of handling nested Scatter Gathers or a large number of routes. This fix was tested with 70 routes, almost all of them timing out; changing to an ioScheduler handled this scaling issue.
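
Condensed sketch of the resulting timeout handling. The forEachChild / submit / error lines mirror the change shown in the review diff below; the surrounding method name, signature, and import locations are assumptions for illustration, not the exact runtime code.

import org.mule.runtime.api.event.EventContext;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.internal.event.AbstractEventContext;
import org.mule.runtime.core.privileged.exception.MessagingException;

// Invoked when the timeout publisher emits: complete every child event context with a
// MessagingException so connector cleanup callbacks (closing db connections, streams)
// finally run. Submitting to timeoutScheduler (now an ioScheduler) lets the Scatter
// Gather report the composite routing exception immediately instead of waiting for the
// slowest route.
private void completeChildContextsOnTimeout(Scheduler timeoutScheduler, CoreEvent event, Throwable cause) {
  EventContext context = event.getContext();
  if (context instanceof AbstractEventContext) {
    ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
        .submit(() -> ctx.error(new MessagingException(event, cause))));
  }
}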

Test Coverage

  • There are existing tests in org/mule/runtime/core/internal/routing/forkjoin that raise timeout events.
  • Also leveraging a timeout scenario with Scatter Gather using test extensions (marvel-extension) to mimic the delayed scenario. This is a similar approach to W-16941297: SG timeout issue mule-integration-tests#2634, but without needing an actual database in the picture, because we need to ensure the underlying streams are closed.

-    timeoutScheduler = schedulerService.cpuLightScheduler(SchedulerConfig.config()
+    timeoutScheduler = schedulerService.ioScheduler(SchedulerConfig.config()
Contributor

I would need a good justification for this change.
The tasks being submitted to that scheduler are better described as "CPU light" rather than "I/O intensive". If this change is needed for something to work, we definitely need to understand why, because initially it doesn't make sense.

Contributor Author

Would having an underlying db connection count as an I/O category?
We noticed that cpuLightScheduler doesn't work when the Scatter Gather contains a lot of routes.

Contributor

I don't think that matters. What matters is what you are doing in the tasks that you submit to the scheduler, for example whether the tasks require a lot of sleeping or blocking on I/O.
In this case I think the problem is that you are submitting just too many tasks, beyond the estimated capacity for the pool type.

Contributor Author

anandkarandikar commented Feb 6, 2025

That's possible. I've been scaling the SG with nested SGs, which may not really be the actual customer scenario. Given that the bound depends on the # of cores, it's possible my laptop wasn't able to handle that many CPU-bound tasks.
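
To make the capacity point concrete, here is a generic JDK sketch (plain java.util.concurrent Executors, not Mule's SchedulerService; the 200 ms sleep is an arbitrary stand-in for a completion task that briefly blocks). It shows how a pool bounded by core count processes a burst of ~70 tasks in waves, while an elastic, IO-style pool absorbs them at once.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolCapacitySketch {

  static long run(ExecutorService pool, int tasks) throws Exception {
    long start = System.nanoTime();
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      futures.add(pool.submit(() -> {
        try {
          Thread.sleep(200);   // stands in for a completion task that briefly blocks
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }));
    }
    for (Future<?> f : futures) {
      f.get();                 // wait for all tasks, like waiting for all routes to complete
    }
    pool.shutdown();
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) throws Exception {
    int cores = Runtime.getRuntime().availableProcessors();
    // ~70 routes timing out at once means ~70 completion tasks submitted together.
    System.out.println("cores-sized pool: " + run(Executors.newFixedThreadPool(cores), 70) + " ms");
    System.out.println("elastic pool:     " + run(Executors.newCachedThreadPool(), 70) + " ms");
  }
}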

@@ -146,7 +146,7 @@ private void handleTimeoutExceptionIfPresent(Scheduler timeoutScheduler,
       EventContext context = pair.getFirst().getContext();
       if (context instanceof AbstractEventContext) {
         ((AbstractEventContext) context).forEachChild(ctx -> timeoutScheduler
-            .submit(() -> ctx.error(error.get().getCause())));
+            .submit(() -> ctx.error(new MessagingException(pair.getFirst(), error.get().getCause()))));
Contributor Author

anandkarandikar commented Feb 3, 2025

Despite the flow working fine, there were failures in the log:

ERROR 2025-01-28 17:26:29,170 [pool-5-thread-2] [processor: ; event: ] org.mule.runtime.core.privileged.processor.MessageProcessors: Uncaught exception in childContextResponseHandler
java.lang.ClassCastException: class java.util.concurrent.TimeoutException cannot be cast to class org.mule.runtime.core.privileged.exception.MessagingException (java.util.concurrent.TimeoutException is in module java.base of loader 'bootstrap'; org.mule.runtime.core.privileged.exception.MessagingException is in module org.mule.runtime.core@4.9.0-20241025 of loader jdk.internal.loader.Loader @10cc327a)
	at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.privileged.processor.MessageProcessors.lambda$childContextResponseHandler$14(MessageProcessors.java:582) ~[mule-core-4.9.0-20241025.jar:?]
	at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.internal.event.AbstractEventContext.signalConsumerSilently(AbstractEventContext.java:310) ~[?:?]
	at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.internal.event.AbstractEventContext.receiveResponse(AbstractEventContext.java:210) ~[?:?]
	at org.mule.runtime.core@4.9.0-20241025/org.mule.runtime.core.internal.event.AbstractEventContext.error(AbstractEventContext.java:189) ~[?:?]
	at org.mule.runtime.core.components@4.9.0-20241025/org.mule.runtime.core.internal.routing.forkjoin.AbstractForkJoinStrategyFactory.lambda$handleTimeoutExceptionIfPresent$6(AbstractForkJoinStrategyFactory.java:173) ~[?:?]

Therefore, creating a MessagingException instance.

@anandkarandikar
Contributor Author

--validate


@anandkarandikar
Contributor Author

--validate


@Override
public Publisher<CoreEvent> decorateTimeoutPublisher(Publisher<CoreEvent> timeoutPublisher) {
// When the timeout happens, the subscription to the original publisher is cancelled, so the inner MessageProcessorChains
// never finishes and the child contexts are never completed, hence we have to complete them manually on timeout
Contributor

Suggested change:
-// never finishes and the child contexts are never completed, hence we have to complete them manually on timeout
+// never finish and the child contexts are never completed, hence we have to complete them manually on timeout

((AbstractEventContext) eventContext).forEachChild(allContexts::push);

while (!allContexts.isEmpty()) {
BaseEventContext ctx = allContexts.pop();
Contributor

It may be worth clarifying why we iterate in reverse order in this part
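
For readers following along, here is a standalone sketch of the stack-based traversal pattern in the snippet above. The Ctx type and names are invented purely for illustration and are not Mule types; it only shows that popping from the Deque visits contexts in the reverse of the order in which forEachChild discovered them, while still reaching nested chains, which is presumably the ordering this comment asks to have documented.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// "Ctx" is a stand-in for a child event context, used only for this illustration.
class Ctx {
  final String name;
  final List<Ctx> children = new ArrayList<>();
  Ctx(String name) { this.name = name; }
}

public class ChildTraversalSketch {

  public static void main(String[] args) {
    Ctx parent = new Ctx("scatter-gather");
    Ctx route1 = new Ctx("route 1");
    Ctx route2 = new Ctx("route 2");
    Ctx nested = new Ctx("nested chain inside route 2");
    route2.children.add(nested);
    parent.children.add(route1);
    parent.children.add(route2);

    // Push the direct children, then keep popping; because the Deque is used as a LIFO
    // stack, children come back in the reverse of the order they were pushed, and any
    // nested children discovered along the way are handled before moving on.
    Deque<Ctx> allContexts = new ArrayDeque<>();
    parent.children.forEach(allContexts::push);     // pushes route 1, then route 2

    while (!allContexts.isEmpty()) {
      Ctx ctx = allContexts.pop();                  // route 2 first (reverse order)
      ctx.children.forEach(allContexts::push);      // reach nested chains too
      System.out.println("completing " + ctx.name);
    }
    // Prints: route 2, nested chain inside route 2, route 1
  }
}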

asanguinetti merged commit b4b8679 into master on Feb 13, 2025
8 checks passed
asanguinetti deleted the fix/W-16941297 branch on February 13, 2025 at 16:45
asanguinetti pushed a commit that referenced this pull request Feb 13, 2025
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com>
(cherry picked from commit b4b8679)
asanguinetti pushed a commit that referenced this pull request Feb 13, 2025
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com>
(cherry picked from commit b4b8679)
asanguinetti pushed a commit that referenced this pull request Feb 14, 2025
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com>
(cherry picked from commit b4b8679)
asanguinetti pushed a commit that referenced this pull request Feb 14, 2025
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com>
(cherry picked from commit b4b8679)
asanguinetti added a commit that referenced this pull request Feb 14, 2025
(cherry picked from commit b4b8679)
Co-authored-by: Anand Karandikar <164932509+anandkarandikar@users.noreply.github.com>
asanguinetti pushed a commit that referenced this pull request Feb 15, 2025
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com>
(cherry picked from commit b4b8679)
(cherry picked from commit 615b721)
(cherry picked from commit 24d822d)
asanguinetti added a commit that referenced this pull request Feb 15, 2025
Co-authored-by: Axel Sanguinetti <asanguinetti@salesforce.com>
(cherry picked from commit b4b8679)
(cherry picked from commit 615b721)
(cherry picked from commit 24d822d)

Co-authored-by: Anand Karandikar <164932509+anandkarandikar@users.noreply.github.com>
asanguinetti added a commit that referenced this pull request Feb 17, 2025
(cherry picked from commit b4b8679)
(cherry picked from commit b236715)
(cherry picked from commit 3b38c65)
Co-authored-by: Anand Karandikar <164932509+anandkarandikar@users.noreply.github.com>