Skip to content

Commit b6037c7

Browse files
authored
[AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP connection fails (#1119)
* Adding test. * Naive fix. * Fixing OpenWireConnectionTimeoutTest. - It seems checking configuredOk can't be done that early. - Falling back to let the Timer be created and make sure it is disposed of.
1 parent 4191818 commit b6037c7

File tree

2 files changed

+86
-56
lines changed

2 files changed

+86
-56
lines changed

activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,17 @@ public synchronized void stopConnectCheckTask() {
437437
synchronized (AbstractInactivityMonitor.class) {
438438
READ_CHECK_TIMER.purge();
439439
CHECKER_COUNTER--;
440+
if (CHECKER_COUNTER == 0) {
441+
if (READ_CHECK_TIMER != null) {
442+
READ_CHECK_TIMER.cancel();
443+
READ_CHECK_TIMER = null;
444+
}
445+
try {
446+
ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
447+
} finally {
448+
ASYNC_TASKS = null;
449+
}
450+
}
440451
}
441452
}
442453
}
@@ -497,10 +508,14 @@ protected synchronized void stopMonitorThreads() {
497508
READ_CHECK_TIMER.purge();
498509
CHECKER_COUNTER--;
499510
if (CHECKER_COUNTER == 0) {
500-
WRITE_CHECK_TIMER.cancel();
501-
READ_CHECK_TIMER.cancel();
502-
WRITE_CHECK_TIMER = null;
503-
READ_CHECK_TIMER = null;
511+
if (WRITE_CHECK_TIMER != null) {
512+
WRITE_CHECK_TIMER.cancel();
513+
WRITE_CHECK_TIMER = null;
514+
}
515+
if (READ_CHECK_TIMER != null) {
516+
READ_CHECK_TIMER.cancel();
517+
READ_CHECK_TIMER = null;
518+
}
504519
try {
505520
ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
506521
} finally {

activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,18 @@
1616
*/
1717
package org.apache.activemq.transport.tcp;
1818

19+
import static java.lang.Thread.getAllStackTraces;
20+
import static java.util.stream.Collectors.toList;
21+
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.core.IsCollectionContaining.hasItem;
24+
import static org.hamcrest.core.IsNot.not;
25+
1926
import java.io.IOException;
27+
import java.net.SocketException;
2028
import java.net.URI;
2129
import java.net.URISyntaxException;
30+
import java.util.List;
2231
import java.util.concurrent.atomic.AtomicBoolean;
2332
import java.util.concurrent.atomic.AtomicInteger;
2433

@@ -33,6 +42,7 @@
3342
import org.apache.activemq.transport.TransportFactory;
3443
import org.apache.activemq.transport.TransportListener;
3544
import org.apache.activemq.transport.TransportServer;
45+
import org.hamcrest.Matcher;
3646
import org.slf4j.Logger;
3747
import org.slf4j.LoggerFactory;
3848

@@ -73,32 +83,7 @@ protected void setUp() throws Exception {
7383
*/
7484
private void startClient() throws Exception, URISyntaxException {
7585
clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
76-
clientTransport.setTransportListener(new TransportListener() {
77-
@Override
78-
public void onCommand(Object command) {
79-
clientReceiveCount.incrementAndGet();
80-
if (clientRunOnCommand != null) {
81-
clientRunOnCommand.run();
82-
}
83-
}
84-
85-
@Override
86-
public void onException(IOException error) {
87-
if (!ignoreClientError.get()) {
88-
LOG.info("Client transport error:");
89-
error.printStackTrace();
90-
clientErrorCount.incrementAndGet();
91-
}
92-
}
93-
94-
@Override
95-
public void transportInterupted() {
96-
}
97-
98-
@Override
99-
public void transportResumed() {
100-
}
101-
});
86+
clientTransport.setTransportListener(new TestClientTransportListener());
10287

10388
clientTransport.start();
10489
}
@@ -181,32 +166,7 @@ public void testClientHang() throws Exception {
181166
// Manually create a client transport so that it does not send KeepAlive
182167
// packets. this should simulate a client hang.
183168
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
184-
clientTransport.setTransportListener(new TransportListener() {
185-
@Override
186-
public void onCommand(Object command) {
187-
clientReceiveCount.incrementAndGet();
188-
if (clientRunOnCommand != null) {
189-
clientRunOnCommand.run();
190-
}
191-
}
192-
193-
@Override
194-
public void onException(IOException error) {
195-
if (!ignoreClientError.get()) {
196-
LOG.info("Client transport error:");
197-
error.printStackTrace();
198-
clientErrorCount.incrementAndGet();
199-
}
200-
}
201-
202-
@Override
203-
public void transportInterupted() {
204-
}
205-
206-
@Override
207-
public void transportResumed() {
208-
}
209-
});
169+
clientTransport.setTransportListener(new TestClientTransportListener());
210170

211171
clientTransport.start();
212172
WireFormatInfo info = new WireFormatInfo();
@@ -237,6 +197,34 @@ public void testNoClientHang() throws Exception {
237197
assertEquals(0, serverErrorCount.get());
238198
}
239199

200+
public void testReadCheckTimerIsNotLeakedOnError() throws Exception {
201+
// Intentionally picks a port that is not the listening port to generate a failure
202+
clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + (serverPort ^ 1)));
203+
clientTransport.setTransportListener(new TestClientTransportListener());
204+
205+
// Control test to verify there was no timer from a previous test
206+
assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
207+
208+
try {
209+
clientTransport.start();
210+
fail("A ConnectionException was expected");
211+
} catch (SocketException e) {
212+
// A SocketException is expected.
213+
}
214+
215+
// If there is any read check timer at this point, calling stop should clean it up (because CHECKER_COUNTER becomes 0)
216+
clientTransport.stop();
217+
assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
218+
}
219+
220+
private static Matcher<Iterable<? super String>> hasReadCheckTimer() {
221+
return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer");
222+
}
223+
224+
private static List<String> getCurrentThreadNames() {
225+
return getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList());
226+
}
227+
240228
/**
241229
* Used to test when a operation blocks. This should not cause transport to
242230
* get disconnected.
@@ -272,4 +260,31 @@ public void testNoClientHangWithServerBlock() throws Exception {
272260
assertEquals(0, clientErrorCount.get());
273261
assertEquals(0, serverErrorCount.get());
274262
}
263+
264+
private class TestClientTransportListener implements TransportListener {
265+
@Override
266+
public void onCommand(Object command) {
267+
clientReceiveCount.incrementAndGet();
268+
if (clientRunOnCommand != null) {
269+
clientRunOnCommand.run();
270+
}
271+
}
272+
273+
@Override
274+
public void onException(IOException error) {
275+
if (!ignoreClientError.get()) {
276+
LOG.info("Client transport error:");
277+
error.printStackTrace();
278+
clientErrorCount.incrementAndGet();
279+
}
280+
}
281+
282+
@Override
283+
public void transportInterupted() {
284+
}
285+
286+
@Override
287+
public void transportResumed() {
288+
}
289+
}
275290
}

0 commit comments

Comments
 (0)