Skip to content

Commit

Permalink
spring-projectsGH-8705 Expose errorOnTimeout on MessagingGateway
Browse files Browse the repository at this point in the history
Fixes spring-projects#8705

an internal `MethodInvocationGateway` is a `MessagingGatewaySupport`
extension with all the logic available there.
One of the option introduced in `5.2.2` to be able to throw a `MessageTimeoutException`
instead of returning `null` when no reply received in time from downstream flow

* Expose an `errorOnTimeout` on the `@MessagingGateway` and `GatewayEndpointSpec`
* Propagate this option from a `GatewayProxyFactoryBean` down to its internal
`MethodInvocationGateway` implementation
* Modify couple tests to react for `errorOnTimeout` set to `true`
* Document the feature
  • Loading branch information
artembilan committed Aug 18, 2023
1 parent ef5db30 commit 6cd3f45
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.springframework.core.annotation.AliasFor;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.gateway.MessagingGatewaySupport;

/**
* A stereotype annotation to provide an Integration Messaging Gateway Proxy
Expand Down Expand Up @@ -165,4 +166,13 @@
*/
boolean proxyDefaultMethods() default false;

/**
* If errorOnTimeout is true, the null won't be returned as a result of a gateway method invocation.
* Instead, a {@link org.springframework.integration.MessageTimeoutException} is thrown
* or an error message is published to the error channel.
* @since 6.2
* @see MessagingGatewaySupport#setErrorOnTimeout(boolean)
*/
boolean errorOnTimeout() default false;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,4 +98,16 @@ public GatewayEndpointSpec replyTimeout(Long replyTimeout) {
return this;
}

