Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-8705 Expose errorOnTimeout on MessagingGateway #8712

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.springframework.core.annotation.AliasFor;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.gateway.MessagingGatewaySupport;

/**
* A stereotype annotation to provide an Integration Messaging Gateway Proxy
Expand Down Expand Up @@ -165,4 +166,13 @@
*/
boolean proxyDefaultMethods() default false;

/**
* If errorOnTimeout is true, null won't be returned as a result of a gateway method invocation when a timeout occurs.
* Instead, a {@link org.springframework.integration.MessageTimeoutException} is thrown
* or an error message is published to the error channel.
* @since 6.2
* @see MessagingGatewaySupport#setErrorOnTimeout(boolean)
*/
boolean errorOnTimeout() default false;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -98,4 +98,16 @@ public GatewayEndpointSpec replyTimeout(Long replyTimeout) {
return this;
}

/**
* Set a error on timeout flag.
* @param errorOnTimeout true to produce an error in case of a reply timeout.
* @return the spec.
* @since 6.2
* @see org.springframework.integration.gateway.GatewayProxyFactoryBean#setErrorOnTimeout(boolean)
*/
public GatewayEndpointSpec errorOnTimeout(boolean errorOnTimeout) {
this.handler.setErrorOnTimeout(errorOnTimeout);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,6 +128,8 @@ protected void onInit() {

populateAsyncExecutorIfAny();

setErrorOnTimeout(this.gatewayAttributes.getBoolean("errorOnTimeout"));

boolean proxyDefaultMethods = this.gatewayAttributes.getBoolean("proxyDefaultMethods");
if (proxyDefaultMethods) { // Override only if annotation attribute is different
setProxyDefaultMethods(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public void setReplyTimeout(Long replyTimeout) {
this.gatewayProxyFactoryBean.setDefaultReplyTimeout(replyTimeout);
}

public void setErrorOnTimeout(boolean errorOnTimeout) {
this.gatewayProxyFactoryBean.setErrorOnTimeout(errorOnTimeout);
}

@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
if (this.exchanger == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public class GatewayProxyFactoryBean<T> extends AbstractEndpoint

private MetricsCaptor metricsCaptor;

private boolean errorOnTimeout;

/**
* Create a Factory whose service interface type can be configured by setter injection.
* If none is set, it will fall back to the default service interface type,
Expand Down Expand Up @@ -455,6 +457,18 @@ public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
this.gatewayMap.values().forEach(gw -> gw.registerMetricsCaptor(metricsCaptorToRegister));
}

/**
* If errorOnTimeout is true, null won't be returned as a result of a gateway method invocation, when a timeout occurs.
* Instead, a {@link org.springframework.integration.MessageTimeoutException} is thrown
* or an error message is published to the error channel.
* @param errorOnTimeout true to create the error message on reply timeout.
* @since 6.2
* @see MessagingGatewaySupport#setErrorOnTimeout(boolean)
*/
public void setErrorOnTimeout(boolean errorOnTimeout) {
this.errorOnTimeout = errorOnTimeout;
}

@Override
@SuppressWarnings("unchecked")
protected void onInit() {
Expand Down Expand Up @@ -881,6 +895,7 @@ private MethodInvocationGateway doCreateMethodInvocationGateway(Method method,
gateway.setBeanFactory(getBeanFactory());
gateway.setShouldTrack(this.shouldTrack);
gateway.registerMetricsCaptor(this.metricsCaptor);
gateway.setErrorOnTimeout(this.errorOnTimeout);
gateway.afterPropertiesSet();

return gateway;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
Expand All @@ -43,6 +44,7 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
Expand Down Expand Up @@ -79,9 +81,11 @@ void testGatewayFlow() {
assertThat(receive.getPayload()).isEqualTo("From Gateway SubFlow: FOO");
assertThat(this.gatewayError.receive(1)).isNull();

message = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();
Message<String> otherMessage = MessageBuilder.withPayload("bar").setReplyChannel(replyChannel).build();

this.gatewayInput.send(message);
assertThatExceptionOfType(MessageHandlingException.class)
.isThrownBy(() -> this.gatewayInput.send(otherMessage))
.withCauseExactlyInstanceOf(MessageTimeoutException.class);

assertThat(replyChannel.receive(1)).isNull();

Expand Down Expand Up @@ -173,7 +177,8 @@ public static class ContextConfiguration {
@Bean
public IntegrationFlow gatewayFlow() {
return IntegrationFlow.from("gatewayInput")
.gateway("gatewayRequest", g -> g.errorChannel("gatewayError").replyTimeout(10L))
.gateway("gatewayRequest",
g -> g.errorChannel("gatewayError").replyTimeout(10L).errorOnTimeout(true))
.gateway((f) -> f.transform("From Gateway SubFlow: "::concat))
.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,14 +59,17 @@
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.annotation.AnnotationConstants;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.GatewayHeader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
Expand All @@ -93,6 +96,7 @@
import org.springframework.util.ClassUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -150,6 +154,9 @@ public class GatewayInterfaceTests {
@Autowired
private MessageChannel gatewayChannel;

@Autowired
private PollableChannel gatewayQueueChannel;

@Autowired
private MessageChannel errorChannel;

Expand Down Expand Up @@ -468,19 +475,19 @@ public void testAnnotationGatewayProxyFactoryBean() {
assertThat(TestUtils.getPropertyValue(this.annotationGatewayProxyFactoryBean,
"defaultRequestTimeout", Expression.class).getValue()).isEqualTo(1111L);
assertThat(TestUtils.getPropertyValue(this.annotationGatewayProxyFactoryBean,
"defaultReplyTimeout", Expression.class).getValue()).isEqualTo(222L);
"defaultReplyTimeout", Expression.class).getValue()).isEqualTo(0L);

Collection<MessagingGatewaySupport> messagingGateways =
this.annotationGatewayProxyFactoryBean.getGateways().values();
assertThat(messagingGateways.size()).isEqualTo(1);

MessagingGatewaySupport gateway = messagingGateways.iterator().next();
assertThat(gateway.getRequestChannel()).isSameAs(this.gatewayChannel);
assertThat(gateway.getRequestChannel()).isSameAs(this.gatewayQueueChannel);
assertThat(gateway.getReplyChannel()).isSameAs(this.gatewayChannel);
assertThat(gateway.getErrorChannel()).isSameAs(this.errorChannel);
Object requestMapper = TestUtils.getPropertyValue(gateway, "requestMapper");

assertThat(TestUtils.getPropertyValue(requestMapper, "payloadExpression.expression")).isEqualTo("@foo");
assertThat(TestUtils.getPropertyValue(requestMapper, "payloadExpression.expression")).isEqualTo("args[0]");

Map globalHeaderExpressions = TestUtils.getPropertyValue(requestMapper, "globalHeaderExpressions", Map.class);
assertThat(globalHeaderExpressions.size()).isEqualTo(1);
Expand All @@ -489,6 +496,9 @@ public void testAnnotationGatewayProxyFactoryBean() {
assertThat(barHeaderExpression).isNotNull();
assertThat(barHeaderExpression).isInstanceOf(LiteralExpression.class);
assertThat(((LiteralExpression) barHeaderExpression).getValue()).isEqualTo("baz");

assertThatExceptionOfType(MessageTimeoutException.class)
.isThrownBy(() -> this.gatewayByAnnotationGPFB.foo("test"));
}

@Test
Expand Down Expand Up @@ -667,6 +677,12 @@ public MessageChannel gatewayChannel() {
return new DirectChannel();
}

@Bean
@BridgeTo(poller = @Poller(fixedDelay = "1000"))
public MessageChannel gatewayQueueChannel() {
return new QueueChannel();
}

@Bean
@BridgeTo
public MessageChannel gatewayThreadChannel() {
Expand Down Expand Up @@ -802,13 +818,14 @@ public interface IgnoredHeaderGateway {
}

@MessagingGateway(
defaultRequestChannel = "${gateway.channel:gatewayChannel}",
defaultRequestChannel = "${gateway.channel:gatewayQueueChannel}",
defaultReplyChannel = "${gateway.channel:gatewayChannel}",
defaultPayloadExpression = "${gateway.payload:@foo}",
defaultPayloadExpression = "${gateway.payload:args[0]}",
errorChannel = "${gateway.channel:errorChannel}",
asyncExecutor = "${gateway.executor:exec}",
defaultRequestTimeout = "${gateway.timeout:1111}",
defaultReplyTimeout = "${gateway.timeout:222}",
defaultReplyTimeout = "${gateway.timeout:0}",
errorOnTimeout = true,
defaultHeaders = {
@GatewayHeader(name = "${gateway.header.name:bar}",
value = "${gateway.header.value:baz}")
Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/gateway.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ If a component downstream is still running (perhaps because of an infinite loop
However, if the timeout has been reached before the actual reply was produced, it could result in a 'null' return from the gateway method.
You should understand that the reply message (if produced) is sent to a reply channel after the gateway method invocation might have returned, so you must be aware of that and design your flow with it in mind.

Also see the `errorOnTimeout` property to throw a `MessageTimeoutException` instead of returning `null`, when a timeout occurs.

[[downstream-component-returns-null-]]
=== Downstream Component Returns 'null'

Expand Down Expand Up @@ -838,4 +840,8 @@ At that time, the calling thread starts waiting for the reply.
If the flow was completely synchronous, the reply is immediately available.
For asynchronous flows, the thread waits for up to this time.

Starting with version 6.2, the `errorOnTimeout` property of the internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport` is exposed on the `@MessagingGateway` and `GatewayEndpointSpec`.
This option has exactly the same meaning as for any inbound gateway explained in the end of xref:endpoint-summary.adoc#endpoint-summary[Endpoint Summary] chapter.
In other words, setting this option to `true`, would lead to the `MessageTimeoutException` being thrown from a send-and-receive gateway operation instead of returning `null` when the receive timeout is exhausted.

See xref:dsl/integration-flow-as-gateway.adoc[`IntegrationFlow` as Gateway] in the Java DSL chapter for options to define gateways through `IntegrationFlow`.
3 changes: 3 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ See xref:endpoint.adoc#endpoint-pollingconsumer[Polling Consumer] for more infor
- Java, Groovy and Kotlin DSLs have now context-specific methods in the `IntegrationFlowDefinition` with a single `Consumer` argument to configure an endpoint and its handler with one builder and readable options.
See, for example, `transformWith()`, `splitWith()` in xref:dsl.adoc#java-dsl[ Java DSL Chapter].

- The `@MessagingGateway` and `GatewayEndpointSpec` provided by the Java DSL now expose the `errorOnTimeout` property of the internal `MethodInvocationGateway` extension of the `MessagingGatewaySupport`.
See xref:gateway.adoc#gateway-no-response[ Gateway Behavior When No response Arrives] for more information.

[[x6.2-websockets]]
=== WebSockets Changes

Expand Down