11/*
2- * Copyright (c) 2015, 2024 , Oracle and/or its affiliates. All rights reserved.
2+ * Copyright (c) 2015, 2025 , Oracle and/or its affiliates. All rights reserved.
33 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44 *
55 * This code is free software; you can redistribute it and/or modify it
3333import java .util .concurrent .Executor ;
3434import java .util .concurrent .TimeUnit ;
3535import java .util .concurrent .atomic .AtomicInteger ;
36+ import java .util .concurrent .atomic .AtomicReference ;
3637import java .util .function .Function ;
3738import java .net .http .HttpClient ;
3839import java .net .http .HttpResponse ;
3940import java .net .http .HttpTimeoutException ;
4041
42+ import jdk .internal .net .http .HttpClientImpl .DelegatingExecutor ;
4143import jdk .internal .net .http .common .Logger ;
4244import jdk .internal .net .http .common .MinimalFuture ;
4345import jdk .internal .net .http .common .Utils ;
@@ -63,9 +65,9 @@ final class Exchange<T> {
6365
6466 // used to record possible cancellation raised before the exchImpl
6567 // has been established.
66- private volatile IOException failed ;
68+ private final AtomicReference < IOException > failed = new AtomicReference <>() ;
6769 final MultiExchange <T > multi ;
68- final Executor parentExecutor ;
70+ final DelegatingExecutor parentExecutor ;
6971 volatile boolean upgrading ; // to HTTP/2
7072 volatile boolean upgraded ; // to HTTP/2
7173 final PushGroup <T > pushGroup ;
@@ -91,7 +93,7 @@ PushGroup<T> getPushGroup() {
9193 return pushGroup ;
9294 }
9395
94- Executor executor () {
96+ DelegatingExecutor executor () {
9597 return parentExecutor ;
9698 }
9799
@@ -236,26 +238,26 @@ public void cancel(IOException cause) {
236238 // If the impl is non null, propagate the exception right away.
237239 // Otherwise record it so that it can be propagated once the
238240 // exchange impl has been established.
239- ExchangeImpl <?> impl = exchImpl ;
241+ ExchangeImpl <?> impl ;
242+ IOException closeReason = null ;
243+ synchronized (this ) {
244+ impl = exchImpl ;
245+ if (impl == null ) {
246+ // no impl yet. record the exception
247+ failed .compareAndSet (null , cause );
248+ }
249+ }
240250 if (impl != null ) {
241251 // propagate the exception to the impl
242252 if (debug .on ()) debug .log ("Cancelling exchImpl: %s" , exchImpl );
243253 impl .cancel (cause );
244254 } else {
245- // no impl yet. record the exception
246- IOException failed = this .failed ;
247- if (failed == null ) {
248- synchronized (this ) {
249- failed = this .failed ;
250- if (failed == null ) {
251- failed = this .failed = cause ;
252- }
253- }
254- }
255-
256- // abort/close the connection if setting up the exchange. This can
255+ // abort/close the connection if setting up the exchange. This can
257256 // be important when setting up HTTP/2
258- connectionAborter .closeConnection (failed );
257+ closeReason = failed .get ();
258+ if (closeReason != null ) {
259+ connectionAborter .closeConnection (closeReason );
260+ }
259261
260262 // now call checkCancelled to recheck the impl.
261263 // if the failed state is set and the impl is not null, reset
@@ -274,9 +276,9 @@ private void checkCancelled() {
274276 ExchangeImpl <?> impl = null ;
275277 IOException cause = null ;
276278 CompletableFuture <? extends ExchangeImpl <T >> cf = null ;
277- if (failed != null ) {
279+ if (failed . get () != null ) {
278280 synchronized (this ) {
279- cause = failed ;
281+ cause = failed . get () ;
280282 impl = exchImpl ;
281283 cf = exchangeCF ;
282284 }
@@ -286,7 +288,11 @@ private void checkCancelled() {
286288 // The exception is raised by propagating it to the impl.
287289 if (debug .on ()) debug .log ("Cancelling exchImpl: %s" , impl );
288290 impl .cancel (cause );
289- failed = null ;
291+ synchronized (this ) {
292+ if (impl == exchImpl ) {
293+ failed .compareAndSet (cause , null );
294+ }
295+ }
290296 } else {
291297 Log .logTrace ("Exchange: request [{0}/timeout={1}ms] no impl is set."
292298 + "\n \t Can''t cancel yet with {2}" ,
@@ -313,7 +319,7 @@ <U> CompletableFuture<U> checkCancelled(CompletableFuture<U> cf, HttpConnection
313319 if (t == null ) t = new IOException ("Request cancelled" );
314320 if (debug .on ()) debug .log ("exchange cancelled during connect: " + t );
315321 try {
316- connection .close ();
322+ connection .close (t );
317323 } catch (Throwable x ) {
318324 if (debug .on ()) debug .log ("Failed to close connection" , x );
319325 }
@@ -330,8 +336,13 @@ public void h2Upgrade() {
330336 request .setH2Upgrade (this );
331337 }
332338
339+ synchronized IOException failed (IOException io ) {
340+ IOException cause = failed .compareAndExchange (null , io );
341+ return cause == null ? io : cause ;
342+ }
343+
333344 synchronized IOException getCancelCause () {
334- return failed ;
345+ return failed . get () ;
335346 }
336347
337348 // get/set the exchange impl, solving race condition issues with
@@ -409,6 +420,11 @@ private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
409420 }
410421 }
411422
423+ private CompletableFuture <Response > startSendingBody (DelegatingExecutor executor ) {
424+ return exchImpl .sendBodyAsync ()
425+ .thenCompose (exIm -> exIm .getResponseAsync (executor ));
426+ }
427+
412428 // After sending the request headers, if no ProxyAuthorizationRequired
413429 // was raised and the expectContinue flag is on, we need to wait
414430 // for the 100-Continue response
@@ -430,9 +446,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
430446 if (debug .on ())
431447 debug .log ("Setting ExpectTimeoutRaised and sending request body" );
432448 exchImpl .setExpectTimeoutRaised ();
433- CompletableFuture <Response > cf =
434- exchImpl .sendBodyAsync ()
435- .thenCompose (exIm -> exIm .getResponseAsync (parentExecutor ));
449+ CompletableFuture <Response > cf = startSendingBody (parentExecutor );
436450 cf = wrapForUpgrade (cf );
437451 cf = wrapForLog (cf );
438452 return cf ;
@@ -444,9 +458,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
444458 nonFinalResponses .incrementAndGet ();
445459 Log .logTrace ("Received 100-Continue: sending body" );
446460 if (debug .on ()) debug .log ("Received 100-Continue for %s" , r1 );
447- CompletableFuture <Response > cf =
448- exchImpl .sendBodyAsync ()
449- .thenCompose (exIm -> exIm .getResponseAsync (parentExecutor ));
461+ CompletableFuture <Response > cf = startSendingBody (parentExecutor );
450462 cf = wrapForUpgrade (cf );
451463 cf = wrapForLog (cf );
452464 return cf ;
@@ -471,8 +483,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
471483 private CompletableFuture <Response > sendRequestBody (ExchangeImpl <T > ex ) {
472484 assert !request .expectContinue ();
473485 if (debug .on ()) debug .log ("sendRequestBody" );
474- CompletableFuture <Response > cf = ex .sendBodyAsync ()
475- .thenCompose (exIm -> exIm .getResponseAsync (parentExecutor ));
486+ CompletableFuture <Response > cf = startSendingBody (parentExecutor );
476487 cf = wrapForUpgrade (cf );
477488 // after 101 is handled we check for other 1xx responses
478489 cf = cf .thenCompose (this ::ignore1xxResponse );
@@ -669,7 +680,7 @@ HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
669680 // Either way, we need to relay it to s.
670681 synchronized (this ) {
671682 exchImpl = s ;
672- t = failed ;
683+ t = failed . get () ;
673684 }
674685 // Check whether the HTTP/1.1 was cancelled.
675686 if (t == null ) t = e .getCancelCause ();
0 commit comments