TCP/IP: The cached connections are not freed when used with collaborating channel adapters on the client side #9430

Closed
artembilan opened this issue Aug 29, 2024 · 15 comments · Fixed by #9431

Comments

@artembilan
Member

See more info on Stack Overflow: https://stackoverflow.com/questions/78916759/tcp-caching-connection-not-working-as-expected

@artembilan artembilan added this to the 6.4.0-M3 milestone Aug 29, 2024
artembilan added a commit to artembilan/spring-integration that referenced this issue Aug 29, 2024
… adapters

Fixes: spring-projects#9430

When `CachingClientConnectionFactory` is used in combination with
`Tcp.outboundAdapter()` & `Tcp.inboundAdapter()`, the connection is not released
back to the cache because `CachingClientConnectionFactory` does not store
the connections it creates in its `connections` property.
So, when `TcpReceivingChannelAdapter` calls `this.clientConnectionFactory.closeConnection(connectionId);`,
the call returns immediately because there is nothing to remove from `this.connections`.

* Add `protected removeConnection()` to the `AbstractConnectionFactory`
and override it in the `CachingClientConnectionFactory` with delegation to `this.targetConnectionFactory`
* Demonstrate the problem in the new `IpIntegrationTests.allRepliesAreReceivedViaLimitedCachingConnectionFactory()` test
and ensure that all 26 letters of the English alphabet are sent to the server and received in uppercase
while the size of the `CachingClientConnectionFactory` is only `10`

**Auto-cherry-pick to `6.3.x` & `6.2.x`**
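
The mechanism described in this commit message can be modelled in plain Java. The sketch below is a simplified stand-in, not the Spring Integration source: `BaseFactory`, `CachingFactory`, the `delegateRemoval` flag and the `delivered()` helper are hypothetical names chosen for illustration, and connections are reduced to string ids with a close action. It only shows why a close-by-id call is a no-op when the wrapper's own `connections` map is empty, and how delegating `removeConnection()` to the target lets the connection go back to the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class CachingCloseSketch {

    /** Simplified stand-in for AbstractConnectionFactory (hypothetical, not the real class). */
    static class BaseFactory {
        protected final Map<String, Runnable> connections = new HashMap<>(); // id -> close action

        /** The hook the fix introduces: look up and forget a connection by id. */
        protected Runnable removeConnection(String connectionId) {
            return this.connections.remove(connectionId);
        }

        /** Returns false immediately when this factory does not know the id. */
        public boolean closeConnection(String connectionId) {
            Runnable closeAction = removeConnection(connectionId);
            if (closeAction == null) {
                return false;
            }
            closeAction.run();
            return true;
        }
    }

    /** Simplified stand-in for CachingClientConnectionFactory with a fixed-size pool. */
    static class CachingFactory extends BaseFactory {
        private final BaseFactory target = new BaseFactory();
        private final Deque<String> idle = new ArrayDeque<>();
        private final boolean delegateRemoval;

        CachingFactory(int poolSize, boolean delegateRemoval) {
            this.delegateRemoval = delegateRemoval;
            for (int i = 0; i < poolSize; i++) {
                this.idle.add("conn-" + i);
            }
        }

        /** Hands out an idle connection id, or null when the pool is exhausted. */
        public String obtainConnection() {
            String id = this.idle.poll();
            if (id != null) {
                // Only the target tracks the connection; this factory's own map stays empty.
                this.target.connections.put(id, () -> this.idle.add(id));
            }
            return id;
        }

        @Override
        protected Runnable removeConnection(String connectionId) {
            return this.delegateRemoval
                    ? this.target.removeConnection(connectionId) // modelled behaviour after the fix
                    : super.removeConnection(connectionId);      // modelled behaviour before the fix
        }
    }

    /** Sends up to `messages` requests, closing each connection by id after its reply. */
    static int delivered(int poolSize, int messages, boolean delegateRemoval) {
        CachingFactory factory = new CachingFactory(poolSize, delegateRemoval);
        int delivered = 0;
        for (int i = 0; i < messages; i++) {
            String id = factory.obtainConnection();
            if (id == null) {
                break; // pool exhausted: nothing was ever released
            }
            delivered++;
            factory.closeConnection(id); // the close-by-id call the commit message refers to
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("without delegation: " + delivered(10, 26, false)); // prints 10
        System.out.println("with delegation:    " + delivered(10, 26, true));  // prints 26
    }
}
```

With a pool of 10, the model without delegation stops after 10 messages, which matches the symptom reported in the issue; with delegation every message gets a connection.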
spring-builds pushed a commit that referenced this issue Aug 30, 2024
Fixes: #9430

(cherry picked from commit dc02dec)
spring-builds pushed a commit that referenced this issue Aug 30, 2024
Fixes: #9430

(cherry picked from commit dc02dec)
@shirshak55
Contributor

shirshak55 commented Aug 30, 2024

Hi,

Thanks for the prompt reply on Stack Overflow.

Here is the repo if someone wants to reproduce it:
https://github.com/shirshak55/sprint-int-caching-connection-bug

I will test again to see whether the problem persists.

Thanks.

@artembilan
Member Author

Hi, @shirshak55 !
The fix is in. Feel free to test your solution against the latest snapshot for 6.2.9, 6.3.4 or 6.4.0.

@shirshak55
Contributor

shirshak55 commented Aug 30, 2024

@artembilan

I tested it with 6.4.0-SNAPSHOT and here is the result.

Scenario 1:
TCP Client: Sends 20 messages to the TCP server.
TCP Server: Replies to each message as soon as it receives it.
Pool size is 10.
Here, each message reaches the server properly, but the TCP client disconnects as soon as it receives the reply. Is this behavior correct?

Logs:

Client connected
Received: Hello World 0
Client disconnected
Client connected
Received: Hello World 1
Client disconnected
Client connected
Received: Hello World 2
Client disconnected
Client connected
Received: Hello World 3
Client disconnected
Client connected
Received: Hello World 4
Client disconnected
Client connected
Received: Hello World 5
Client disconnected
Client connected
Received: Hello World 6
Client disconnected
Client connected
Received: Hello World 7
Client disconnected
Client connected
Received: Hello World 8
Client disconnected
Client connected
Received: Hello World 9
Client disconnected
Client connected
Received: Hello World 10
Client disconnected
Client connected
Received: Hello World 11
Client disconnected
Client connected
Received: Hello World 12
Client disconnected
Client connected


And it follows the same pattern.

Scenario 2:
TCP Client: Sends 20 messages to the TCP server, one every 100 ms.
TCP Server: Receives each message immediately, but delays sending the response by 3 seconds.
Pool size is 10.

Here the client sends each message to the server, but by the 11th message only about 1 second has elapsed, so it cannot acquire a pool entry and Spring Integration errors out.

Client side logs

21:27:05.189 ERROR [main][org.springframework.integration.ip.tcp.TcpSendingMessageHandler] Error creating connection
org.springframework.integration.util.PoolItemNotAvailableException: Timed out while waiting to acquire a pool entry.
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:202) ~[spring-integration-core-6.4.0-20240830.005918-132.jar:6.4.0-SNAPSHOT]

REPLY RECEIVED --- ECHO: Hello World 0    
REPLY RECEIVED --- ECHO: Hello World 1
REPLY RECEIVED --- ECHO: Hello World 2
REPLY RECEIVED --- ECHO: Hello World 3
REPLY RECEIVED --- ECHO: Hello World 4
REPLY RECEIVED --- ECHO: Hello World 5
REPLY RECEIVED --- ECHO: Hello World 6
REPLY RECEIVED --- ECHO: Hello World 7
REPLY RECEIVED --- ECHO: Hello World 8
REPLY RECEIVED --- ECHO: Hello World 9

Server side logs:

Server listening on port 8080
Client connected
Received: Hello World 0
Client connected
Received: Hello World 1
Client connected
Received: Hello World 2
Client connected
Received: Hello World 3
Client connected
Received: Hello World 4
Client connected
Received: Hello World 5
Client connected
Received: Hello World 6
Client connected
Received: Hello World 7
Client connected
Received: Hello World 8
Client connected
Received: Hello World 9                                   -------------- after this client errored out
Client disconnected                                 
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected

So, I believe these behaviors are correct.

Thanks,
Shirshak

@artembilan
Member Author

I’ll look into your application tomorrow, but the first scenario looks as if the caching wrapper is not in use. The second one looks as if my recent fix is not in action.

@artembilan
Member Author

Yeah... Something is still off with my fix.
Reopening...

Thank you for the quick turnaround on testing this!

@artembilan artembilan reopened this Aug 30, 2024
artembilan added a commit that referenced this issue Aug 30, 2024
Related to: #9430

**Auto-cherry-pick to `6.3.x` & `6.2.x`**
spring-builds pushed a commit that referenced this issue Aug 30, 2024
Related to: #9430

(cherry picked from commit 8242ca6)
spring-builds pushed a commit that referenced this issue Aug 30, 2024
Related to: #9430

(cherry picked from commit 8242ca6)
@artembilan
Member Author

Fixed via: 8242ca6.

@shirshak55,

please give the latest snapshot one more shot!

@shirshak55
Contributor

Hi, what's the latest SNAPSHOT version? Still 6.4.0-SNAPSHOT?

@artembilan
Member Author

Yep! That's the one.

@shirshak55
Contributor

shirshak55 commented Aug 30, 2024

@artembilan

Great.

Here is the testing result:

Scenario 1:
TCP Client: Sends 20 messages to the TCP server.
TCP Server: Replies to each message as soon as it receives it.
Pool size is 10.

Server Log

Server listening on port 8080
Client connected
Received: Hello World 0
Received: Hello World 1
Received: Hello World 2
Received: Hello World 3
Received: Hello World 4
Received: Hello World 5
Received: Hello World 6
Received: Hello World 7
Received: Hello World 8
Received: Hello World 9
Received: Hello World 10
Received: Hello World 11
Received: Hello World 12
Received: Hello World 13
Received: Hello World 14
Received: Hello World 15
Received: Hello World 16
Received: Hello World 17
Received: Hello World 18
Received: Hello World 19
Received: Hello World 20

Client (Spring Integration)

Starting server...
REPLY RECEIVED --- ECHO: Hello World 0
REPLY RECEIVED --- ECHO: Hello World 1
REPLY RECEIVED --- ECHO: Hello World 2
REPLY RECEIVED --- ECHO: Hello World 3
REPLY RECEIVED --- ECHO: Hello World 4
REPLY RECEIVED --- ECHO: Hello World 5
REPLY RECEIVED --- ECHO: Hello World 6
REPLY RECEIVED --- ECHO: Hello World 7
REPLY RECEIVED --- ECHO: Hello World 8
REPLY RECEIVED --- ECHO: Hello World 9
REPLY RECEIVED --- ECHO: Hello World 10
REPLY RECEIVED --- ECHO: Hello World 11
REPLY RECEIVED --- ECHO: Hello World 12
REPLY RECEIVED --- ECHO: Hello World 13
REPLY RECEIVED --- ECHO: Hello World 14
REPLY RECEIVED --- ECHO: Hello World 15
REPLY RECEIVED --- ECHO: Hello World 16
REPLY RECEIVED --- ECHO: Hello World 17
REPLY RECEIVED --- ECHO: Hello World 18
REPLY RECEIVED --- ECHO: Hello World 19
REPLY RECEIVED --- ECHO: Hello World 20

Now:
Scenario 2:
TCP Client: Sends 20 messages to the TCP server, one every 100 ms.
TCP Server: Receives each message immediately, but delays sending the response by 3 seconds.
Pool size is 10.

TCP Server side Log: (nodejs)

Server listening on port 8080
Client connected
Received: Hello World 0
Client connected
Received: Hello World 1
Client connected
Received: Hello World 2
Client connected
Received: Hello World 3
Client connected
Received: Hello World 4
Client connected
Received: Hello World 5
Client connected
Received: Hello World 6
Client connected
Received: Hello World 7
Client connected
Received: Hello World 8
Client connected
Received: Hello World 9
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected
Client disconnected

Client side log

// Here the server replies only after 3 seconds, so the client sends 10 messages and then gives the following error.
16:42:25.959 ERROR [main][org.springframework.integration.ip.tcp.TcpSendingMessageHandler] Error creating connection
org.springframework.integration.util.PoolItemNotAvailableException: Timed out while waiting to acquire a pool entry.
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:202) ~[spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory.obtainConnection(CachingClientConnectionFactory.java:123) ~[spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:120) ~[spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:42) ~[spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:90) [spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.doWrite(TcpSendingMessageHandler.java:180) [spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsClient(TcpSendingMessageHandler.java:150) [spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:110) [spring-integration-ip-6.4.0-20240830.202717-140.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304) [spring-integration-core-6.4.0-20240830.202717-135.jar:6.4.0-SNAPSHOT]
	at org.springframework.integration.samples.tcpclientserver.Main.main(Main.java:23) [classes/:?]
Exception in thread "main" org.springframework.messaging.MessageHandlingException: Failed to obtain a connection in the [bean 'org.springframework.integration.ip.tcp.TcpSendingMessageHandler#0' for component 'outAdapter.client'], failedMessage=GenericMessage [payload=Hello World 10 
, headers={id=588599c3-13b0-eeb5-2d47-f9ac71b53e7c, timestamp=1725050544748}]
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:94)
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.doWrite(TcpSendingMessageHandler.java:180)
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsClient(TcpSendingMessageHandler.java:150)
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:110)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304)
	at org.springframework.integration.samples.tcpclientserver.Main.main(Main.java:23)
Caused by: org.springframework.integration.util.PoolItemNotAvailableException: Timed out while waiting to acquire a pool entry.
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:202)
	at org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory.obtainConnection(CachingClientConnectionFactory.java:123)
	at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:120)
	at org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory.getConnection(AbstractClientConnectionFactory.java:42)
	at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.obtainConnection(TcpSendingMessageHandler.java:90)
	... 13 more
	
	// After 3 seconds
REPLY RECEIVED --- ECHO: Hello World 0
REPLY RECEIVED --- ECHO: Hello World 1
REPLY RECEIVED --- ECHO: Hello World 2
REPLY RECEIVED --- ECHO: Hello World 3
REPLY RECEIVED --- ECHO: Hello World 4
REPLY RECEIVED --- ECHO: Hello World 5
REPLY RECEIVED --- ECHO: Hello World 6
REPLY RECEIVED --- ECHO: Hello World 7
REPLY RECEIVED --- ECHO: Hello World 8
REPLY RECEIVED --- ECHO: Hello World 9
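
The timing in Scenario 2 can be checked with a small deterministic model in plain Java. This is a sketch under stated assumptions, not Spring Integration code: `PoolExhaustionSketch` and `rejected()` are hypothetical names, a connection is assumed to stay busy for the whole server delay, and the 1000 ms wait timeout is an assumed value (the thread does not state the configured timeout). It also assumes the sender catches the failure and carries on, whereas the client in the log above stops at the first exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class PoolExhaustionSketch {

    /**
     * Returns the indices of the messages that cannot get a pool entry.
     * All times are in milliseconds on a simulated clock.
     */
    static List<Integer> rejected(int messages, int poolSize, long sendIntervalMs,
            long holdMs, long waitTimeoutMs) {

        PriorityQueue<Long> releaseTimes = new PriorityQueue<>(); // when busy connections free up
        List<Integer> rejected = new ArrayList<>();
        long now = 0;
        for (int i = 0; i < messages; i++) {
            // Connections whose reply has already arrived go back to the pool.
            while (!releaseTimes.isEmpty() && releaseTimes.peek() <= now) {
                releaseTimes.poll();
            }
            if (releaseTimes.size() < poolSize) {
                releaseTimes.add(now + holdMs); // a free entry is available right away
            }
            else {
                long earliest = releaseTimes.peek();
                if (earliest - now <= waitTimeoutMs) {
                    now = earliest; // the sender blocks until an entry is released
                    releaseTimes.poll();
                    releaseTimes.add(now + holdMs);
                }
                else {
                    rejected.add(i); // timed out waiting for a pool entry
                    now += waitTimeoutMs;
                }
            }
            now += sendIntervalMs;
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 20 messages, pool of 10, one send every 100 ms, 3 s reply delay, assumed 1 s wait timeout
        System.out.println(rejected(20, 10, 100, 3000, 1000)); // prints [10]
    }
}
```

Under these assumptions only the 11th message (index 10) is rejected: the first ten connections are all busy until the 3-second mark, and later sends succeed once replies start returning entries to the pool.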

Let me know if this is the correct behavior.

Thank you.

@artembilan
Member Author

@shirshak55 ,

Thank you for testing this so fast!

Please tell me whether you receive all 20 messages in the second scenario.

@shirshak55
Contributor

shirshak55 commented Sep 3, 2024 via email

@artembilan
Member Author

Oh, right! If you put the send on retry, the remaining messages will go out once the old replies are received.

@shirshak55
Contributor

@artembilan

Yes, after getting the replies it starts sending again. Is there any way to send those error messages (pool entry not available) to, let's say, an error channel? AFAIK the inbound adapter provides an error channel, but the outbound one doesn't. I can do it at the gateway level, but the gateway is too far from the outbound adapter because of the many service activators in between.

Thanks

@artembilan
Member Author

Well, I guess in Java you would do try..catch for those service calls, so see if an ExpressionEvaluatingRequestHandlerAdvice would do the trick for you: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html#expression-advice
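
As a rough illustration of that suggestion, the configuration fragment below attaches an `ExpressionEvaluatingRequestHandlerAdvice` to the outbound adapter so that send failures go to a channel instead of propagating to the caller. It is a sketch, not code from this issue: the class name, the bean names, and the `toTcp` and `tcpSendFailures` channel names are hypothetical, and it assumes a single `CachingClientConnectionFactory` bean is defined elsewhere in the application.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.ip.dsl.Tcp;
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;

@Configuration
public class OutboundAdviceSketch {

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice sendFailureAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        // Hypothetical channel name; failures arrive there as an ErrorMessage.
        advice.setFailureChannelName("tcpSendFailures");
        // Swallow the exception after routing it, instead of rethrowing to the caller.
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow outboundFlow(CachingClientConnectionFactory cachingClient) {
        // "toTcp" is a hypothetical input channel; the advice wraps only this adapter.
        return IntegrationFlow.from("toTcp")
                .handle(Tcp.outboundAdapter(cachingClient), e -> e.advice(sendFailureAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow sendFailureFlow() {
        // Minimal consumer for the failure channel; replace with real error handling.
        return IntegrationFlow.from("tcpSendFailures")
                .handle(message -> System.err.println("TCP send failed: " + message.getPayload()))
                .get();
    }
}
```

Because the advice is applied at the adapter's endpoint, the failure is handled right next to the outbound adapter rather than at a distant gateway.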

@shirshak55
Contributor

Yeah, I have been doing try/catch. I believe this bug is solved. Thank you for passing me the advice link.

I will close the Stack Overflow question as well.

Thanks.
