From 3449c8462e39c9580cbcae32c8f784fb232c47b9 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 20 Nov 2023 13:49:14 +0100 Subject: [PATCH] Terminate request queue on connection error first before terminating response handlers. Terminating the request queue will ensure that no new requests will be accepted and thus that we could miss a response handler to be terminated. [#245] Signed-off-by: Mark Paluch --- .../mssql/client/ReactorNettyClient.java | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java index 5936bb51..a0140042 100644 --- a/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java @@ -36,12 +36,7 @@ import io.r2dbc.mssql.message.header.PacketIdProvider; import io.r2dbc.mssql.message.tds.ProtocolException; import io.r2dbc.mssql.message.tds.Redirect; -import io.r2dbc.mssql.message.token.AbstractDoneToken; -import io.r2dbc.mssql.message.token.AbstractInfoToken; -import io.r2dbc.mssql.message.token.Attention; -import io.r2dbc.mssql.message.token.EnvChangeToken; -import io.r2dbc.mssql.message.token.FeatureExtAckToken; -import io.r2dbc.mssql.message.token.LoginAckToken; +import io.r2dbc.mssql.message.token.*; import io.r2dbc.mssql.message.type.Collation; import io.r2dbc.mssql.util.Assert; import io.r2dbc.spi.R2dbcException; @@ -50,11 +45,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.SynchronousSink; +import reactor.core.publisher.*; import reactor.netty.Connection; import reactor.netty.NettyOutbound; import reactor.netty.resources.ConnectionProvider; @@ -367,7 +358,8 @@ private Object encodeForSend(ClientMessage message) { @SuppressWarnings("unchecked") private Mono resumeError(Throwable throwable) { - handleConnectionError(throwable); + logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable); + this.requestSink.emitComplete((signalType, emitResult) -> { if (emitResult.isFailure()) { @@ -377,8 +369,7 @@ private Mono resumeError(Throwable throwable) { return false; }); - logger.error(this.context.getMessage("Error: {}"), throwable.getMessage(), throwable); - + handleConnectionError(throwable); return (Mono) close(); }