Skip to content

Commit

Permalink
GH-8705 Expose errorOnTimeout on MessagingGateway
Browse files Browse the repository at this point in the history
Fixes #8705

an internal `MethodInvocationGateway` is a `MessagingGatewaySupport`
extension with all the logic available there.
One of the option introduced in `5.2.2` to be able to throw a `MessageTimeoutException`
instead of returning `null` when no reply received in time from downstream flow

* Expose an `errorOnTimeout` on the `@MessagingGateway` and `GatewayEndpointSpec`
* Propagate this option from a `GatewayProxyFactoryBean` down to its internal
`MethodInvocationGateway` implementation
* Modify couple tests to react for `errorOnTimeout` set to `true`
* Document the feature

Fix language in Docs

Co-authored-by: Gary Russell <grussell@vmware.com>
  • Loading branch information
artembilan and garyrussell committed Aug 22, 2023
1 parent d85c5e3 commit 4e310bb
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.springframework.core.annotation.AliasFor;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.gateway.MessagingGatewaySupport;

/**
* A stereotype annotation to provide an Integration Messaging Gateway Proxy
Expand Down Expand Up @@ -165,4 +166,13 @@
*/
boolean proxyDefaultMethods() default false;

/**
* If errorOnTimeout is true, null won't be returned as a result of a gateway method invocation when a timeout occurs.
* Instead, a {@link org.springframework.integration.MessageTimeoutException} is thrown
* or an error message is published to the error channel.
* @since 6.2
* @see MessagingGatewaySupport#setErrorOnTimeout(boolean)
*/
boolean errorOnTimeout() default false;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,4 +98,16 @@ public GatewayEndpointSpec replyTimeout(Long replyTimeout) {
return this;
}

