From 4fdbdf180eb8428f34389dee0fcc60c8389977b1 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 29 Mar 2023 10:15:47 -0400 Subject: [PATCH] GH-8577: Revise `ImapIdleChannelAdapter` logic (#8588) * GH-8577: Revise `ImapIdleChannelAdapter` logic Fixes https://github.com/spring-projects/spring-integration/issues/8577 When we process mail messages in async manner, it is possible that we end up in a race condition situation where the next idle cycle closes the folder. It is possible to reopen the folder, but feels better to block the current idle cycle until we are done with the message and therefore keep folder opened. * Deprecate `ImapIdleChannelAdapter.sendingTaskExecutor` in favor of an `ExecutorChannel` as an output for this channel adapter or similar async hand-off downstream. * Make use of `shouldReconnectAutomatically` as it is advertised for this channel adapter * Optimize the proxy creation for message sending task * * Remove `ImapIdleChannelAdapter.sendingTaskExecutor` * Fix language in docs Co-authored-by: Gary Russell --------- Co-authored-by: Gary Russell --- .../mail/ImapIdleChannelAdapter.java | 162 ++++++++++-------- .../config/ImapIdleChannelAdapterParser.java | 4 +- .../mail/dsl/ImapIdleChannelAdapterSpec.java | 14 +- .../mail/config/spring-integration-mail.xsd | 20 +-- .../mail/ImapMailReceiverTests.java | 4 +- ...pIdleChannelAdapterParserTests-context.xml | 3 +- .../ImapIdleChannelAdapterParserTests.java | 3 +- src/reference/asciidoc/mail.adoc | 14 +- src/reference/asciidoc/whats-new.adoc | 6 + 9 files changed, 112 insertions(+), 118 deletions(-) diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java index b9ad5cc4f1e..fbe3f6e9353 100755 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,14 +19,12 @@ import java.io.Serial; import java.time.Instant; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import jakarta.mail.Folder; import jakarta.mail.Message; -import jakarta.mail.MessagingException; import org.aopalliance.aop.Advice; import org.springframework.aop.framework.ProxyFactory; @@ -38,6 +36,7 @@ import org.springframework.integration.transaction.IntegrationResourceHolder; import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization; import org.springframework.integration.transaction.TransactionSynchronizationFactory; +import org.springframework.messaging.MessagingException; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.TriggerContext; @@ -78,12 +77,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be private boolean shouldReconnectAutomatically = true; - private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1); - - private boolean sendingTaskExecutorSet; - private List adviceChain; + private Consumer messageSender; + private long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds private volatile ScheduledFuture receivingTask; @@ -103,21 +100,10 @@ public void setAdviceChain(List adviceChain) { this.adviceChain = adviceChain; } - /** - * Specify an {@link Executor} used to send messages received by the - * adapter. - * @param sendingTaskExecutor the sendingTaskExecutor to set - */ - public void setSendingTaskExecutor(Executor sendingTaskExecutor) { - Assert.notNull(sendingTaskExecutor, "'sendingTaskExecutor' must not be null"); - this.sendingTaskExecutor = sendingTaskExecutor; - this.sendingTaskExecutorSet = true; - } - /** * Specify whether the IDLE task should reconnect automatically after - * catching a {@link jakarta.mail.FolderClosedException} while waiting for messages. The - * default value is true. + * catching a {@link jakarta.mail.MessagingException} while waiting for messages. The + * default value is true. * @param shouldReconnectAutomatically true to reconnect. */ public void setShouldReconnectAutomatically(boolean shouldReconnectAutomatically) { @@ -148,6 +134,26 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv this.applicationEventPublisher = applicationEventPublisher; } + @Override + @SuppressWarnings("unchecked") + protected void onInit() { + super.onInit(); + + Consumer messageSenderToUse = new MessageSender(); + + if (!CollectionUtils.isEmpty(this.adviceChain)) { + ProxyFactory proxyFactory = new ProxyFactory(messageSenderToUse); + this.adviceChain.forEach(proxyFactory::addAdvice); + for (Advice advice : this.adviceChain) { + proxyFactory.addAdvice(advice); + } + messageSenderToUse = (Consumer) proxyFactory.getProxy(this.classLoader); + } + + this.messageSender = (Consumer) messageSenderToUse; + } + + /* * Lifecycle implementation */ @@ -162,7 +168,10 @@ protected void doStart() { @Override // guarded by super#lifecycleLock protected void doStop() { - this.receivingTask.cancel(true); + if (this.receivingTask != null) { + this.receivingTask.cancel(true); + this.receivingTask = null; + } this.mailReceiver.cancelPing(); } @@ -170,69 +179,53 @@ protected void doStop() { public void destroy() { super.destroy(); this.mailReceiver.destroy(); - // If we're running with the default executor, shut it down. - if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) { - ((ExecutorService) this.sendingTaskExecutor).shutdown(); + } + + private void publishException(Exception ex) { + if (this.applicationEventPublisher != null) { + this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex)); + } + else { + logger.debug(() -> "No application event publisher for exception: " + ex.getMessage()); } } - private Runnable createMessageSendingTask(Object mailMessage) { - Runnable sendingTask = prepareSendingTask(mailMessage); + private class MessageSender implements Consumer { - // wrap in the TX proxy if necessary - if (!CollectionUtils.isEmpty(this.adviceChain)) { - ProxyFactory proxyFactory = new ProxyFactory(sendingTask); - if (!CollectionUtils.isEmpty(this.adviceChain)) { - for (Advice advice : this.adviceChain) { - proxyFactory.addAdvice(advice); - } - } - sendingTask = (Runnable) proxyFactory.getProxy(this.classLoader); + MessageSender() { } - return sendingTask; - } - private Runnable prepareSendingTask(Object mailMessage) { - return () -> { - @SuppressWarnings("unchecked") - org.springframework.messaging.Message message = + @Override + public void accept(Object mailMessage) { + org.springframework.messaging.Message messageToSend = mailMessage instanceof Message ? getMessageBuilderFactory().withPayload(mailMessage).build() - : (org.springframework.messaging.Message) mailMessage; + : (org.springframework.messaging.Message) mailMessage; if (TransactionSynchronizationManager.isActualTransactionActive() - && this.transactionSynchronizationFactory != null) { + && ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) { - TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(this); + TransactionSynchronization synchronization = + ImapIdleChannelAdapter.this.transactionSynchronizationFactory.create(this); if (synchronization != null) { TransactionSynchronizationManager.registerSynchronization(synchronization); - if (synchronization instanceof IntegrationResourceHolderSynchronization + if (synchronization instanceof IntegrationResourceHolderSynchronization integrationSync && !TransactionSynchronizationManager.hasResource(this)) { - TransactionSynchronizationManager.bindResource(this, - ((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder()); + TransactionSynchronizationManager.bindResource(this, integrationSync.getResourceHolder()); } Object resourceHolder = TransactionSynchronizationManager.getResource(this); - if (resourceHolder instanceof IntegrationResourceHolder) { - ((IntegrationResourceHolder) resourceHolder).setMessage(message); + if (resourceHolder instanceof IntegrationResourceHolder integrationResourceHolder) { + integrationResourceHolder.setMessage(messageToSend); } } } - sendMessage(message); - }; - } - - private void publishException(Exception ex) { - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex)); + sendMessage(messageToSend); } - else { - logger.debug(() -> "No application event publisher for exception: " + ex.getMessage()); - } - } + } private class ReceivingTask implements Runnable { @@ -246,10 +239,23 @@ public void run() { ImapIdleChannelAdapter.this.idleTask.run(); logger.debug("Task completed successfully. Re-scheduling it again right away."); } - catch (Exception ex) { //run again after a delay - logger.warn(ex, () -> "Failed to execute IDLE task. Will attempt to resubmit in " - + ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds."); - ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution(); + catch (Exception ex) { + if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically + && ex.getCause() instanceof jakarta.mail.MessagingException messagingException) { + + //run again after a delay + logger.info(messagingException, + () -> "Failed to execute IDLE task. Will attempt to resubmit in " + + ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds."); + ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution(); + } + else { + logger.warn(ex, + "Failed to execute IDLE task. " + + "Won't resubmit since not a 'shouldReconnectAutomatically'" + + "or not a 'jakarta.mail.MessagingException'"); + ImapIdleChannelAdapter.this.receivingTaskTrigger.stop(); + } publishException(ex); } } @@ -274,21 +280,19 @@ public void run() { Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive(); logger.debug(() -> "received " + mailMessages.length + " mail messages"); for (Object mailMessage : mailMessages) { - Runnable messageSendingTask = createMessageSendingTask(mailMessage); if (isRunning()) { - ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask); + ImapIdleChannelAdapter.this.messageSender.accept(mailMessage); } } } } - catch (MessagingException ex) { + catch (jakarta.mail.MessagingException ex) { logger.warn(ex, "error occurred in idle task"); if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) { throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex); } else { - throw new org.springframework.messaging.MessagingException( - "Failure in 'idle' task. Will NOT resubmit.", ex); + throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex); } } } @@ -298,7 +302,9 @@ public void run() { private class ExceptionAwarePeriodicTrigger implements Trigger { - private volatile boolean delayNextExecution; + private final AtomicBoolean delayNextExecution = new AtomicBoolean(); + + private final AtomicBoolean stop = new AtomicBoolean(); ExceptionAwarePeriodicTrigger() { @@ -306,8 +312,10 @@ private class ExceptionAwarePeriodicTrigger implements Trigger { @Override public Instant nextExecution(TriggerContext triggerContext) { - if (this.delayNextExecution) { - this.delayNextExecution = false; + if (this.stop.getAndSet(false)) { + return null; + } + if (this.delayNextExecution.getAndSet(false)) { return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay); } else { @@ -316,7 +324,11 @@ public Instant nextExecution(TriggerContext triggerContext) { } void delayNextExecution() { - this.delayNextExecution = true; + this.delayNextExecution.set(true); + } + + void stop() { + this.stop.set(true); } } diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParser.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParser.java index 68b7fcd6d17..d5ed26efd2d 100644 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParser.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,8 +54,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, txElement, "synchronization-factory", "transactionSynchronizationFactory"); } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor", - "sendingTaskExecutor"); AbstractBeanDefinition beanDefinition = builder.getBeanDefinition(); IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(null, DomUtils.getChildElementByTagName(element, "transactional"), beanDefinition, parserContext); diff --git a/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java b/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java index 5f256d35b3f..ba10128c551 100644 --- a/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java +++ b/spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; @@ -348,17 +347,6 @@ public ImapIdleChannelAdapterSpec transactional() { return transactional(transactionInterceptor); } - /** - * Specify a task executor to be used to send messages to the downstream flow. - * @param sendingTaskExecutor the sendingTaskExecutor. - * @return the spec. - * @see ImapIdleChannelAdapter#setSendingTaskExecutor(Executor) - */ - public ImapIdleChannelAdapterSpec sendingTaskExecutor(Executor sendingTaskExecutor) { - this.target.setSendingTaskExecutor(sendingTaskExecutor); - return this; - } - /** * @param shouldReconnectAutomatically the shouldReconnectAutomatically. * @return the spec. diff --git a/spring-integration-mail/src/main/resources/org/springframework/integration/mail/config/spring-integration-mail.xsd b/spring-integration-mail/src/main/resources/org/springframework/integration/mail/config/spring-integration-mail.xsd index 8ef3ea2ae60..b38af9d71a5 100644 --- a/spring-integration-mail/src/main/resources/org/springframework/integration/mail/config/spring-integration-mail.xsd +++ b/spring-integration-mail/src/main/resources/org/springframework/integration/mail/config/spring-integration-mail.xsd @@ -164,21 +164,6 @@ - - - - - - - - - - @@ -197,7 +182,7 @@ @@ -206,7 +191,8 @@ diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java index 3d009079589..543c5d57136 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -209,6 +209,7 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo adapter.setOutputChannel(channel); adapter.setTaskScheduler(taskScheduler); adapter.setReconnectDelay(1); + adapter.afterPropertiesSet(); adapter.start(); MimeMessage message = GreenMailUtil.createTextEmail("Foo ", "Bar ", "Test Email", "foo\r\n", @@ -698,6 +699,7 @@ public void testInitialIdleDelayWhenRecentIsSupported() throws Exception { QueueChannel channel = new QueueChannel(); adapter.setOutputChannel(channel); adapter.setReconnectDelay(1); + adapter.afterPropertiesSet(); ImapMailReceiver receiver = new ImapMailReceiver("imap:foo"); receiver.setCancelIdleInterval(1); diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests-context.xml b/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests-context.xml index b385e0ae8cf..55b36b104e8 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests-context.xml +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests-context.xml @@ -78,8 +78,7 @@ store-uri="imap:foo" channel="channel" auto-startup="false" - should-delete-messages="true" - task-executor="executor"> + should-delete-messages="true"> diff --git a/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests.java b/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests.java index 2de5d3edfed..e3ddb6ee88d 100644 --- a/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests.java +++ b/spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -194,7 +194,6 @@ public void transactionalAdapter() { assertThat(receiverAccessor.getPropertyValue("shouldDeleteMessages")).isEqualTo(Boolean.TRUE); assertThat(receiverAccessor.getPropertyValue("shouldMarkMessagesAsRead")).isEqualTo(Boolean.TRUE); assertThat(adapterAccessor.getPropertyValue("errorChannel")).isNull(); - assertThat(adapterAccessor.getPropertyValue("sendingTaskExecutor")).isEqualTo(context.getBean("executor")); assertThat(adapterAccessor.getPropertyValue("adviceChain")).isNotNull(); } diff --git a/src/reference/asciidoc/mail.adoc b/src/reference/asciidoc/mail.adoc index ab788d357d5..45118824afc 100644 --- a/src/reference/asciidoc/mail.adoc +++ b/src/reference/asciidoc/mail.adoc @@ -85,7 +85,7 @@ MailReceiver receiver = new Pop3MailReceiver("pop3://usr:pwd@localhost/INBOX"); Another option for receiving mail is the IMAP `idle` command (if supported by your mail server). Spring Integration provides the `ImapIdleChannelAdapter`, which is itself a message-producing endpoint. -It delegates to an instance of the `ImapMailReceiver` but enables asynchronous reception of mail messages. +It delegates to an instance of the `ImapMailReceiver`. The next section has examples of configuring both types of inbound channel adapter with Spring Integration's namespace support in the 'mail' schema. [[imap-format-important]] @@ -143,6 +143,10 @@ In this case, the only header populated is the mentioned above `IntegrationMessa Starting with version 5.5.11, the folder is closed automatically after `AbstractMailReceiver.receive()` if no messages received or all of them are filtered out independently of the `autoCloseFolder` flag. In this case there is nothing to produce downstream for possible logic around `IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE` header. +Starting with version 6.0.5, the `ImapIdleChannelAdapter` no longer performs asynchronous message publishing. +This is necessary to block the idle listener loop for message processing downstream (e.g. with big attachments) because the mail folder must remain open. +If an async hand-off is required, an `ExecutorChannel` can be used as the output channel of this channel adapter. + [[mail-mapping]] === Inbound Mail Message Mapping @@ -428,7 +432,7 @@ IMPORTANT: In both configurations, `channel` and `should-delete-messages` are re You should understand why `should-delete-messages` is required. The issue is with the POP3 protocol, which does not have any knowledge of messages that were read. It can only know what has been read within a single session. -This means that, when your POP3 mail adapter runs, emails are successfully consumed as as they become available during each poll and no single email message is delivered more then once. +This means that, when your POP3 mail adapter runs, emails are successfully consumed as they become available during each poll and no single email message is delivered more then once. However, as soon as you restart your adapter and begin a new session, all the email messages that might have been retrieved in the previous session are retrieved again. That is the nature of POP3. Some might argue that `should-delete-messages` should be `true` by default. @@ -563,7 +567,7 @@ The following example shows what the `Mover` class might look like: ---- public class Mover { - public void process(MimeMessage message) throws Exception{ + public void process(MimeMessage message) throws Exception { Folder folder = message.getFolder(); folder.open(Folder.READ_WRITE); String messageId = message.getMessageID(); @@ -581,7 +585,7 @@ public class Mover { } } - Folder somethingFolder = store.getFolder("SOMETHING")); + Folder somethingFolder = store.getFolder("SOMETHING"); somethingFolder.appendMessages(new MimeMessage[]{message}); folder.expunge(); folder.close(true); @@ -634,7 +638,7 @@ public class MailApplication { .handle(Mail.outboundAdapter("gmail") .port(smtpServer.getPort()) .credentials("user", "pw") - .protocol("smtp")), + .protocol("smtp"), e -> e.id("sendMailEndpoint")) .get(); } diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 0a7dbe39ab5..5740ab3e75d 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -47,3 +47,9 @@ See <<./web-sockets.adoc#web-socket-overview, WebSocket Overview>> for more info The `JmsInboundGateway`, via its `ChannelPublishingJmsMessageListener`, can now be configured with a `replyToExpression` to resolve a reply destination against the request message at runtime. See <<./jms.adoc#jms-inbound-gateway, JMS Inbound Gateway>> for more information. + +[[x6.1-mail]] +=== Mail Changes + +The (previously deprecated) `ImapIdleChannelAdapter.sendingTaskExecutor` property has been removed in favor of an asynchronous message process downstream in the flow. +See <<./mail.adoc#mail-inbound, Mail-receiving Channel Adapter>> for more information.