An autonomous transaction cancellation by a reactive chain timeout operator results in r2dbc connections leakage #32115

Open
evgenyvsmirnov opened this issue Jan 25, 2024 · 5 comments
Labels
in: data Issues in data modules (jdbc, orm, oxm, tx) status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged or decided on

Comments

@evgenyvsmirnov

Greetings!
Please consider the test below (reactor + r2dbc-postgresql + r2dbc-pool). The reactive chain in the test begins a transaction (outer), inserts a record, and then begins an autonomous transaction (inner). The inner transaction attempts to insert a record with the same userName and email (both covered by unique constraints). It cannot proceed while the outer transaction is still in progress, so it is cancelled by the reactive chain's timeout operator:

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.transaction.ReactiveTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.support.DefaultTransactionDefinition
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.TimeoutException

class TimeoutsTest : R2dbcRepositoryTestBase() {

    @Autowired
    private lateinit var databaseClient: DatabaseClient

    @Autowired
    private lateinit var transactionManager: ReactiveTransactionManager

    //@Test
    @RepeatedTest(value = 30, failureThreshold = 1)
    fun testTimeout() {
        // JDBC is used for DB initialization only.
        SERVER.getJdbcOperations()
            ?.execute(
                """
                    DROP TABLE IF EXISTS users_TimeoutsTest;

                    CREATE TABLE IF NOT EXISTS users_TimeoutsTest (
                         userId SERIAL PRIMARY KEY,
                         userName VARCHAR NOT NULL,
                         email VARCHAR NOT NULL,
                         CONSTRAINT unique_user_TimeoutsTest UNIQUE (userName),
                         CONSTRAINT unique_email_TimeoutsTest UNIQUE (email)
                    );
                """.trimIndent()
            )

        val outerTransactionalOperator = TransactionalOperator.create(
            transactionManager,
            DefaultTransactionDefinition().apply {
                isolationLevel = TransactionDefinition.ISOLATION_READ_COMMITTED
            }
        )

        val result: Mono<String> =
            databaseClient.sql("INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1000, 'test', 'test@example.com') RETURNING userId")
                .map { row, _ -> row.get(0).toString() }
                .one()
                .flatMap {
                    val innerTransactionalOperator = TransactionalOperator.create(
                        transactionManager,
                        DefaultTransactionDefinition().apply {
                            isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ
                            propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
                        }
                    )
                    databaseClient.sql("INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1001, 'test', 'test@example.com') RETURNING userId")
                        .map { row, _ -> row.get(0).toString() }
                        .one()
                        .`as`(innerTransactionalOperator::transactional)
                        .timeout(Duration.ofSeconds(5))
                        .onErrorResume(TimeoutException::class.java) { Mono.just("Inner failure") }
                }
                .`as`(outerTransactionalOperator::transactional)
                .onErrorResume {
                    databaseClient.sql("SELECT COUNT(*) FROM users_TimeoutsTest WHERE userId=1000")
                        .map { row, _ -> row.get(0) }
                        .one()
                        .map { if (it == 1) "Inner failure" else "Outer failure" }
                }

        assertEquals("Inner failure", result.block())
    }
}

Environment:

  1. springframework: 6.1.2
  2. reactor-core: 3.6.1
  3. r2dbc-pool: 1.0.1
  4. r2dbc-postgresql: 1.0.3
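For anyone rebuilding the reproducer, the versions above can be pinned in a Gradle Kotlin DSL build script. This is a sketch, not taken from the attached archive: the Maven Central coordinates are the usual ones for these projects, spring-r2dbc and spring-tx are the Spring Framework modules that appear in the stack trace, and the test dependencies (JUnit 5, Testcontainers) are left out because their versions are not stated.

```kotlin
// build.gradle.kts (fragment, assumed layout): pins the versions listed above.
// Test-only dependencies are omitted because the report does not state their versions.
dependencies {
    implementation("org.springframework:spring-r2dbc:6.1.2")
    implementation("org.springframework:spring-tx:6.1.2")
    implementation("io.projectreactor:reactor-core:3.6.1")
    implementation("io.r2dbc:r2dbc-pool:1.0.1.RELEASE")
    implementation("org.postgresql:r2dbc-postgresql:1.0.3.RELEASE")
}
```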

The outcomes:

  1. The following stack trace always appears:
java.lang.IllegalStateException: No value for key [ConnectionPool[PostgreSQL]] bound to context
	at org.springframework.transaction.reactive.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:151) ~[spring-tx-6.1.2.jar:6.1.2]
	at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCleanupAfterCompletion$14(R2dbcTransactionManager.java:350) ~[spring-r2dbc-6.1.2.jar:6.1.2]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:279) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:49) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.6.1.jar:3.6.1]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223) ~[reactor-core-3.6.1.jar:3.6.1]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206) ~[reactor-core-3.6.1.jar:3.6.1]
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
  2. The exchange with the database is as follows:
        372     3.019813    127.0.0.1   56603   127.0.0.1   32799   PGSQL   110 >Q ----------> BEGIN ISOLATION LEVEL READ COMMITTED, READ WRITE
        373     3.019838    127.0.0.1   32799   127.0.0.1   56603   TCP 56  32799 → 56603 [ACK] Seq=1 Ack=55 Win=6373 Len=0 TSval=3610326549 TSecr=3240910966
        386     3.034130    127.0.0.1   32799   127.0.0.1   56603   PGSQL   73  <C/Z
        387     3.034157    127.0.0.1   56603   127.0.0.1   32799   TCP 56  56603 → 32799 [ACK] Seq=55 Ack=18 Win=6370 Len=0 TSval=3240910980 TSecr=3610326563
        434     3.038892    127.0.0.1   56603   127.0.0.1   32799   PGSQL   177 >Q ----------> INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1000, 'test', 'test@example.com') RETURNING userId
        435     3.038927    127.0.0.1   32799   127.0.0.1   56603   TCP 56  32799 → 56603 [ACK] Seq=18 Ack=176 Win=6372 Len=0 TSval=3610326568 TSecr=3240910985
        460     3.064530    127.0.0.1   32799   127.0.0.1   56603   PGSQL   125 <T/D/C/Z
        461     3.064544    127.0.0.1   56603   127.0.0.1   32799   TCP 56  56603 → 32799 [ACK] Seq=176 Ack=87 Win=6369 Len=0 TSval=3240911010 TSecr=3610326593
        711     3.093204    127.0.0.1   56601   127.0.0.1   32799   PGSQL   111 >Q ----------> BEGIN ISOLATION LEVEL REPEATABLE READ, READ WRITE (another connection)
        712     3.093236    127.0.0.1   32799   127.0.0.1   56601   TCP 56  32799 → 56601 [ACK] Seq=1 Ack=56 Win=6373 Len=0 TSval=200857867 TSecr=298528505
        729     3.104023    127.0.0.1   32799   127.0.0.1   56601   PGSQL   73  <C/Z
        730     3.104031    127.0.0.1   56601   127.0.0.1   32799   TCP 56  56601 → 32799 [ACK] Seq=56 Ack=18 Win=6370 Len=0 TSval=298528516 TSecr=200857878
        731     *REF*       127.0.0.1   56601   127.0.0.1   32799   PGSQL   177 >Q ----------> INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1001, 'test', 'test@example.com') RETURNING userId
        732     0.000010    127.0.0.1   32799   127.0.0.1   56601   TCP 56  32799 → 56601 [ACK] Seq=18 Ack=177 Win=6372 Len=0 TSval=200857878 TSecr=298528516
        1155    5.006252    127.0.0.1   56601   127.0.0.1   32799   PGSQL   70  >Q ----------> ROLLBACK (inner)
        1156    5.006272    127.0.0.1   32799   127.0.0.1   56601   TCP 56  32799 → 56601 [ACK] Seq=18 Ack=191 Win=6371 Len=0 TSval=200862884 TSecr=298533522
        1179    5.017974    127.0.0.1   56603   127.0.0.1   32799   PGSQL   68  >Q ----------> COMMIT (outer)
        1180    5.017995    127.0.0.1   32799   127.0.0.1   56603   TCP 56  32799 → 56603 [ACK] Seq=87 Ack=188 Win=6371 Len=0 TSval=3610331651 TSecr=3240916068
        1186    5.047064    127.0.0.1   32799   127.0.0.1   56603   PGSQL   74  <C/Z
        1187    5.047120    127.0.0.1   56603   127.0.0.1   32799   TCP 56  56603 → 32799 [ACK] Seq=188 Ack=105 Win=6369 Len=0 TSval=3240916098 TSecr=3610331681
        1190    5.050271    127.0.0.1   32799   127.0.0.1   56601   PGSQL   289 <E/Z
        1191    5.050313    127.0.0.1   56601   127.0.0.1   32799   TCP 56  56601 → 32799 [ACK] Seq=191 Ack=251 Win=6367 Len=0 TSval=298533567 TSecr=200862929
        1204    5.054634    127.0.0.1   32799   127.0.0.1   56601   PGSQL   76  <C/Z
  3. Occasionally the test fails: the chain produces «Outer failure» (a concurrency issue?).
  4. Last and worst: when @RepeatedTest is used, the test hangs after POOL_SIZE repetitions because a connection cannot be acquired from the pool. The screenshot of the pool state below suggests that one of the two connections is not returned to the pool on every test run.
    [Screenshot 2024-01-25 at 14.09.41: state of the connection pool]
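The fourth outcome can be illustrated with a toy model. The sketch below is hypothetical and does not use r2dbc-pool: a `java.util.concurrent.Semaphore` stands in for a pool of `maxSize` connections, each run borrows two connections (outer and inner transaction), and, as the screenshot suggests, one of them is never returned. `ToyPool` and `firstBlockedRun` are illustrative names only.

```kotlin
import java.util.concurrent.Semaphore

// Hypothetical toy pool: a Semaphore stands in for a bounded connection pool.
// This is NOT r2dbc-pool; it only models "borrow" and "return".
class ToyPool(maxSize: Int) {
    private val permits = Semaphore(maxSize)

    // Non-blocking borrow, so the model reports exhaustion instead of hanging.
    fun tryBorrow(): Boolean = permits.tryAcquire()

    fun giveBack() = permits.release()
}

// Runs up to `runs` iterations of the test's connection pattern and returns the
// 1-based index of the first run that cannot obtain both connections, or null
// if every run succeeds. When leakOnePerRun is true, the inner connection is
// never returned (an assumption; the report only says one of the two leaks).
fun firstBlockedRun(maxSize: Int, runs: Int, leakOnePerRun: Boolean): Int? {
    val pool = ToyPool(maxSize)
    for (run in 1..runs) {
        if (!pool.tryBorrow()) return run // connection for the outer transaction
        if (!pool.tryBorrow()) return run // connection for the inner (REQUIRES_NEW) transaction
        if (!leakOnePerRun) pool.giveBack() // inner connection returned only if there is no leak
        pool.giveBack() // outer connection is always returned
    }
    return null
}

fun main() {
    println(firstBlockedRun(maxSize = 10, runs = 30, leakOnePerRun = true))  // prints 10
    println(firstBlockedRun(maxSize = 10, runs = 30, leakOnePerRun = false)) // prints null
}
```

With one connection lost per run, the model's pool of size N can no longer supply both connections on run N, which is in line with the hang seen after roughly POOL_SIZE repetitions; without the leak, every run succeeds.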
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Jan 25, 2024
@jhoeller jhoeller added the in: data Issues in data modules (jdbc, orm, oxm, tx) label Feb 1, 2024
@sdeleuze sdeleuze self-assigned this Feb 13, 2024
@sdeleuze
Contributor

Can you please provide a reproducer as an attached archive or a link to a repository? If a database is needed, please provide a docker-compose.yml.

@sdeleuze sdeleuze added the status: waiting-for-feedback We need additional information before we can continue label Feb 13, 2024
@evgenyvsmirnov
Author

gh32115.zip
The attached archive contains the aforementioned test. It uses Testcontainers (PostgreSQL 15.5), and the test's display name gives a short summary of its logic.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Feb 15, 2024
@pkgonan

pkgonan commented May 14, 2024

@evgenyvsmirnov @sdeleuze
I have the same problem. How did you solve this?

@pkgonan

pkgonan commented May 14, 2024

I think I'm having this problem because I'm using Propagation.REQUIRES_NEW.

@Transactional(propagation = Propagation.REQUIRES_NEW)

@evgenyvsmirnov
Author

> @evgenyvsmirnov @sdeleuze I have the same problem. How did you solve this?

I haven't – I've just reported it.

@sdeleuze sdeleuze removed their assignment Jun 10, 2024
Projects
None yet
Development

No branches or pull requests

5 participants