/**
* Set a error on timeout flag.
* @param errorOnTimeout true to produce an error in case on reply timeout.
* @return the spec
* @since 6.2
* @see org.springframework.integration.gateway.GatewayProxyFactoryBean#setErrorOnTimeout(boolean)
*/
public GatewayEndpointSpec errorOnTimeout(boolean errorOnTimeout) {
this.handler.setErrorOnTimeout(errorOnTimeout);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,6 +128,8 @@ protected void onInit() {

populateAsyncExecutorIfAny();

setErrorOnTimeout(this.gatewayAttributes.getBoolean("errorOnTimeout"));

boolean proxyDefaultMethods = this.gatewayAttributes.getBoolean("proxyDefaultMethods");
if (proxyDefaultMethods) { // Override only if annotation attribute is different
setProxyDefaultMethods(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public void setReplyTimeout(Long replyTimeout) {
this.gatewayProxyFactoryBean.setDefaultReplyTimeout(replyTimeout);
}

public void setErrorOnTimeout(boolean errorOnTimeout) {
this.gatewayProxyFactoryBean.setErrorOnTimeout(errorOnTimeout);
}

@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
if (this.exchanger == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public class GatewayProxyFactoryBean<T> extends AbstractEndpoint

private MetricsCaptor metricsCaptor;

private boolean errorOnTimeout;

/**
* Create a Factory whose service interface type can be configured by setter injection.
* If none is set, it will fall back to the default service interface type,
Expand Down Expand Up @@ -455,6 +457,18 @@ public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
this.gatewayMap.values().forEach(gw -> gw.registerMetricsCaptor(metricsCaptorToRegister));
}

/**
* If errorOnTimeout is true, the null won't be returned as a result of a gateway method invocation.
* Instead, a {@link org.springframework.integration.MessageTimeoutException} is thrown
* or an error message is published to the error channel.
* @param errorOnTimeout true to create the error message on reply timeout.
* @since 6.2
* @see MessagingGatewaySupport#setErrorOnTimeout(boolean)
*/
public void setErrorOnTimeout(boolean errorOnTimeout) {
this.errorOnTimeout = errorOnTimeout;
}

@Override
@SuppressWarnings("unchecked")
protected void onInit() {
Expand Down Expand Up @@ -881,6 +895,7 @@ private MethodInvocationGateway doCreateMethodInvocationGateway(Method method,
gateway.setBeanFactory(getBeanFactory());
gateway.setShouldTrack(this.shouldTrack);
gateway.registerMetricsCaptor(this.metricsCaptor);
gateway.setErrorOnTimeout(this.errorOnTimeout);
gateway.afterPropertiesSet();

return gateway;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
Expand All @@ -43,6 +44,7 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
Expand Down Expand Up @@ -79,9 +81,11 @@ void testGatewayFlow() {
assertThat(receive.getPayload()).isEqualTo("From Gateway SubFlow: FOO");
assertThat(this.gatewayError.receive(1)).isNull();

message = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();
Message<String> otherMessage = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();

this.gatewayInput.send(message);
assertThatExceptionOfType(MessageHandlingException.class)
.isThrownBy(() -> this.gatewayInput.send(otherMessage))
.withCauseExactlyInstanceOf(MessageTimeoutException.class);

assertThat(replyChannel.receive(1)).isNull();

Expand Down Expand Up @@ -173,7 +177,8 @@ public static class ContextConfiguration {
@Bean
public IntegrationFlow gatewayFlow() {
return IntegrationFlow.from("gatewayInput")
.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway("gatewayRequest",
g -> g.errorChannel("gatewayError").replyTimeout(10L).errorOnTimeout(true))
.gateway((f) -> f.transform("From Gateway SubFlow: "::concat))
.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,14 +59,17 @@
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.AnnotationConstants;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.GatewayHeader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
Expand All @@ -93,6 +96,7 @@
import org.springframework.util.ClassUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -150,6 +154,9 @@ public class GatewayInterfaceTests {
@Autowired
private MessageChannel gatewayChannel;

@Autowired
private PollableChannel gatewayQueueChannel;

@Autowired
private MessageChannel errorChannel;

Expand Down Expand Up @@ -468,19 +475,19 @@ public void testAnnotationGatewayProxyFactoryBean() {
assertThat(TestUtils.getPropertyValue(this.annotationGatewayProxyFactoryBean,
"defaultRequestTimeout", Expression.class).getValue()).isEqualTo(1111L);
assertThat(TestUtils.getPropertyValue(this.annotationGatewayProxyFactoryBean,
"defaultReplyTimeout", Expression.class).getValue()).isEqualTo(222L);
"defaultReplyTimeout", Expression.class).getValue()).isEqualTo(0L);

Collection<MessagingGatewaySupport> messagingGateways =
this.annotationGatewayProxyFactoryBean.getGateways().values();
assertThat(messagingGateways.size()).isEqualTo(1);

MessagingGatewaySupport gateway = messagingGateways.iterator().next();
assertThat(gateway.getRequestChannel()).isSameAs(this.gatewayChannel);
assertThat(gateway.getRequestChannel()).isSameAs(this.gatewayQueueChannel);
assertThat(gateway.getReplyChannel()).isSameAs(this.gatewayChannel);
assertThat(gateway.getErrorChannel()).isSameAs(this.errorChannel);
Object requestMapper = TestUtils.getPropertyValue(gateway, "requestMapper");

assertThat(TestUtils.getPropertyValue(requestMapper, "payloadExpression.expression")).isEqualTo("@foo");
assertThat(TestUtils.getPropertyValue(requestMapper, "payloadExpression.expression")).isEqualTo("args[0]");

Map globalHeaderExpressions = TestUtils.getPropertyValue(requestMapper, "globalHeaderExpressions", Map.class);
assertThat(globalHeaderExpressions.size()).isEqualTo(1);
Expand All @@ -489,6 +496,9 @@ public void testAnnotationGatewayProxyFactoryBean() {
assertThat(barHeaderExpression).isNotNull();
assertThat(barHeaderExpression).isInstanceOf(LiteralExpression.class);
assertThat(((LiteralExpression) barHeaderExpression).getValue()).isEqualTo("baz");

assertThatExceptionOfType(MessageTimeoutException.class)
.isThrownBy(() -> this.gatewayByAnnotationGPFB.foo("test"));
}

@Test
Expand Down Expand Up @@ -667,6 +677,12 @@ public MessageChannel gatewayChannel() {
return new DirectChannel();
}

@Bean
@BridgeTo(poller = @Poller(fixedDelay = "1000"))
public MessageChannel gatewayQueueChannel() {
return new QueueChannel();
}

@Bean
@BridgeTo
public MessageChannel gatewayThreadChannel() {
Expand Down Expand Up @@ -802,13 +818,14 @@ public interface IgnoredHeaderGateway {
}

@MessagingGateway(
defaultRequestChannel = "${gateway.channel:gatewayChannel}",
defaultRequestChannel = "${gateway.channel:gatewayQueueChannel}",
defaultReplyChannel = "${gateway.channel:gatewayChannel}",
defaultPayloadExpression = "${gateway.payload:@foo}",
defaultPayloadExpression = "${gateway.payload:args[0]}",
errorChannel = "${gateway.channel:errorChannel}",
asyncExecutor = "${gateway.executor:exec}",
defaultRequestTimeout = "${gateway.timeout:1111}",
defaultReplyTimeout = "${gateway.timeout:222}",
defaultReplyTimeout = "${gateway.timeout:0}",
errorOnTimeout = true,
defaultHeaders = {
@GatewayHeader(name = "${gateway.header.name:bar}",
value = "${gateway.header.value:baz}")
Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/gateway.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ If a component downstream is still running (perhaps because of an infinite loop
However, if the timeout has been reached before the actual reply was produced, it could result in a 'null' return from the gateway method.
You should understand that the reply message (if produced) is sent to a reply channel after the gateway method invocation might have returned, so you must be aware of that and design your flow with it in mind.

Also see an `errorOnTimeout` option to throw a `MessageTimeoutException` instead of returning `null`.

[[downstream-component-returns-null-]]
=== Downstream Component Returns 'null'

Expand Down Expand Up @@ -838,4 +840,8 @@ At that time, the calling thread starts waiting for the reply.
If the flow was completely synchronous, the reply is immediately available.
For asynchronous flows, the thread waits for up to this time.

Starting with version 6.2, the `errorOnTimeout` option of an internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport` is exposed to the `@MessagingGateway` and `GatewayEndpointSpec`.
This option has exactly the same meaning as for any inbound gateway explained in the end of xref:endpoint-summary.adoc#endpoint-summary[Endpoint Summary] chapter.
In other words, setting this option to `true`, would lead to the `MessageTimeoutException` being thrown from a send-and-receive gateway operation instead of returning `null` when receive timeout is exhausted.

See xref:dsl/integration-flow-as-gateway.adoc[`IntegrationFlow` as Gateway] in the Java DSL chapter for options to define gateways through `IntegrationFlow`.
3 changes: 3 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ See xref:endpoint.adoc#endpoint-pollingconsumer[Polling Consumer] for more infor
- Java, Groovy and Kotlin DSLs have now context-specific methods in the `IntegrationFlowDefinition` with a single `Consumer` argument to configure an endpoint and its handler with one builder and readable options.
See, for example, `transformWith()`, `splitWith()` in xref:dsl.adoc#java-dsl[ Java DSL Chapter].

- The `@MessagingGateway` and `GatewayEndpointSpec` for Java DSL expose now an `errorOnTimeout` option of an internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport`.
See xref:gateway.adoc#gateway-no-response[ Gateway Behavior When No response Arrives] for more information.

[[x6.2-websockets]]
=== WebSockets Changes

Expand Down

0 comments on commit 6cd3f45

Please sign in to comment.