Skip to content

Commit

Permalink
GH-8577: Revise ImapIdleChannelAdapter logic (#8588)
Browse files Browse the repository at this point in the history
* GH-8577: Revise `ImapIdleChannelAdapter` logic

Fixes #8577

When we process mail messages in async manner, it is possible that we end up
in a race condition situation where the next idle cycle closes the folder.

It is possible to reopen the folder, but feels better to block the current idle
cycle until we are done with the message and therefore keep folder opened.

* Deprecate `ImapIdleChannelAdapter.sendingTaskExecutor` in favor of an `ExecutorChannel`
as an output for this channel adapter or similar async hand-off downstream.
* Make use of `shouldReconnectAutomatically` as it is advertised for this channel adapter
* Optimize the proxy creation for message sending task

* * Remove `ImapIdleChannelAdapter.sendingTaskExecutor`

* Fix language in docs

Co-authored-by: Gary Russell <grussell@vmware.com>

---------

Co-authored-by: Gary Russell <grussell@vmware.com>
  • Loading branch information
artembilan and garyrussell authored Mar 29, 2023
1 parent 22d47e7 commit 4fdbdf1
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,12 @@
import java.io.Serial;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import jakarta.mail.Folder;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import org.aopalliance.aop.Advice;

import org.springframework.aop.framework.ProxyFactory;
Expand All @@ -38,6 +36,7 @@
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
Expand Down Expand Up @@ -78,12 +77,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be

private boolean shouldReconnectAutomatically = true;

private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1);

private boolean sendingTaskExecutorSet;

private List<Advice> adviceChain;

private Consumer<Object> messageSender;

private long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds

private volatile ScheduledFuture<?> receivingTask;
Expand All @@ -103,21 +100,10 @@ public void setAdviceChain(List<Advice> adviceChain) {
this.adviceChain = adviceChain;
}

/**
* Specify an {@link Executor} used to send messages received by the
* adapter.
* @param sendingTaskExecutor the sendingTaskExecutor to set
*/
public void setSendingTaskExecutor(Executor sendingTaskExecutor) {
Assert.notNull(sendingTaskExecutor, "'sendingTaskExecutor' must not be null");
this.sendingTaskExecutor = sendingTaskExecutor;
this.sendingTaskExecutorSet = true;
}

