Skip to content

Commit 0883972

Browse files
committed
fix: prevent the notifyUpdate() during subscribe to die
A single notifyUpdate() is executed during the initial subscribe, in case there are messages waiting in the queue. Because the try-catch is outside the polling loop this can lead to a dying polling loop at startup leaving the messages unprocessed.
1 parent 6db7041 commit 0883972

File tree

2 files changed

+37
-11
lines changed

2 files changed

+37
-11
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package org.springframework.integration.jdbc.channel;
1818

19-
import java.util.Optional;
20-
import java.util.concurrent.Executor;
21-
2219
import org.springframework.core.log.LogAccessor;
2320
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2421
import org.springframework.integration.channel.AbstractSubscribableChannel;
@@ -32,6 +29,9 @@
3229
import org.springframework.transaction.support.TransactionTemplate;
3330
import org.springframework.util.Assert;
3431

32+
import java.util.Optional;
33+
import java.util.concurrent.Executor;
34+
3535
/**
3636
* An {@link AbstractSubscribableChannel} for receiving push notifications for
3737
* messages send to a group id of a {@link JdbcChannelMessageStore}. Receiving
@@ -160,14 +160,15 @@ protected boolean doSend(Message<?> message, long timeout) {
160160
@Override
161161
public void notifyUpdate() {
162162
this.executor.execute(() -> {
163-
try {
164-
Optional<Message<?>> dispatchedMessage;
165-
do {
166-
dispatchedMessage = askForMessage();
167-
} while (dispatchedMessage.isPresent());
168-
}
169-
catch (Exception ex) {
170-
LOGGER.error(ex, "Exception during message dispatch");
163+
while (true) {
164+
try {
165+
if (askForMessage().isEmpty()) {
166+
break;
167+
}
168+
}
169+
catch (Exception ex) {
170+
LOGGER.error(ex, "Exception during message dispatch");
171+
}
171172
}
172173
});
173174
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,31 @@ void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws Interrup
233233
assertThat(payloads).containsExactly("1");
234234
}
235235

236+
@Test
237+
void testPollLoopDiesOnNotifyInSubscribe() throws InterruptedException {
238+
CountDownLatch latch = new CountDownLatch(2);
239+
List<Object> payloads = new ArrayList<>();
240+
241+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("error"));
242+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
243+
244+
postgresChannelMessageTableSubscriber.start();
245+
postgresSubscribableChannel.subscribe(message -> {
246+
try {
247+
if ("error".equals(message.getPayload())) {
248+
throw new RuntimeException("An error has occurred");
249+
}
250+
payloads.add(message.getPayload());
251+
}
252+
finally {
253+
latch.countDown();
254+
}
255+
});
256+
257+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
258+
assertThat(payloads).containsExactly("1");
259+
}
260+
236261
@Configuration
237262
@EnableIntegration
238263
public static class Config {

0 commit comments

Comments
 (0)