/**
* Set a error on timeout flag.
* @param errorOnTimeout true to produce an error in case of a reply timeout.
* @return the spec.
* @since 6.2
* @see org.springframework.integration.gateway.GatewayProxyFactoryBean#setErrorOnTimeout(boolean)
*/
public GatewayEndpointSpec errorOnTimeout(boolean errorOnTimeout) {
this.handler.setErrorOnTimeout(errorOnTimeout);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,6 +128,8 @@ protected void onInit() {

populateAsyncExecutorIfAny();

setErrorOnTimeout(this.gatewayAttributes.getBoolean("errorOnTimeout"));

boolean proxyDefaultMethods = this.gatewayAttributes.getBoolean("proxyDefaultMethods");
if (proxyDefaultMethods) { // Override only if annotation attribute is different
setProxyDefaultMethods(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public void setReplyTimeout(Long replyTimeout) {
this.gatewayProxyFactoryBean.setDefaultReplyTimeout(replyTimeout);
}

public void setErrorOnTimeout(boolean errorOnTimeout) {
this.gatewayProxyFactoryBean.setErrorOnTimeout(errorOnTimeout);
}

@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
if (this.exchanger == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public class GatewayProxyFactoryBean<T> extends AbstractEndpoint

private MetricsCaptor metricsCaptor;

private boolean errorOnTimeout;

/**
* Create a Factory whose service interface type can be configured by setter injection.
* If none is set, it will fall back to the default service interface type,
Expand Down Expand Up @@ -455,6 +457,18 @@ public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
this.gatewayMap.values().forEach(gw -> gw.registerMetricsCaptor(metricsCaptorToRegister));
}

/**
* If errorOnTimeout is true, null won't be returned as a result of a gateway method invocation, when a timeout occurs.
* Instead, a {@link org.springframework.integration.MessageTimeoutException} is thrown
* or an error message is published to the error channel.
* @param errorOnTimeout true to create the error message on reply timeout.
* @since 6.2
* @see MessagingGatewaySupport#setErrorOnTimeout(boolean)
*/
public void setErrorOnTimeout(boolean errorOnTimeout) {
this.errorOnTimeout = errorOnTimeout;
}

@Override
@SuppressWarnings("unchecked")
protected void onInit() {
Expand Down Expand Up @@ -881,6 +895,7 @@ private MethodInvocationGateway doCreateMethodInvocationGateway(Method method,
gateway.setBeanFactory(getBeanFactory());
gateway.setShouldTrack(this.shouldTrack);
gateway.registerMetricsCaptor(this.metricsCaptor);
gateway.setErrorOnTimeout(this.errorOnTimeout);
gateway.afterPropertiesSet();

return gateway;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
Expand All @@ -43,6 +44,7 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
Expand Down Expand Up @@ -79,9 +81,11 @@ void testGatewayFlow() {
assertThat(receive.getPayload()).isEqualTo("From Gateway SubFlow: FOO");
assertThat(this.gatewayError.receive(1)).isNull();

message = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();
Message<String> otherMessage = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();

this.gatewayInput.send(message);
assertThatExceptionOfType(MessageHandlingException.class)
.isThrownBy(() -> this.gatewayInput.send(otherMessage))
.withCauseExactlyInstanceOf(MessageTimeoutException.class);

assertThat(replyChannel.receive(1)).isNull();

Expand Down Expand Up @@ -173,7 +177,8 @@ public static class ContextConfiguration {
@Bean
public IntegrationFlow gatewayFlow() {
return IntegrationFlow.from("gatewayInput")
.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway("gatewayRequest",
g -> g.errorChannel("gatewayError").replyTimeout(10L).errorOnTimeout(true))
.gateway((f) -> f.transform("From Gateway SubFlow: "::concat))
.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,14 +59,17 @@
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.AnnotationConstants;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.GatewayHeader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
Expand All @@ -93,6 +96,7 @@
import org.springframework.util.ClassUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -150,6 +154,9 @@ public class GatewayInterfaceTests {
@Autowired
private MessageChannel gatewayChannel;

@Autowired
private PollableChannel gatewayQueueChannel;

@Autowired
private MessageChannel errorChannel;

Expand Down Expand Up @@ -468,19 +475,19 @@ public void testAnnotationGatewayProxyFactoryBean() {
assertThat(TestUtils.getPropertyValue(this.annotationGatewayProxyFactoryBean,
"defaultRequestTimeout", Expression.class).getValue()).isEqualTo(1111L);
assertThat(TestUtils.getPropertyValue(this.annotationGatewayProxyFactoryBean,
"defaultReplyTimeout", Expression.class).getValue()).isEqualTo(222L);
"defaultReplyTimeout", Expression.class).getValue()).isEqualTo(0L);

Collection<MessagingGatewaySupport> messagingGateways =
this.annotationGatewayProxyFactoryBean.getGateways().values();
assertThat(messagingGateways.size()).isEqualTo(1);

MessagingGatewaySupport gateway = messagingGateways.iterator().next();
assertThat(gateway.getRequestChannel()).isSameAs(this.gatewayChannel);
assertThat(gateway.getRequestChannel()).isSameAs(this.gatewayQueueChannel);
assertThat(gateway.getReplyChannel()).isSameAs(this.gatewayChannel);
assertThat(gateway.getErrorChannel()).isSameAs(this.errorChannel);
Object requestMapper = TestUtils.getPropertyValue(gateway, "requestMapper");

assertThat(TestUtils.getPropertyValue(requestMapper, "payloadExpression.expression")).isEqualTo("@foo");
assertThat(TestUtils.getPropertyValue(requestMapper, "payloadExpression.expression")).isEqualTo("args[0]");

Map globalHeaderExpressions = TestUtils.getPropertyValue(requestMapper, "globalHeaderExpressions", Map.class);
assertThat(globalHeaderExpressions.size()).isEqualTo(1);
Expand All @@ -489,6 +496,9 @@ public void testAnnotationGatewayProxyFactoryBean() {
assertThat(barHeaderExpression).isNotNull();
assertThat(barHeaderExpression).isInstanceOf(LiteralExpression.class);
assertThat(((LiteralExpression) barHeaderExpression).getValue()).isEqualTo("baz");

assertThatExceptionOfType(MessageTimeoutException.class)
.isThrownBy(() -> this.gatewayByAnnotationGPFB.foo("test"));
}

@Test
Expand Down Expand Up @@ -667,6 +677,12 @@ public MessageChannel gatewayChannel() {
return new DirectChannel();
}

@Bean
@BridgeTo(poller = @Poller(fixedDelay = "1000"))
public MessageChannel gatewayQueueChannel() {
return new QueueChannel();
}

@Bean
@BridgeTo
public MessageChannel gatewayThreadChannel() {
Expand Down Expand Up @@ -802,13 +818,14 @@ public interface IgnoredHeaderGateway {
}

@MessagingGateway(
defaultRequestChannel = "${gateway.channel:gatewayChannel}",
defaultRequestChannel = "${gateway.channel:gatewayQueueChannel}",
defaultReplyChannel = "${gateway.channel:gatewayChannel}",
defaultPayloadExpression = "${gateway.payload:@foo}",
defaultPayloadExpression = "${gateway.payload:args[0]}",
errorChannel = "${gateway.channel:errorChannel}",
asyncExecutor = "${gateway.executor:exec}",
defaultRequestTimeout = "${gateway.timeout:1111}",
defaultReplyTimeout = "${gateway.timeout:222}",
defaultReplyTimeout = "${gateway.timeout:0}",
errorOnTimeout = true,
defaultHeaders = {
@GatewayHeader(name = "${gateway.header.name:bar}",
value = "${gateway.header.value:baz}")
Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/gateway.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ If a component downstream is still running (perhaps because of an infinite loop
However, if the timeout has been reached before the actual reply was produced, it could result in a 'null' return from the gateway method.
You should understand that the reply message (if produced) is sent to a reply channel after the gateway method invocation might have returned, so you must be aware of that and design your flow with it in mind.

Also see the `errorOnTimeout` property to throw a `MessageTimeoutException` instead of returning `null`, when a timeout occurs.

[[downstream-component-returns-null-]]
=== Downstream Component Returns 'null'

Expand Down Expand Up @@ -838,4 +840,8 @@ At that time, the calling thread starts waiting for the reply.
If the flow was completely synchronous, the reply is immediately available.
For asynchronous flows, the thread waits for up to this time.

Starting with version 6.2, the `errorOnTimeout` property of the internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport` is exposed on the `@MessagingGateway` and `GatewayEndpointSpec`.
This option has exactly the same meaning as for any inbound gateway explained in the end of xref:endpoint-summary.adoc#endpoint-summary[Endpoint Summary] chapter.
In other words, setting this option to `true`, would lead to the `MessageTimeoutException` being thrown from a send-and-receive gateway operation instead of returning `null` when the receive timeout is exhausted.

See xref:dsl/integration-flow-as-gateway.adoc[`IntegrationFlow` as Gateway] in the Java DSL chapter for options to define gateways through `IntegrationFlow`.
2 changes: 2 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ See, for example, `transformWith()`, `splitWith()` in xref:dsl.adoc#java-dsl[ Ja
- A new `spring.integration.endpoints.defaultTimeout` global property has been introduced to override the default 30 seconds timeout for all the endpoints in the application.
See xref:configuration/global-properties.adoc[Global Properties] for more information.

- The `@MessagingGateway` and `GatewayEndpointSpec` provided by the Java DSL now expose the `errorOnTimeout` property of the internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport`.
See xref:gateway.adoc#gateway-no-response[ Gateway Behavior When No response Arrives] for more information.
[[x6.2-websockets]]
=== WebSockets Changes

Expand Down

0 comments on commit 4e310bb

Please sign in to comment.