Add some way to pass context to scheduled threads when using PollerMetadata (or make it customizable, or let it pass a TaskDecorator, or use the passed TaskExecutor) #9524

Closed
nightswimmings opened this issue Oct 2, 2024 · 13 comments

@nightswimmings

nightswimmings commented Oct 2, 2024

I need my integration flows to see the parent thread's context (ThreadLocals, RequestContext, SecurityContext, etc.), so I call setTaskDecorator on the custom TaskExecutor. This works well when the IntegrationFlow is built from a Jms.messageDrivenChannelAdapter, for instance.

But I also have an IntegrationFlow.fromSupplier(supplier, spec -> spec.taskExecutor(custom)). In this case, it looks like a built-in TaskScheduler wraps my custom TaskExecutor in PollerMetadata, so an intermediate thread is opened and my TaskDecorator can no longer see the main thread's context and therefore cannot propagate it to the spawned tasks.

The TaskScheduler does not seem to be configurable, and I can't find an option to pass a TaskDecorator to its "scheduling-" prefixed threads.

@nightswimmings nightswimmings added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Oct 2, 2024
@artembilan
Member

Let's see if this helps somehow: https://docs.spring.io/spring-integration/reference/6.4-SNAPSHOT/configuration/namespace-taskscheduler.html

In other words, you can provide your own TaskScheduler instance and register it under the IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME bean name.

Also pay attention to this chapter: https://docs.spring.io/spring-integration/reference/6.4-SNAPSHOT/security.html#security-context-propagation.

The point is that threads around QueueChannel are fully different between producer and consumer of the message.
The producer sends and forgets.
The consumer is able to pick up the message only from a scheduled thread which has no knowledge about producer side.
Therefore the only way to do that is via message headers.
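
A minimal JDK-only sketch of that header-based hand-off, under stated assumptions: the `Envelope` record, the `"tenant"` header name, and the `TENANT` ThreadLocal are illustrative stand-ins, not Spring Integration types. The producer copies its thread-bound context into the headers before sending, and the consumer thread restores it from the headers it receives.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HeaderPropagationDemo {

    // Illustrative thread-bound context (e.g. a tenant id).
    static final ThreadLocal<String> TENANT = new ThreadLocal<>();

    // Simplified stand-in for a message with headers; not the Spring Message type.
    record Envelope(String payload, Map<String, String> headers) {}

    static String roundTrip(String tenant, String payload) throws Exception {
        BlockingQueue<Envelope> channel = new LinkedBlockingQueue<>();

        // Producer side: copy the thread-bound context into a header, then send and forget.
        TENANT.set(tenant);
        try {
            channel.put(new Envelope(payload, Map.of("tenant", TENANT.get())));
        } finally {
            TENANT.remove();
        }

        // Consumer side: a different thread that only knows what the message carries.
        String[] result = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                Envelope envelope = channel.poll(5, TimeUnit.SECONDS);
                TENANT.set(envelope.headers().get("tenant")); // restore context from the header
                try {
                    result[0] = TENANT.get() + ":" + envelope.payload();
                } finally {
                    TENANT.remove();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        consumer.join();
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("acme", "order-1"));
    }
}
```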

Not sure, though, what kind of context could be propagated from the source polling channel adapter...

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Oct 2, 2024
@nightswimmings
Author

nightswimmings commented Oct 2, 2024

Thanks for all the interesting info!!

I would rather not expose a global TaskScheduler (or touch the auto-configured one), because my needs are specific to this IntegrationFlow, and because I think exposing a TaskScheduler also has side effects on other, non-integration Spring features (like @Scheduled methods).

As for what kind of context to propagate: for instance, a filter that checks that the message corresponds to your TenantContext, or simply any use of a scoped bean (@JobScoped or @RequestScoped) in filter() or handle(). For this you need to decorate the task and propagate the contexts. If you use fromSupplier() as a source, your TaskExecutor is wrapped in a scheduler and the context is not propagated, so I think some convenience methods for dealing with this in PollerMetadata could be helpful.

@artembilan
Member

Yeah... I might need more info about your use-case.
For now I can suggest looking into the Advice option: PollerSpec.advice(Advice... advice).
That advice is applied around the whole poll() process, including publishing the message downstream and its handling.
Another way would be to capture that context in your supplier and store it in a message header.
That message header could then be restored to a ThreadLocal of the target thread using ContextHolderRequestHandlerAdvice: https://docs.spring.io/spring-integration/reference/6.4-SNAPSHOT/handler-advice/context-holder.html

@nightswimmings
Author

I'll check the first option! The other one I already disregarded!
Thanks!

@nightswimmings
Author

nightswimmings commented Oct 12, 2024

BTW, the documentation says

When polling consumers provide an explicit task executor reference in their configuration, the invocation of the handler methods happens within that executor’s thread pool and not the main scheduler pool. However, when no task executor is provided for an endpoint’s poller, it is invoked by one of the main scheduler’s threads.

And this does not seem to be the case: it looks like the task is executed in the scheduler, which wraps the call to the custom executor, instead of passing it to the latter directly.

@nightswimmings
Author

Perhaps it is easier if I explain my need. Basically, I just need to create an IntegrationFlow from a supplier that is a @Request-Scoped List bean. IntegrationFlows cannot be request-scoped, so I need a way to set the context at supplier time, before retrieving the next value from the list in the supplier lambda.

@artembilan
Member

it looks like the task is executed in the scheduler which wraps the call to the custom executor, instead of passing it to the latter directly

Perhaps we look into different things.
The logic there is like this:

this.runningTask = taskScheduler.schedule(createPoller(), this.trigger);
...
private Runnable createPoller() {
		return () ->
				this.taskExecutor.execute(() -> {
					int count = 0;
					while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
						if (this.maxMessagesPerPoll == 0) {
							logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
							break;
						}
						if (pollForMessage() == null) {
							break;
						}
						count++;
					}
				});
}

So, the scheduler triggers that simple Runnable which, in turn, pushes the task into that taskExecutor.
So, your Supplier is called as a part of that pollForMessage().
Therefore whatever is in the docs is correct.
The problem is that this taskExecutor is really triggered from a scheduled thread, which indeed has no knowledge of your request scope.
It really cannot have any, because it is a scheduled thread initiated by the start of the AbstractPollingEndpoint.
There is simply no request scope, and there cannot be, because this thread is outside the web server loop.
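
The effect can be reproduced with plain `java.util.concurrent` classes. The following is only a sketch: `CONTEXT`, `decorate`, and `observedContext` are illustrative names, and the decorator is a hand-written stand-in for a Spring `TaskDecorator`, not the framework's own code. It mirrors the shape of `createPoller()`: a scheduler thread hands the work to an executor, so a decorator applied at the executor hand-off captures the scheduler thread's (empty) context, while decorating the scheduled task on the calling thread captures the caller's context.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerHandoffDemo {

    // Stands in for request, tenant, or security context bound to the calling thread.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Illustrative decorator: captures the context of whichever thread calls decorate().
    static Runnable decorate(Runnable task) {
        String captured = CONTEXT.get();
        return () -> {
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                CONTEXT.remove();
            }
        };
    }

    // Returns the context value observed by the task running on the executor thread.
    static String observedContext(boolean decorateScheduledTask) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> seen = new CompletableFuture<>();
        CONTEXT.set("tenant-42"); // the context exists only on the calling thread
        try {
            // The executor-level decoration runs on the scheduler thread, not on the caller.
            Runnable poller = () -> executor.execute(decorate(() -> seen.complete(CONTEXT.get())));
            scheduler.schedule(decorateScheduledTask ? decorate(poller) : poller,
                    10, TimeUnit.MILLISECONDS);
            return seen.get(5, TimeUnit.SECONDS);
        } finally {
            CONTEXT.remove();
            scheduler.shutdownNow();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("executor decorator only: " + observedContext(false));
        System.out.println("scheduled task decorated too: " + observedContext(true));
    }
}
```

With only the executor-level decorator, the task observes no context; once the scheduled task itself is decorated on the calling thread, the value travels through both hops.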

You probably need to revise your solution into something that is really initiated from the request.
Something like @MessagingGateway to be called from your @RequestMapping.
There you can gather all the required context and store it into the message headers.
The singleton flow then would be able to stay stateless and deal with context initiated with a message in its headers.

See more info in docs: https://docs.spring.io/spring-integration/reference/gateway.html

@nightswimmings
Author

Thanks for the explanation @artembilan
I finally went the fast way,

    @Bean(name = IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)
    public TaskScheduler taskScheduler(ThreadPoolTaskSchedulerBuilder threadPoolTaskSchedulerBuilder) {
        ThreadPoolTaskScheduler taskScheduler = new DecorableThreadPoolTaskScheduler(
                customContextPropagatorTaskDecorator, threadPoolTaskSchedulerBuilder.build());
        taskScheduler.afterPropertiesSet();
        return taskScheduler;
    }
    
    @RequiredArgsConstructor
    public static class DecorableThreadPoolTaskScheduler extends ThreadPoolTaskScheduler implements TaskScheduler {

        private final TaskDecorator taskDecorator;

        @Delegate(excludes = OverriddenMethods.class)
        private final ThreadPoolTaskScheduler delegate;

        @Override @Nullable
        public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
            return delegate.schedule(taskDecorator.decorate(task), trigger);
        }

        @SuppressWarnings("all")
        interface OverriddenMethods {
            ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
        }
    }

I don't like messing with the global TaskScheduler, but I had to unblock the situation. I'll check your suggestions.

Perhaps if the API allowed setting a TaskDecorator for the scheduler... I even tried a backdoor like
spec -> spec.autoStartup(false).poller(poller).getObject().getT1().getObject().setTaskScheduler()
but it looks like it needs to be initialised first.

@artembilan
Member

We may just expose the whole taskScheduler() option to be more specific for particular channel adapter.
However, I'd like to know what that customContextPropagatorTaskDecorator does and how it helps your use case.
Thanks

@artembilan
Member

It doesn't look like you need a custom TaskScheduler impl, though.
The ThreadPoolTaskScheduler has a respective option:

	/**
	 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
	 * about to be executed.
	 * <p>Note that such a decorator is not being applied to the user-supplied
	 * {@code Runnable}/{@code Callable} but rather to the scheduled execution
	 * callback (a wrapper around the user-supplied task).
	 * <p>The primary use case is to set some execution context around the task's
	 * invocation, or to provide some monitoring/statistics for task execution.
	 * @since 6.2
	 */
	public void setTaskDecorator(TaskDecorator taskDecorator) {

The ThreadPoolTaskSchedulerBuilder from Spring Boot can be supplied with customizers:

	/**
	 * Set the {@link ThreadPoolTaskSchedulerCustomizer
	 * threadPoolTaskSchedulerCustomizers} that should be applied to the
	 * {@link ThreadPoolTaskScheduler}. Customizers are applied in the order that they
	 * were added after builder configuration has been applied. Setting this value will
	 * replace any previously configured customizers.
	 * @param customizers the customizers to set
	 * @return a new builder instance
	 * @see #additionalCustomizers(ThreadPoolTaskSchedulerCustomizer...)
	 */
	public ThreadPoolTaskSchedulerBuilder customizers(ThreadPoolTaskSchedulerCustomizer... customizers) {
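
Put together, the two options quoted above could be combined roughly as follows. This is a configuration sketch, not a tested setup: it assumes a Spring Boot application where the auto-configured scheduler builder picks up `ThreadPoolTaskSchedulerCustomizer` beans, and the class name, bean name, and injected `contextPropagatingDecorator` are hypothetical.

```java
import org.springframework.boot.task.ThreadPoolTaskSchedulerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;

@Configuration
class SchedulerDecoratorConfiguration {

    // Hypothetical bean: applies an application-provided TaskDecorator to the
    // ThreadPoolTaskScheduler via setTaskDecorator (available since Spring Framework 6.2).
    @Bean
    ThreadPoolTaskSchedulerCustomizer decoratingSchedulerCustomizer(TaskDecorator contextPropagatingDecorator) {
        return scheduler -> scheduler.setTaskDecorator(contextPropagatingDecorator);
    }
}
```

Compared with subclassing ThreadPoolTaskScheduler, this keeps the stock scheduler class and only adjusts its configuration, though it still affects the shared scheduler rather than a single flow.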

@artembilan artembilan added this to the 6.4.0-RC1 milestone Oct 14, 2024
@artembilan artembilan added in: core and removed status: waiting-for-reporter Needs a feedback from the reporter labels Oct 14, 2024
@artembilan
Member

So, since we have figured out what exactly your need is, I went ahead and exposed a SourcePollingChannelAdapterSpec.taskScheduler() option.
This has nothing to do with polling logic; it is rather an endpoint behavior, which is why it is there and not in the PollerMetadata.
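
A rough sketch of how a flow might use such an option, assuming it accepts a `TaskScheduler` and returns the spec for chaining (the thread does not spell out the signature). The class name, the `itemSupplier` and `decoratedScheduler` beans, and the one-second fixed delay are hypothetical.

```java
import java.time.Duration;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.scheduling.TaskScheduler;

@Configuration
class PerFlowSchedulerConfiguration {

    // Assumption: taskScheduler(...) on the channel adapter spec takes a TaskScheduler,
    // so only this flow's source is polled on the decorated scheduler.
    @Bean
    IntegrationFlow itemsFlow(Supplier<String> itemSupplier, TaskScheduler decoratedScheduler) {
        return IntegrationFlow
                .fromSupplier(itemSupplier,
                        spec -> spec.taskScheduler(decoratedScheduler)
                                .poller(Pollers.fixedDelay(Duration.ofSeconds(1))))
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }
}
```

Scoping the scheduler to one channel adapter avoids replacing the global scheduler bean, which was the concern raised earlier in the thread.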

Still want to hear a use-case for that customContextPropagatorTaskDecorator 😄

@nightswimmings
Author

This is perfect!

I am triggering a job on every REST request, and the polling source for my StandardIntegrationFlow is a @RequestScoped collection that I fill beforehand in every request. Then I start and stop the StandardIntegrationFlow manually, at my convenience, to process the input.

@artembilan
Member

OK. Dynamic IntegrationFlow. Then passing the scope is a legitimate request.
Therefore the fix I've made will be helpful for you.
