Send the gateway Timeout Exception into the errorChannel and not a TemporaryReplyChannel #8705

Closed
mstankala opened this issue Aug 17, 2023 · 3 comments

mstankala commented Aug 17, 2023

Expected Behavior

Gateway timeout exceptions, like any other exception thrown in the flow, are sent to the errorChannel.
Custom handling of the resulting error message is then possible.

Current Behavior

A gateway timeout exception is not sent to the errorChannel (unlike other exceptions in the flow), but to a TemporaryReplyChannel.
In this case, custom exception handling on the errorChannel is not possible.

Context
This is caused by the breaking change in version 6.1.0 (https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-6.0-to-6.1-Migration-Guide#do-not-block-by-default).

Code example, with this behavior:

return IntegrationFlow
                .from(Http.inboundGateway(...))
                .channel("gatewayTimeoutTest")
                .get();

return IntegrationFlow.from("gatewayTimeoutTest")
                .enrichHeaders(Map.of(MessageHeaders.ERROR_CHANNEL, "MY_ERROR_CHANNEL"))
                .gateway(longRunningSubflow())
                .get();

return IntegrationFlow.from("MY_ERROR_CHANNEL")
                // .custom-log/handle/etc.
                .get();

Because a custom errorChannel is defined, I expect all exceptions from longRunningSubflow() to be passed as the payload to 'MY_ERROR_CHANNEL'.
That works fine, but if longRunningSubflow() needs more than 30 seconds, a timeout exception is thrown that ignores the
errorChannel configuration; instead, it is passed to a dynamically created TemporaryReplyChannel.

@mstankala mstankala added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Aug 17, 2023
@artembilan
Member

How about configuring that error channel directly on the gateway()?
The MessagingGatewaySupport does not consult request message headers for that purpose:

private Object sendAndReceive(Object object, boolean shouldConvert) {
		...
		try {
			...
				replyMessage = doSendAndReceive(channel, object, requestMessage);
                        ...
		}
		catch (Throwable ex) { // NOSONAR (catch throwable)
			logger.debug(() -> "failure occurred in gateway sendAndReceive: " + ex.getMessage());
			reply = ex;
		}

		if (reply instanceof Throwable || reply instanceof ErrorMessage) {
			...
			return handleSendAndReceiveError(object, requestMessage, error, shouldConvert);
		}
		return reply;
	}


private Object handleSendAndReceiveError(Object object, @Nullable Message<?> requestMessage, Throwable error,
			boolean shouldConvert) {

		MessageChannel errorChan = getErrorChannel();
		if (errorChan != null) {
			ErrorMessage errorMessage = buildErrorMessage(requestMessage, error);
			Message<?> errorFlowReply = sendErrorMessageAndReceive(errorChan, errorMessage);
                         ...
		}
		else if (error instanceof Error) {
			throw (Error) error;
		}
		else {
                        ...
			rethrow(errorToReThrow, "gateway received checked Exception");
			return null; // unreachable
		}
	}

There is also this option for a gateway to configure:

	/**
	 * If errorOnTimeout is true, construct an instance that will send an
	 * {@link org.springframework.messaging.support.ErrorMessage} with a
	 * {@link org.springframework.integration.MessageTimeoutException} payload to the error channel
	 * if a reply is expected but none is received. If no error channel is configured,
	 * the {@link org.springframework.integration.MessageTimeoutException} will be thrown.
	 * @param errorOnTimeout true to create the error message on reply timeout.
	 * @return the spec
	 * @since 5.2.2
	 * @see MessagingGatewaySupport#setErrorOnTimeout
	 */
	public S errorOnTimeout(boolean errorOnTimeout) {
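
To make the two snippets above easier to follow, here is a toy model in plain Java of how such a gateway could decide between a null reply, an error flow, and a rethrow. It is only a sketch: ToyGateway, ReplyTimeoutException and the errorChannel function are hypothetical stand-ins, not Spring Integration types, and the real MessagingGatewaySupport does considerably more.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model only: none of these types are Spring Integration classes.
final class ToyGateway {

    // Hypothetical stand-in for MessageTimeoutException.
    static final class ReplyTimeoutException extends RuntimeException {
        ReplyTimeoutException(String message) {
            super(message);
        }
    }

    private final Function<Throwable, Object> errorChannel; // null means "no error channel configured"
    private final boolean errorOnTimeout;
    private final long replyTimeoutMillis;

    ToyGateway(Function<Throwable, Object> errorChannel, boolean errorOnTimeout, long replyTimeoutMillis) {
        this.errorChannel = errorChannel;
        this.errorOnTimeout = errorOnTimeout;
        this.replyTimeoutMillis = replyTimeoutMillis;
    }

    Object sendAndReceive(Supplier<Object> downstream) {
        Object reply;
        try {
            reply = CompletableFuture.supplyAsync(downstream)
                    .get(this.replyTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException ex) {
            // Without errorOnTimeout a timeout is not an error: the reply is just null.
            reply = this.errorOnTimeout ? new ReplyTimeoutException("no reply in time") : null;
        }
        catch (ExecutionException ex) {
            reply = ex.getCause(); // the exception thrown by the downstream flow
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            reply = ex;
        }
        if (reply instanceof Throwable) {
            return handleError((Throwable) reply);
        }
        return reply;
    }

    private Object handleError(Throwable error) {
        if (this.errorChannel != null) {
            // Only the channel configured on the gateway is consulted, never a request header.
            return this.errorChannel.apply(error);
        }
        if (error instanceof Error) {
            throw (Error) error;
        }
        if (error instanceof RuntimeException) {
            throw (RuntimeException) error;
        }
        throw new IllegalStateException("gateway received checked Exception", error);
    }

    public static void main(String[] args) {
        Supplier<Object> slow = () -> {
            try {
                Thread.sleep(500); // longer than the 50 ms reply timeout used below
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return "late reply";
        };
        Function<Throwable, Object> errorFlow = error -> "error flow got: " + error.getMessage();
        System.out.println(new ToyGateway(errorFlow, false, 50).sendAndReceive(slow));
        System.out.println(new ToyGateway(errorFlow, true, 50).sendAndReceive(slow));
    }
}
```

In this model a timeout only reaches the error flow when errorOnTimeout is true. With the flag off, the caller just sees null, which matches the behaviour described in this issue.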

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Aug 17, 2023
@mstankala
Author

mstankala commented Aug 18, 2023

Thanks Artem,

How about configuring that error channel directly on the gateway()?

No, it doesn't work. Here is my example.
If I throw a simple IllegalArgumentException, it is sent to the errorChannel as expected.
In the case of a reply timeout, the errorChannel is not used to log the error.
See my running example below.

There is also this option for a gateway to configure: errorOnTimeout()

I don't see this option in my gatewaySpec. Can you provide an example?


@Configuration
public class GatewayTimeoutTestFlow {

    private final String logCat = "GatewayTimeoutTestFlow";

    @Bean
    public IntegrationFlow testMeFlow(
         @Qualifier("myExecutorService") ExecutorService myExecutorService) {
        return IntegrationFlow
                .from(Http.inboundGateway("testme").requestMapping(r -> r.methods(GET)))
                .enrichHeaders(Map.of(MessageHeaders.ERROR_CHANNEL, "MY_ERROR_CHANNEL"))
                .log(INFO, logCat, m -> "Starting flow")
                .handle((h, s) -> List.of("first", "second"))
                .gateway(flow -> flow
                                .split()
                                .channel(c -> c.executor(myExecutorService))
                                .gateway(veryImportantLongRunningBusinessProcess(),
                                        spec -> spec.errorChannel("MY_ERROR_CHANNEL"))
                                .aggregate(),
                        spec -> spec.errorChannel("MY_ERROR_CHANNEL"))
                .log(INFO, logCat, m -> "Done flow")
                .get();
    }

    private IntegrationFlow veryImportantLongRunningBusinessProcess() {
        return flow -> flow
                .log(INFO, logCat, m -> "- processing item " + m)
                .transform(m -> {
                    if (false) {
                        // NOTE: this exception here is passed to errorChannel:
                        throw new IllegalArgumentException("failed");
                    }
                    final CountDownLatch latch = new CountDownLatch(1);
                    try {
                        latch.await(35, TimeUnit.SECONDS); // Simulate a 35-second delay
                    } catch (Exception ignored) {
                    }
                    return List.of(m + "->OK");
                })
                .log(INFO, logCat, m -> "- done with item " + m);
    }

    @Bean
    public IntegrationFlow myErrorChannel() {
        String logCat = LoggingUtils.getLogCat(new Object() {
        });
        return IntegrationFlow.from("MY_ERROR_CHANNEL")
                .log(ERROR, logCat, m -> "Flow failed, because of this error: " + m.getPayload())
                // .custom-log/handle/etc.
                .get();
    }

    @Bean
    public ExecutorService myExecutorService() {
        return Executors.newFixedThreadPool(4, new ThreadFactoryBuilder()
                .setNameFormat("my-thread-%d")
                .build());
    }
}

@artembilan
Member

artembilan commented Aug 18, 2023

You are right, my bad: errorOnTimeout() is available on MessagingGatewaySpec, which covers all the inbound gateways.
The GatewayEndpointSpec doesn't have it.
Since this gateway() is technically similar to any other inbound gateway, I think we have to fix it accordingly and expose this errorOnTimeout() option.
Meanwhile, try requiresReply(true): when the gateway times out, the reply is null and AbstractReplyProducingMessageHandler will throw a ReplyRequiredException.

If you are OK with this, I'll go ahead and expose errorOnTimeout() so it can be configured on a gateway() and @MessagingGateway.
It looks like we already have a related explanation in that annotation:

	/**
	 * Allows to specify how long this gateway will wait for the reply {@code Message}
	 * before returning. The {@code null} is returned if the gateway times out.
	 * Value is specified in milliseconds; it can be a simple long
	 * value or a SpEL expression; array variable #args is available.
	 * See {@link Gateway#replyTimeout()} for per-method configuration.
	 * @return the suggested timeout in milliseconds, if any
	 */
	String defaultReplyTimeout() default IntegrationContextUtils.DEFAULT_TIMEOUT_STRING;

But I believe errorOnTimeout would be a great addition: it throws a MessageTimeoutException instead of returning null, if that is what the target application requires.
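
For reference, the requiresReply(true) workaround suggested above might look roughly like the following on 6.1. This is a sketch under assumptions, not a verified fix: the class name, the "/testme" path, the 30-second timeout and the body of longRunningSubflow() are illustrative, and the expectation that the ReplyRequiredException propagates up to the inbound gateway's error channel should be checked against the actual flow (an executor channel in between would change the call stack).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.http.dsl.Http;

@Configuration
public class GatewayTimeoutWorkaroundConfig {

    @Bean
    public IntegrationFlow testMeFlow() {
        return IntegrationFlow
                .from(Http.inboundGateway("/testme")
                        .requestMapping(r -> r.methods(HttpMethod.GET))
                        // Inbound gateways are MessagingGatewaySpec-based, so both options exist here.
                        .errorChannel("MY_ERROR_CHANNEL")
                        .errorOnTimeout(true))
                .gateway(longRunningSubflow(),
                        spec -> spec
                                .replyTimeout(30_000L)
                                // Workaround: a null reply after the timeout becomes a ReplyRequiredException.
                                .requiresReply(true))
                .get();
    }

    // Placeholder for the real long-running subflow (hypothetical body).
    private IntegrationFlow longRunningSubflow() {
        return flow -> flow.handle((payload, headers) -> payload + "->OK");
    }
}
```

The error channel is configured on the inbound gateway rather than through a message header, because the gateway does not consult request headers when it handles a failure.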

@artembilan artembilan added in: core and removed status: waiting-for-reporter Needs a feedback from the reporter labels Aug 18, 2023
@artembilan artembilan added this to the 6.2.0-M2 milestone Aug 18, 2023
artembilan added a commit to artembilan/spring-integration that referenced this issue Aug 18, 2023
Fixes spring-projects#8705

An internal `MethodInvocationGateway` is a `MessagingGatewaySupport`
extension with all the logic available there.
One of its options, introduced in `5.2.2`, makes it possible to throw a `MessageTimeoutException`
instead of returning `null` when no reply is received in time from the downstream flow

* Expose an `errorOnTimeout` on the `@MessagingGateway` and `GatewayEndpointSpec`
* Propagate this option from a `GatewayProxyFactoryBean` down to its internal
`MethodInvocationGateway` implementation
* Modify a couple of tests to react to `errorOnTimeout` set to `true`
* Document the feature
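
Assuming the change described in this commit is available in the version in use (the issue is assigned to the 6.2.0-M2 milestone), configuring the option on a gateway() in the Java DSL might look like this. The class name, channel names, timeout value and subflow body are illustrative, and the exact method name on `GatewayEndpointSpec` is taken from the commit description rather than verified here.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class ErrorOnTimeoutConfig {

    @Bean
    public IntegrationFlow timeoutAwareFlow() {
        return IntegrationFlow.from("gatewayTimeoutTest")
                .gateway(longRunningSubflow(),
                        spec -> spec
                                .errorChannel("MY_ERROR_CHANNEL")
                                .replyTimeout(30_000L)
                                // Assumed to exist on GatewayEndpointSpec once this commit is released.
                                .errorOnTimeout(true))
                .get();
    }

    // Placeholder for the real long-running subflow (hypothetical body).
    private IntegrationFlow longRunningSubflow() {
        return flow -> flow.handle((payload, headers) -> payload + "->OK");
    }
}
```

With this in place, a reply timeout should produce an ErrorMessage with a MessageTimeoutException payload on MY_ERROR_CHANNEL instead of a silent null reply.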