Skip to content

Commit a90d66f

Browse files
committed
Don't renew message until newMessageNextTrack()
1 parent 51797ad commit a90d66f

15 files changed

+180
-32
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ public void setShouldTrack(boolean shouldTrack) {
9898
this.shouldTrack = shouldTrack;
9999
}
100100

101+
@Override
102+
public boolean newMessageNextTrack() {
103+
return false;
104+
}
105+
101106
@Override
102107
public void setCountsEnabled(boolean countsEnabled) {
103108
this.countsEnabled = countsEnabled;
@@ -392,7 +397,7 @@ public final boolean send(Message<?> message, long timeout) {
392397
Assert.notNull(message, "message must not be null");
393398
Assert.notNull(message.getPayload(), "message payload must not be null");
394399
if (this.shouldTrack) {
395-
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
400+
message = MessageHistory.write(message, this, getMessageBuilderFactory(), newMessageNextTrack());
396401
}
397402

398403
Deque<ChannelInterceptor> interceptorStack = null;

spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public String getComponentType() {
7070
return "publish-subscribe-channel";
7171
}
7272

73+
@Override
74+
public boolean newMessageNextTrack() {
75+
return true;
76+
}
77+
7378
/**
7479
* Provide an {@link ErrorHandler} strategy for handling Exceptions that
7580
* occur downstream from this channel. This will <i>only</i> be applied if

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public void setShouldTrack(boolean shouldTrack) {
8080
this.shouldTrack = shouldTrack;
8181
}
8282

83+
@Override
84+
public boolean newMessageNextTrack() {
85+
return false;
86+
}
87+
8388
protected MessagingTemplate getMessagingTemplate() {
8489
return this.messagingTemplate;
8590
}
@@ -117,7 +122,7 @@ protected void sendMessage(Message<?> message) {
117122
throw new MessagingException("cannot send a null message");
118123
}
119124
if (this.shouldTrack) {
120-
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
125+
message = MessageHistory.write(message, this, getMessageBuilderFactory(), newMessageNextTrack());
121126
}
122127
try {
123128
this.messagingTemplate.send(this.outputChannel, message);

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public void setShouldTrack(boolean shouldTrack) {
102102
this.shouldTrack = shouldTrack;
103103
}
104104

105+
@Override
106+
public boolean newMessageNextTrack() {
107+
return false;
108+
}
109+
105110
@Override
106111
public String getComponentType() {
107112
return (this.source instanceof NamedComponent) ?

spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ public void setShouldTrack(boolean shouldTrack) {
219219
}
220220
}
221221

222+
@Override
223+
public boolean newMessageNextTrack() {
224+
return false;
225+
}
226+
222227
/**
223228
* Set the executor for use when the gateway method returns
224229
* {@link java.util.concurrent.Future} or {@link org.springframework.util.concurrent.ListenableFuture}.

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,12 @@ public void setShouldTrack(boolean shouldTrack) {
234234
this.historyWritingPostProcessor.setShouldTrack(shouldTrack);
235235
}
236236

237+
@Override
238+
public boolean newMessageNextTrack() {
239+
return false;
240+
}
241+
242+
237243
@Override
238244
public int getMessageCount() {
239245
return (int) this.messageCount.get();

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public void setShouldTrack(boolean shouldTrack) {
9494
this.shouldTrack = shouldTrack;
9595
}
9696

97+
@Override
98+
public boolean newMessageNextTrack() {
99+
return false;
100+
}
101+
97102
@Override
98103
public void configureMetrics(AbstractMessageHandlerMetrics metrics) {
99104
Assert.notNull(metrics, "'metrics' must not be null");
@@ -119,7 +124,7 @@ public final void handleMessage(Message<?> message) {
119124
AbstractMessageHandlerMetrics handlerMetrics = this.handlerMetrics;
120125
try {
121126
if (message != null && this.shouldTrack) {
122-
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
127+
message = MessageHistory.write(message, this, getMessageBuilderFactory(), newMessageNextTrack());
123128
}
124129
if (countsEnabled) {
125130
start = handlerMetrics.beforeHandle();

spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ public final class MessageHistory implements List<Properties>, Serializable {
5555
private static final MessageBuilderFactory MESSAGE_BUILDER_FACTORY = new DefaultMessageBuilderFactory();
5656

5757

58-
private final List<Properties> components;
58+
private final List<Properties> components = new ArrayList<Properties>();
59+
60+
private boolean nextNewMessage;
5961

6062

6163
public static MessageHistory read(Message<?> message) {
@@ -66,46 +68,58 @@ public static <T> Message<T> write(Message<T> message, NamedComponent component)
6668
return write(message, component, MESSAGE_BUILDER_FACTORY);
6769
}
6870

69-
@SuppressWarnings("unchecked")
7071
public static <T> Message<T> write(Message<T> message, NamedComponent component,
7172
MessageBuilderFactory messageBuilderFactory) {
73+
return write(message, component, messageBuilderFactory, false);
74+
}
75+
76+
@SuppressWarnings("unchecked")
77+
public static <T> Message<T> write(Message<T> message, NamedComponent component,
78+
MessageBuilderFactory messageBuilderFactory, boolean newMessageOnNext) {
7279
Assert.notNull(message, "Message must not be null");
7380
Assert.notNull(component, "Component must not be null");
7481
Properties metadata = extractMetadata(component);
7582
if (!metadata.isEmpty()) {
76-
MessageHistory previousHistory = message.getHeaders().get(HEADER_NAME, MessageHistory.class);
77-
List<Properties> components = (previousHistory != null) ?
78-
new ArrayList<Properties>(previousHistory) : new ArrayList<Properties>();
79-
components.add(metadata);
80-
MessageHistory history = new MessageHistory(components);
81-
82-
if (message instanceof MutableMessage) {
83-
message.getHeaders().put(HEADER_NAME, history);
84-
}
85-
else if (message instanceof ErrorMessage) {
86-
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
87-
headerAccessor.setHeader(HEADER_NAME, history);
88-
Throwable payload = ((ErrorMessage) message).getPayload();
89-
ErrorMessage errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders());
90-
message = (Message<T>) errorMessage;
91-
}
92-
else if (message instanceof AdviceMessage) {
93-
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
94-
headerAccessor.setHeader(HEADER_NAME, history);
95-
message = new AdviceMessage<T>(message.getPayload(), headerAccessor.toMessageHeaders(),
96-
((AdviceMessage) message).getInputMessage());
83+
MessageHistory messageHistory = read(message);
84+
if (messageHistory == null || messageHistory.nextNewMessage) {
85+
messageHistory = new MessageHistory(messageHistory);
86+
messageHistory.components.add(metadata);
87+
88+
if (message instanceof MutableMessage) {
89+
message.getHeaders().put(HEADER_NAME, messageHistory);
90+
}
91+
else if (message instanceof ErrorMessage) {
92+
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
93+
headerAccessor.setHeader(HEADER_NAME, messageHistory);
94+
Throwable payload = ((ErrorMessage) message).getPayload();
95+
ErrorMessage errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders());
96+
message = (Message<T>) errorMessage;
97+
}
98+
else if (message instanceof AdviceMessage) {
99+
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
100+
headerAccessor.setHeader(HEADER_NAME, messageHistory);
101+
message = new AdviceMessage<T>(message.getPayload(), headerAccessor.toMessageHeaders(),
102+
((AdviceMessage) message).getInputMessage());
103+
}
104+
else {
105+
message = messageBuilderFactory.fromMessage(message)
106+
.setHeader(HEADER_NAME, messageHistory)
107+
.build();
108+
}
97109
}
98110
else {
99-
message = messageBuilderFactory.fromMessage(message).setHeader(HEADER_NAME, history).build();
111+
messageHistory.components.add(metadata);
100112
}
113+
messageHistory.nextNewMessage = newMessageOnNext;
101114
}
102115
return message;
103116
}
104117

105118

106119
private MessageHistory(List<Properties> components) {
107-
Assert.notEmpty(components, "component list must not be empty");
108-
this.components = components;
120+
if (components != null) {
121+
this.components.addAll(components);
122+
}
109123
}
110124

111125

spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ public String getComponentType() {
121121
return "router";
122122
}
123123

124+
@Override
125+
public boolean newMessageNextTrack() {
126+
return true;
127+
}
128+
124129
/**
125130
* Provides {@link MessagingTemplate} access for subclasses
126131
* @return The messaging template.

spring-integration-core/src/main/java/org/springframework/integration/support/management/LifecycleTrackableMessageHandlerMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ public void setShouldTrack(boolean shouldTrack) {
5252
this.trackable.setShouldTrack(shouldTrack);
5353
}
5454

55+
@Override
56+
public boolean newMessageNextTrack() {
57+
return this.trackable.newMessageNextTrack();
58+
}
59+
5560
}

spring-integration-core/src/main/java/org/springframework/integration/support/management/LifecycleTrackableMessageSourceMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ public void setShouldTrack(boolean shouldTrack) {
5252
this.trackable.setShouldTrack(shouldTrack);
5353
}
5454

55+
@Override
56+
public boolean newMessageNextTrack() {
57+
return this.trackable.newMessageNextTrack();
58+
}
59+
5560
}

spring-integration-core/src/main/java/org/springframework/integration/support/management/TrackableComponent.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
/**
2323
* @author Mark Fisher
2424
* @author Gary Russell
25+
* @author Artem Bilan
2526
* @since 2.0
2627
*/
2728
@IntegrationManagedResource
@@ -30,4 +31,6 @@ public interface TrackableComponent extends NamedComponent {
3031
@ManagedOperation
3132
void setShouldTrack(boolean shouldTrack);
3233

34+
boolean newMessageNextTrack();
35+
3336
}

spring-integration-core/src/main/java/org/springframework/integration/support/management/TrackableRouterMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,9 @@ public void setShouldTrack(boolean shouldTrack) {
5050
this.trackable.setShouldTrack(shouldTrack);
5151
}
5252

53+
@Override
54+
public boolean newMessageNextTrack() {
55+
return this.trackable.newMessageNextTrack();
56+
}
57+
5358
}

spring-integration-core/src/test/java/org/springframework/integration/core/MessageHistoryTests.java

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@
2525
import static org.junit.Assert.assertThat;
2626

2727
import java.util.Properties;
28+
import java.util.concurrent.atomic.AtomicReference;
2829

2930
import org.junit.Test;
3031

32+
import org.springframework.integration.channel.PublishSubscribeChannel;
33+
import org.springframework.integration.handler.AbstractMessageHandler;
3134
import org.springframework.integration.history.MessageHistory;
3235
import org.springframework.integration.message.AdviceMessage;
3336
import org.springframework.integration.support.MessageBuilder;
@@ -53,6 +56,7 @@ public void addComponents() {
5356
assertNotNull(history1);
5457
assertEquals("testComponent-1", history1.toString());
5558
Message<String> result2 = MessageHistory.write(result1, new TestComponent(2));
59+
assertSame(result1, result2);
5660
MessageHistory history2 = MessageHistory.read(result2);
5761
assertNotNull(history2);
5862
assertEquals("testComponent-1,testComponent-2", history2.toString());
@@ -97,7 +101,7 @@ public void testCorrectErrorMessageAfterWrite() {
97101
Message<Throwable> result2 = MessageHistory.write(result1, new TestComponent(2));
98102
assertThat(result2, instanceOf(ErrorMessage.class));
99103
assertNotSame(original, result2);
100-
assertNotSame(result1, result2);
104+
assertSame(result1, result2);
101105
assertSame(original.getPayload(), result2.getPayload());
102106
MessageHistory history2 = MessageHistory.read(result2);
103107
assertNotNull(history2);
@@ -122,12 +126,63 @@ public void testCorrectAdviceMessageAfterWrite() {
122126
assertNotSame(original, result2);
123127
assertSame(original.getPayload(), result2.getPayload());
124128
assertSame(original.getInputMessage(), ((AdviceMessage) result2).getInputMessage());
125-
assertNotSame(result1, result2);
129+
assertSame(result1, result2);
126130
MessageHistory history2 = MessageHistory.read(result2);
127131
assertNotNull(history2);
128132
assertEquals("testComponent-1,testComponent-2", history2.toString());
129133
}
130134

135+
@Test
136+
public void testHistoryOnPublishSubscribeChannel() {
137+
Message<?> message = new GenericMessage<>("foo");
138+
assertNull(MessageHistory.read(message));
139+
140+
PublishSubscribeChannel publishSubscribeChannel = new PublishSubscribeChannel();
141+
publishSubscribeChannel.setComponentName("publishSubscribeChannel");
142+
publishSubscribeChannel.setShouldTrack(true);
143+
144+
final AtomicReference<Message<?>> message1 = new AtomicReference<>();
145+
146+
AbstractMessageHandler messageHandler1 = new AbstractMessageHandler() {
147+
148+
@Override
149+
protected void handleMessageInternal(Message<?> message) throws Exception {
150+
message1.set(message);
151+
}
152+
153+
};
154+
155+
messageHandler1.setShouldTrack(true);
156+
messageHandler1.setComponentName("messageHandler1");
157+
publishSubscribeChannel.subscribe(messageHandler1);
158+
159+
final AtomicReference<Message<?>> message2 = new AtomicReference<>();
160+
161+
AbstractMessageHandler messageHandler2 = new AbstractMessageHandler() {
162+
163+
@Override
164+
protected void handleMessageInternal(Message<?> message) throws Exception {
165+
message2.set(message);
166+
}
167+
168+
};
169+
170+
messageHandler2.setShouldTrack(true);
171+
messageHandler2.setComponentName("messageHandler2");
172+
publishSubscribeChannel.subscribe(messageHandler2);
173+
174+
publishSubscribeChannel.send(message);
175+
176+
MessageHistory history1 = MessageHistory.read(message1.get());
177+
assertNotNull(history1);
178+
assertEquals("publishSubscribeChannel,messageHandler1", history1.toString());
179+
180+
MessageHistory history2 = MessageHistory.read(message2.get());
181+
assertNotNull(history2);
182+
assertEquals("publishSubscribeChannel,messageHandler2", history2.toString());
183+
}
184+
185+
131186

132187
private static class TestComponent implements NamedComponent {
133188

0 commit comments

Comments
 (0)