|
16 | 16 | */ |
17 | 17 | package org.apache.activemq.transport.tcp; |
18 | 18 |
|
| 19 | +import static java.lang.Thread.getAllStackTraces; |
| 20 | +import static java.util.stream.Collectors.toList; |
| 21 | + |
| 22 | +import static org.hamcrest.MatcherAssert.assertThat; |
| 23 | +import static org.hamcrest.core.IsCollectionContaining.hasItem; |
| 24 | +import static org.hamcrest.core.IsNot.not; |
| 25 | + |
19 | 26 | import java.io.IOException; |
| 27 | +import java.net.SocketException; |
20 | 28 | import java.net.URI; |
21 | 29 | import java.net.URISyntaxException; |
| 30 | +import java.util.List; |
22 | 31 | import java.util.concurrent.atomic.AtomicBoolean; |
23 | 32 | import java.util.concurrent.atomic.AtomicInteger; |
24 | 33 |
|
|
33 | 42 | import org.apache.activemq.transport.TransportFactory; |
34 | 43 | import org.apache.activemq.transport.TransportListener; |
35 | 44 | import org.apache.activemq.transport.TransportServer; |
| 45 | +import org.hamcrest.Matcher; |
36 | 46 | import org.slf4j.Logger; |
37 | 47 | import org.slf4j.LoggerFactory; |
38 | 48 |
|
@@ -73,32 +83,7 @@ protected void setUp() throws Exception { |
73 | 83 | */ |
74 | 84 | private void startClient() throws Exception, URISyntaxException { |
75 | 85 | clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000")); |
76 | | - clientTransport.setTransportListener(new TransportListener() { |
77 | | - @Override |
78 | | - public void onCommand(Object command) { |
79 | | - clientReceiveCount.incrementAndGet(); |
80 | | - if (clientRunOnCommand != null) { |
81 | | - clientRunOnCommand.run(); |
82 | | - } |
83 | | - } |
84 | | - |
85 | | - @Override |
86 | | - public void onException(IOException error) { |
87 | | - if (!ignoreClientError.get()) { |
88 | | - LOG.info("Client transport error:"); |
89 | | - error.printStackTrace(); |
90 | | - clientErrorCount.incrementAndGet(); |
91 | | - } |
92 | | - } |
93 | | - |
94 | | - @Override |
95 | | - public void transportInterupted() { |
96 | | - } |
97 | | - |
98 | | - @Override |
99 | | - public void transportResumed() { |
100 | | - } |
101 | | - }); |
| 86 | + clientTransport.setTransportListener(new TestClientTransportListener()); |
102 | 87 |
|
103 | 88 | clientTransport.start(); |
104 | 89 | } |
@@ -181,32 +166,7 @@ public void testClientHang() throws Exception { |
181 | 166 | // Manually create a client transport so that it does not send KeepAlive |
182 | 167 | // packets. this should simulate a client hang. |
183 | 168 | clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null); |
184 | | - clientTransport.setTransportListener(new TransportListener() { |
185 | | - @Override |
186 | | - public void onCommand(Object command) { |
187 | | - clientReceiveCount.incrementAndGet(); |
188 | | - if (clientRunOnCommand != null) { |
189 | | - clientRunOnCommand.run(); |
190 | | - } |
191 | | - } |
192 | | - |
193 | | - @Override |
194 | | - public void onException(IOException error) { |
195 | | - if (!ignoreClientError.get()) { |
196 | | - LOG.info("Client transport error:"); |
197 | | - error.printStackTrace(); |
198 | | - clientErrorCount.incrementAndGet(); |
199 | | - } |
200 | | - } |
201 | | - |
202 | | - @Override |
203 | | - public void transportInterupted() { |
204 | | - } |
205 | | - |
206 | | - @Override |
207 | | - public void transportResumed() { |
208 | | - } |
209 | | - }); |
| 169 | + clientTransport.setTransportListener(new TestClientTransportListener()); |
210 | 170 |
|
211 | 171 | clientTransport.start(); |
212 | 172 | WireFormatInfo info = new WireFormatInfo(); |
@@ -237,6 +197,34 @@ public void testNoClientHang() throws Exception { |
237 | 197 | assertEquals(0, serverErrorCount.get()); |
238 | 198 | } |
239 | 199 |
|
| 200 | + public void testReadCheckTimerIsNotLeakedOnError() throws Exception { |
| 201 | + // Intentionally picks a port that is not the listening port to generate a failure |
| 202 | + clientTransport = TransportFactory.connect(new URI("tcp://localhost:" + (serverPort ^ 1))); |
| 203 | + clientTransport.setTransportListener(new TestClientTransportListener()); |
| 204 | + |
| 205 | + // Control test to verify there was no timer from a previous test |
| 206 | + assertThat(getCurrentThreadNames(), not(hasReadCheckTimer())); |
| 207 | + |
| 208 | + try { |
| 209 | + clientTransport.start(); |
| 210 | + fail("A ConnectionException was expected"); |
| 211 | + } catch (SocketException e) { |
| 212 | + // A SocketException is expected. |
| 213 | + } |
| 214 | + |
| 215 | + // If there is any read check timer at this point, calling stop should clean it up (because CHECKER_COUNTER becomes 0) |
| 216 | + clientTransport.stop(); |
| 217 | + assertThat(getCurrentThreadNames(), not(hasReadCheckTimer())); |
| 218 | + } |
| 219 | + |
| 220 | + private static Matcher<Iterable<? super String>> hasReadCheckTimer() { |
| 221 | + return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer"); |
| 222 | + } |
| 223 | + |
| 224 | + private static List<String> getCurrentThreadNames() { |
| 225 | + return getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList()); |
| 226 | + } |
| 227 | + |
240 | 228 | /** |
241 | 229 | * Used to test when a operation blocks. This should not cause transport to |
242 | 230 | * get disconnected. |
@@ -272,4 +260,31 @@ public void testNoClientHangWithServerBlock() throws Exception { |
272 | 260 | assertEquals(0, clientErrorCount.get()); |
273 | 261 | assertEquals(0, serverErrorCount.get()); |
274 | 262 | } |
| 263 | + |
| 264 | + private class TestClientTransportListener implements TransportListener { |
| 265 | + @Override |
| 266 | + public void onCommand(Object command) { |
| 267 | + clientReceiveCount.incrementAndGet(); |
| 268 | + if (clientRunOnCommand != null) { |
| 269 | + clientRunOnCommand.run(); |
| 270 | + } |
| 271 | + } |
| 272 | + |
| 273 | + @Override |
| 274 | + public void onException(IOException error) { |
| 275 | + if (!ignoreClientError.get()) { |
| 276 | + LOG.info("Client transport error:"); |
| 277 | + error.printStackTrace(); |
| 278 | + clientErrorCount.incrementAndGet(); |
| 279 | + } |
| 280 | + } |
| 281 | + |
| 282 | + @Override |
| 283 | + public void transportInterupted() { |
| 284 | + } |
| 285 | + |
| 286 | + @Override |
| 287 | + public void transportResumed() { |
| 288 | + } |
| 289 | + } |
275 | 290 | } |
0 commit comments