/**
* Specify whether the IDLE task should reconnect automatically after
* catching a {@link jakarta.mail.FolderClosedException} while waiting for messages. The
* default value is <code>true</code>.
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
* default value is true.
* @param shouldReconnectAutomatically true to reconnect.
*/
public void setShouldReconnectAutomatically(boolean shouldReconnectAutomatically) {
Expand Down Expand Up @@ -148,6 +134,26 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
@SuppressWarnings("unchecked")
protected void onInit() {
super.onInit();

Consumer<?> messageSenderToUse = new MessageSender();

if (!CollectionUtils.isEmpty(this.adviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(messageSenderToUse);
this.adviceChain.forEach(proxyFactory::addAdvice);
for (Advice advice : this.adviceChain) {
proxyFactory.addAdvice(advice);
}
messageSenderToUse = (Consumer<?>) proxyFactory.getProxy(this.classLoader);
}

this.messageSender = (Consumer<Object>) messageSenderToUse;
}


/*
* Lifecycle implementation
*/
Expand All @@ -162,77 +168,64 @@ protected void doStart() {
@Override
// guarded by super#lifecycleLock
protected void doStop() {
this.receivingTask.cancel(true);
if (this.receivingTask != null) {
this.receivingTask.cancel(true);
this.receivingTask = null;
}
this.mailReceiver.cancelPing();
}

@Override
public void destroy() {
super.destroy();
this.mailReceiver.destroy();
// If we're running with the default executor, shut it down.
if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) {
((ExecutorService) this.sendingTaskExecutor).shutdown();
}

private void publishException(Exception ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
}
else {
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
}
}

private Runnable createMessageSendingTask(Object mailMessage) {
Runnable sendingTask = prepareSendingTask(mailMessage);
private class MessageSender implements Consumer<Object> {

// wrap in the TX proxy if necessary
if (!CollectionUtils.isEmpty(this.adviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(sendingTask);
if (!CollectionUtils.isEmpty(this.adviceChain)) {
for (Advice advice : this.adviceChain) {
proxyFactory.addAdvice(advice);
}
}
sendingTask = (Runnable) proxyFactory.getProxy(this.classLoader);
MessageSender() {
}
return sendingTask;
}

private Runnable prepareSendingTask(Object mailMessage) {
return () -> {
@SuppressWarnings("unchecked")
org.springframework.messaging.Message<?> message =
@Override
public void accept(Object mailMessage) {
org.springframework.messaging.Message<?> messageToSend =
mailMessage instanceof Message
? getMessageBuilderFactory().withPayload(mailMessage).build()
: (org.springframework.messaging.Message<Object>) mailMessage;
: (org.springframework.messaging.Message<?>) mailMessage;

if (TransactionSynchronizationManager.isActualTransactionActive()
&& this.transactionSynchronizationFactory != null) {
&& ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) {

TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(this);
TransactionSynchronization synchronization =
ImapIdleChannelAdapter.this.transactionSynchronizationFactory.create(this);
if (synchronization != null) {
TransactionSynchronizationManager.registerSynchronization(synchronization);

if (synchronization instanceof IntegrationResourceHolderSynchronization
if (synchronization instanceof IntegrationResourceHolderSynchronization integrationSync
&& !TransactionSynchronizationManager.hasResource(this)) {

TransactionSynchronizationManager.bindResource(this,
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
TransactionSynchronizationManager.bindResource(this, integrationSync.getResourceHolder());
}

Object resourceHolder = TransactionSynchronizationManager.getResource(this);
if (resourceHolder instanceof IntegrationResourceHolder) {
((IntegrationResourceHolder) resourceHolder).setMessage(message);
if (resourceHolder instanceof IntegrationResourceHolder integrationResourceHolder) {
integrationResourceHolder.setMessage(messageToSend);
}
}
}
sendMessage(message);
};
}

private void publishException(Exception ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
sendMessage(messageToSend);
}
else {
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
}
}

}

private class ReceivingTask implements Runnable {

Expand All @@ -246,10 +239,23 @@ public void run() {
ImapIdleChannelAdapter.this.idleTask.run();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) { //run again after a delay
logger.warn(ex, () -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
catch (Exception ex) {
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically'" +
"or not a 'jakarta.mail.MessagingException'");
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
}
publishException(ex);
}
}
Expand All @@ -274,21 +280,19 @@ public void run() {
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
if (isRunning()) {
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
}
}
}
}
catch (MessagingException ex) {
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new org.springframework.messaging.MessagingException(
"Failure in 'idle' task. Will NOT resubmit.", ex);
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}
Expand All @@ -298,16 +302,20 @@ public void run() {

private class ExceptionAwarePeriodicTrigger implements Trigger {

private volatile boolean delayNextExecution;
private final AtomicBoolean delayNextExecution = new AtomicBoolean();

private final AtomicBoolean stop = new AtomicBoolean();


ExceptionAwarePeriodicTrigger() {
}

@Override
public Instant nextExecution(TriggerContext triggerContext) {
if (this.delayNextExecution) {
this.delayNextExecution = false;
if (this.stop.getAndSet(false)) {
return null;
}
if (this.delayNextExecution.getAndSet(false)) {
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
}
else {
Expand All @@ -316,7 +324,11 @@ public Instant nextExecution(TriggerContext triggerContext) {
}

void delayNextExecution() {
this.delayNextExecution = true;
this.delayNextExecution.set(true);
}

void stop() {
this.stop.set(true);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,8 +54,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, txElement,
"synchronization-factory", "transactionSynchronizationFactory");
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor",
"sendingTaskExecutor");
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(null,
DomUtils.getChildElementByTagName(element, "transactional"), beanDefinition, parserContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -348,17 +347,6 @@ public ImapIdleChannelAdapterSpec transactional() {
return transactional(transactionInterceptor);
}

/**
* Specify a task executor to be used to send messages to the downstream flow.
* @param sendingTaskExecutor the sendingTaskExecutor.
* @return the spec.
* @see ImapIdleChannelAdapter#setSendingTaskExecutor(Executor)
*/
public ImapIdleChannelAdapterSpec sendingTaskExecutor(Executor sendingTaskExecutor) {
this.target.setSendingTaskExecutor(sendingTaskExecutor);
return this;
}

/**
* @param shouldReconnectAutomatically the shouldReconnectAutomatically.
* @return the spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,6 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="task-executor" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Reference to a bean that implements
org.springframework.core.task.TaskExecutor which is used
to send Messages received by this adapter.
If not provided, the adapter uses a single-threaded executor.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.core.task.TaskExecutor"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="cancel-idle-interval" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Expand All @@ -197,7 +182,7 @@
<xsd:attribute name="store-uri" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The URI for the Mail Store. Typically of the form: [pop3|imap]://user:password@host:port/INBOX
The URI for the Mail Store. Typically, of the form: [pop3|imap]://user:password@host:port/INBOX
If this is not provided, then the store will be retrieved via the no-arg Session.getStore()
instead of the Session.getStore(url) method.
]]></xsd:documentation>
Expand All @@ -206,7 +191,8 @@
<xsd:attribute name="mail-filter-expression" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Allows you to provide a SpEL expression which defines a fine grained filtering criteria for the mail messages to be processed by this adapter.
Allows you to provide a SpEL expression which defines a fine-grained
filtering criteria for the mail messages to be processed by this adapter.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Loading

0 comments on commit 4fdbdf1

Please sign in to comment.