@@ -417,46 +417,63 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
417417 .map (bodyByteBuf -> leakDetectionDebuggingEnabled
418418 ? bodyByteBuf .retain ().touch (this )
419419 : bodyByteBuf .retain ())
420- .publishOn (CosmosSchedulers .TRANSPORT_RESPONSE_BOUNDED_ELASTIC );
420+ .publishOn (CosmosSchedulers .TRANSPORT_RESPONSE_BOUNDED_ELASTIC )
421+ .doOnDiscard (ByteBuf .class , buf -> {
422+ if (buf .refCnt () > 0 ) {
423+ // there could be a race with the catch in the .map operator below
424+ // so, use safeRelease
425+ io .netty .util .ReferenceCountUtil .safeRelease (buf );
426+ }
427+ });
421428
422429 return contentObservable
423430 .map (content -> {
424431 if (leakDetectionDebuggingEnabled ) {
425432 content .touch (this );
426433 }
427434
428- // Capture transport client request timeline
429- ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse .request ().reactorNettyRequestRecord ();
430- if (reactorNettyRequestRecord != null ) {
431- reactorNettyRequestRecord .setTimeCompleted (Instant .now ());
432- }
435+ try {
436+ // Capture transport client request timeline
437+ ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse .request ().reactorNettyRequestRecord ();
438+ if (reactorNettyRequestRecord != null ) {
439+ reactorNettyRequestRecord .setTimeCompleted (Instant .now ());
440+ }
433441
434- StoreResponse rsp = request
435- .getEffectiveHttpTransportSerializer (this )
436- .unwrapToStoreResponse (httpRequest .uri ().toString (), request , httpResponseStatus , httpResponseHeaders , content );
442+ StoreResponse rsp = request
443+ .getEffectiveHttpTransportSerializer (this )
444+ .unwrapToStoreResponse (httpRequest .uri ().toString (), request , httpResponseStatus , httpResponseHeaders , content );
437445
438- if (reactorNettyRequestRecord != null ) {
439- rsp .setRequestTimeline (reactorNettyRequestRecord .takeTimelineSnapshot ());
446+ if (reactorNettyRequestRecord != null ) {
447+ rsp .setRequestTimeline (reactorNettyRequestRecord .takeTimelineSnapshot ());
440448
441- if (this .gatewayServerErrorInjector != null ) {
442- // only configure when fault injection is used
443- rsp .setFaultInjectionRuleId (
444- request
445- .faultInjectionRequestContext
446- .getFaultInjectionRuleId (reactorNettyRequestRecord .getTransportRequestId ()));
449+ if (this .gatewayServerErrorInjector != null ) {
450+ // only configure when fault injection is used
451+ rsp .setFaultInjectionRuleId (
452+ request
453+ .faultInjectionRequestContext
454+ .getFaultInjectionRuleId (reactorNettyRequestRecord .getTransportRequestId ()));
447455
448- rsp .setFaultInjectionRuleEvaluationResults (
449- request
450- .faultInjectionRequestContext
451- .getFaultInjectionRuleEvaluationResults (reactorNettyRequestRecord .getTransportRequestId ()));
456+ rsp .setFaultInjectionRuleEvaluationResults (
457+ request
458+ .faultInjectionRequestContext
459+ .getFaultInjectionRuleEvaluationResults (reactorNettyRequestRecord .getTransportRequestId ()));
460+ }
452461 }
453- }
454462
455- if (request .requestContext .cosmosDiagnostics != null ) {
456- BridgeInternal .recordGatewayResponse (request .requestContext .cosmosDiagnostics , request , rsp , globalEndpointManager );
457- }
463+ if (request .requestContext .cosmosDiagnostics != null ) {
464+ BridgeInternal .recordGatewayResponse (request .requestContext .cosmosDiagnostics , request , rsp , globalEndpointManager );
465+ }
458466
459- return rsp ;
467+ return rsp ;
468+ } catch (Throwable t ) {
469+ if (content .refCnt () > 0 ) {
470+ // Unwrap failed before StoreResponse took ownership -> release our retain
471+ // there could be a race with the doOnDiscard above - so, use safeRelease
472+ io .netty .util .ReferenceCountUtil .safeRelease (content );
473+ }
474+
475+ throw t ;
476+ }
460477 })
461478 .single ();
462479
0 commit comments