Skip to content

Commit acbc310

Browse files
committed
fix #4201: ensuring okhttp is tolerant to multiple consume calls
also converting jetty to fully non-blocking and adapting the body delivery for jdk for use with sendAsync logic - we need the AsyncBody to be available immediately
1 parent ed64589 commit acbc310

File tree

6 files changed

+139
-86
lines changed

6 files changed

+139
-86
lines changed

httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkHttpClientImpl.java

Lines changed: 59 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,19 @@
2626
import java.net.URI;
2727
import java.net.http.HttpResponse.BodyHandler;
2828
import java.net.http.HttpResponse.BodyHandlers;
29+
import java.net.http.HttpResponse.BodySubscriber;
30+
import java.net.http.HttpResponse.ResponseInfo;
2931
import java.net.http.WebSocketHandshakeException;
3032
import java.nio.ByteBuffer;
3133
import java.util.List;
3234
import java.util.Map;
3335
import java.util.Optional;
3436
import java.util.concurrent.CompletableFuture;
3537
import java.util.concurrent.CompletionException;
38+
import java.util.concurrent.CompletionStage;
3639
import java.util.concurrent.Flow;
3740
import java.util.concurrent.Flow.Subscriber;
3841
import java.util.concurrent.Flow.Subscription;
39-
import java.util.concurrent.atomic.AtomicBoolean;
4042
import java.util.concurrent.atomic.AtomicLong;
4143
import java.util.function.Supplier;
4244

@@ -47,48 +49,80 @@
4749
*/
4850
public class JdkHttpClientImpl implements HttpClient {
4951

52+
/**
53+
* Adapts the BodyHandler to immediately complete the body
54+
*/
55+
private static final class BodyHandlerAdapter implements BodyHandler<AsyncBody> {
56+
private final AsyncBodySubscriber<?> subscriber;
57+
private final BodyHandler<Void> handler;
58+
59+
private BodyHandlerAdapter(AsyncBodySubscriber<?> subscriber, BodyHandler<Void> handler) {
60+
this.subscriber = subscriber;
61+
this.handler = handler;
62+
}
63+
64+
@Override
65+
public BodySubscriber<AsyncBody> apply(ResponseInfo responseInfo) {
66+
BodySubscriber<Void> bodySubscriber = handler.apply(responseInfo);
67+
return new BodySubscriber<AsyncBody>() {
68+
CompletableFuture<AsyncBody> cf = CompletableFuture.completedFuture(subscriber);
69+
70+
@Override
71+
public void onSubscribe(Subscription subscription) {
72+
bodySubscriber.onSubscribe(subscription);
73+
}
74+
75+
@Override
76+
public void onNext(List<ByteBuffer> item) {
77+
bodySubscriber.onNext(item);
78+
}
79+
80+
@Override
81+
public void onError(Throwable throwable) {
82+
bodySubscriber.onError(throwable);
83+
}
84+
85+
@Override
86+
public void onComplete() {
87+
bodySubscriber.onComplete();
88+
}
89+
90+
@Override
91+
public CompletionStage<AsyncBody> getBody() {
92+
return cf;
93+
}
94+
};
95+
}
96+
}
97+
5098
private final class AsyncBodySubscriber<T> implements Subscriber<T>, AsyncBody {
5199
private final BodyConsumer<T> consumer;
52100
private CompletableFuture<Void> done = new CompletableFuture<Void>();
53-
private final AtomicBoolean subscribed = new AtomicBoolean();
54-
private volatile Flow.Subscription subscription;
55-
private T initialItem;
56-
private boolean first = true;
57-
private boolean isComplete;
101+
private CompletableFuture<Flow.Subscription> subscription = new CompletableFuture<>();
58102

59103
private AsyncBodySubscriber(BodyConsumer<T> consumer) {
60104
this.consumer = consumer;
61105
}
62106

63107
@Override
64108
public void onSubscribe(Subscription subscription) {
65-
if (!subscribed.compareAndSet(false, true)) {
109+
if (this.subscription.isDone()) {
66110
subscription.cancel();
67111
return;
68112
}
69-
this.subscription = subscription;
70-
// the sendAsync future won't complete unless we do the initial request here
71-
// so in onNext we'll trap the item until we're ready
72-
subscription.request(1);
113+
this.subscription.complete(subscription);
73114
}
74115

75116
@Override
76117
public void onNext(T item) {
77-
synchronized (this) {
78-
if (first) {
79-
this.initialItem = item;
80-
first = false;
81-
return;
82-
}
83-
}
84118
try {
85119
if (item == null) {
86120
done.complete(null);
87121
} else {
88122
consumer.consume(item, this);
89123
}
90124
} catch (Exception e) {
91-
subscription.cancel();
125+
subscription.thenAccept(Subscription::cancel);
92126
done.completeExceptionally(e);
93127
}
94128
}
@@ -100,10 +134,6 @@ public void onError(Throwable throwable) {
100134

101135
@Override
102136
public synchronized void onComplete() {
103-
if (initialItem != null) {
104-
this.isComplete = true;
105-
return;
106-
}
107137
done.complete(null);
108138
}
109139

@@ -112,19 +142,7 @@ public synchronized void consume() {
112142
if (done.isDone()) {
113143
return;
114144
}
115-
try {
116-
first = false;
117-
if (initialItem != null) {
118-
T item = initialItem;
119-
initialItem = null;
120-
onNext(item);
121-
}
122-
} finally {
123-
if (isComplete) {
124-
done.complete(null);
125-
}
126-
this.subscription.request(1);
127-
}
145+
this.subscription.thenAccept(s -> s.request(1));
128146
}
129147

130148
@Override
@@ -134,7 +152,7 @@ public CompletableFuture<Void> done() {
134152

135153
@Override
136154
public void cancel() {
137-
subscription.cancel();
155+
subscription.thenAccept(Subscription::cancel);
138156
done.cancel(false);
139157
}
140158

@@ -234,7 +252,8 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeLines(HttpRequest reque
234252
return sendAsync(request, () -> {
235253
AsyncBodySubscriber<String> subscriber = new AsyncBodySubscriber<>(consumer);
236254
BodyHandler<Void> handler = BodyHandlers.fromLineSubscriber(subscriber);
237-
return new HandlerAndAsyncBody<>(handler, subscriber);
255+
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
256+
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
238257
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
239258
}
240259

@@ -243,7 +262,8 @@ public CompletableFuture<HttpResponse<AsyncBody>> consumeBytes(HttpRequest reque
243262
return sendAsync(request, () -> {
244263
AsyncBodySubscriber<List<ByteBuffer>> subscriber = new AsyncBodySubscriber<>(consumer);
245264
BodyHandler<Void> handler = BodyHandlers.fromSubscriber(subscriber);
246-
return new HandlerAndAsyncBody<>(handler, subscriber);
265+
BodyHandler<AsyncBody> handlerAdapter = new BodyHandlerAdapter(subscriber, handler);
266+
return new HandlerAndAsyncBody<>(handlerAdapter, subscriber);
247267
}).thenApply(r -> new JdkHttpResponseImpl<AsyncBody>(r.response, r.asyncBody));
248268
}
249269

httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyAsyncResponseListener.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,47 @@
1515
*/
1616
package io.fabric8.kubernetes.client.jetty;
1717

18-
import io.fabric8.kubernetes.client.KubernetesClientException;
1918
import io.fabric8.kubernetes.client.http.HttpClient;
2019
import io.fabric8.kubernetes.client.http.HttpRequest;
2120
import io.fabric8.kubernetes.client.http.HttpResponse;
2221
import org.eclipse.jetty.client.api.Request;
2322
import org.eclipse.jetty.client.api.Response;
2423
import org.eclipse.jetty.client.api.Result;
24+
import org.eclipse.jetty.util.Callback;
2525

2626
import java.nio.ByteBuffer;
2727
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.LongConsumer;
2829

2930
public abstract class JettyAsyncResponseListener<T> extends Response.Listener.Adapter implements HttpClient.AsyncBody {
3031

3132
private final HttpRequest httpRequest;
3233
private final HttpClient.BodyConsumer<T> bodyConsumer;
3334
private final CompletableFuture<HttpResponse<HttpClient.AsyncBody>> asyncResponse;
3435
private final CompletableFuture<Void> asyncBodyDone;
35-
private boolean consume;
36+
private volatile LongConsumer demand;
37+
private boolean initialConsumeCalled;
38+
private Runnable initialConsume;
3639

3740
JettyAsyncResponseListener(HttpRequest httpRequest, HttpClient.BodyConsumer<T> bodyConsumer) {
3841
this.httpRequest = httpRequest;
3942
this.bodyConsumer = bodyConsumer;
4043
asyncResponse = new CompletableFuture<>();
4144
asyncBodyDone = new CompletableFuture<>();
42-
consume = false;
4345
}
4446

4547
@Override
46-
public synchronized void consume() {
47-
consume = true;
48-
this.notifyAll();
48+
public void consume() {
49+
synchronized (this) {
50+
if (!this.initialConsumeCalled) {
51+
this.initialConsumeCalled = true;
52+
if (this.initialConsume != null) {
53+
this.initialConsume.run();
54+
this.initialConsume = null;
55+
}
56+
}
57+
}
58+
demand.accept(1);
4959
}
5060

5161
@Override
@@ -74,21 +84,20 @@ public CompletableFuture<HttpResponse<HttpClient.AsyncBody>> listen(Request requ
7484
}
7585

7686
@Override
77-
public void onContent(Response response, ByteBuffer content) {
78-
try {
79-
synchronized (this) {
80-
while (!consume && !asyncBodyDone.isCancelled()) {
81-
this.wait();
82-
}
83-
}
84-
if (!asyncBodyDone.isCancelled()) {
85-
bodyConsumer.consume(process(response, content), this);
87+
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback) {
88+
synchronized (this) {
89+
if (!initialConsumeCalled) {
90+
// defer until consume is called
91+
this.initialConsume = () -> onContent(response, demand, content, callback);
92+
return;
8693
}
87-
} catch (InterruptedException e) {
88-
Thread.currentThread().interrupt();
89-
throw KubernetesClientException.launderThrowable(e);
94+
this.demand = demand;
95+
}
96+
try {
97+
bodyConsumer.consume(process(response, content), this);
98+
callback.succeeded();
9099
} catch (Exception e) {
91-
throw KubernetesClientException.launderThrowable(e);
100+
callback.failed(e);
92101
}
93102
}
94103

httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpClientImpl.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,21 @@ public void consume() {
7272
// consume should not block from a callers perspective
7373
try {
7474
httpClient.dispatcher().executorService().execute(() -> {
75-
try {
76-
if (!source.exhausted() && !done.isDone()) {
77-
T value = process(source);
78-
consumer.consume(value, this);
79-
} else {
80-
done.complete(null);
75+
// we must serialize multiple consumes as source is not thread-safe
76+
// it would be better to use SerialExecutor, but that would need to move modules, as to
77+
// potentially not hold multiple executor threads
78+
synchronized (source) {
79+
try {
80+
if (!source.exhausted() && !done.isDone()) {
81+
T value = process(source);
82+
consumer.consume(value, this);
83+
} else {
84+
done.complete(null);
85+
}
86+
} catch (Exception e) {
87+
Utils.closeQuietly(source);
88+
done.completeExceptionally(e);
8189
}
82-
} catch (Exception e) {
83-
Utils.closeQuietly(source);
84-
done.completeExceptionally(e);
8590
}
8691
});
8792
} catch (Exception e) {

httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,15 @@
3131
import java.io.InputStream;
3232
import java.io.Reader;
3333
import java.net.URI;
34+
import java.nio.ByteBuffer;
35+
import java.nio.charset.StandardCharsets;
3436
import java.util.ArrayList;
3537
import java.util.Arrays;
3638
import java.util.concurrent.CompletableFuture;
3739
import java.util.concurrent.CountDownLatch;
3840
import java.util.concurrent.ExecutionException;
3941
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicInteger;
4043

4144
import static org.assertj.core.api.Assertions.assertThat;
4245
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,8 +56,8 @@ protected HttpClient.Factory getHttpClientFactory() {
5356
return new OkHttpClientFactory();
5457
}
5558

56-
private KubernetesMockServer server;
57-
private KubernetesClient client;
59+
KubernetesMockServer server;
60+
KubernetesClient client;
5861

5962
@BeforeEach
6063
void setUp() {
@@ -148,15 +151,17 @@ public void onMessage(WebSocket webSocket, String text) {
148151

149152
@Test
150153
void testAsyncBody() throws Exception {
151-
server.expect().withPath("/async").andReturn(200, "hello world").always();
154+
int byteCount = 20000;
155+
server.expect().withPath("/async").andReturn(200, new String(new byte[byteCount], StandardCharsets.UTF_8)).always();
152156

153-
CompletableFuture<Boolean> consumed = new CompletableFuture<>();
157+
CompletableFuture<Integer> consumed = new CompletableFuture<>();
158+
AtomicInteger total = new AtomicInteger();
154159

155160
CompletableFuture<HttpResponse<AsyncBody>> responseFuture = client.getHttpClient().consumeBytes(
156161
client.getHttpClient().newHttpRequestBuilder().uri(URI.create(client.getConfiguration().getMasterUrl() + "async"))
157162
.build(),
158163
(value, asyncBody) -> {
159-
consumed.complete(true);
164+
value.stream().map(ByteBuffer::remaining).forEach(total::addAndGet);
160165
asyncBody.consume();
161166
});
162167

@@ -169,15 +174,14 @@ void testAsyncBody() throws Exception {
169174
r.body().done().whenComplete((v, ex) -> {
170175
if (ex != null) {
171176
consumed.completeExceptionally(ex);
172-
}
173-
if (v != null) {
174-
consumed.complete(false);
177+
} else {
178+
consumed.complete(total.get());
175179
}
176180
});
177181
}
178182
});
179183

180-
assertTrue(consumed.get(5, TimeUnit.SECONDS));
184+
assertEquals(byteCount, consumed.get(5, TimeUnit.SECONDS));
181185
}
182186

183187
@Test
@@ -219,14 +223,15 @@ void testConsumeLines() throws Exception {
219223
@ParameterizedTest(name = "{index}: {0}")
220224
@ValueSource(classes = { String.class, byte[].class, Reader.class, InputStream.class })
221225
void testSupportedTypes(Class<?> type) throws Exception {
222-
server.expect().withPath("/type").andReturn(200, "hello world").always();
226+
String value = new String(new byte[16384]);
227+
server.expect().withPath("/type").andReturn(200, value).always();
223228
final HttpResponse<?> result = client.getHttpClient()
224229
.sendAsync(client.getHttpClient().newHttpRequestBuilder()
225230
.uri(URI.create(client.getConfiguration().getMasterUrl() + "type")).build(), type)
226231
.get(10, TimeUnit.SECONDS);
227232
assertThat(result)
228233
.satisfies(r -> assertThat(r.body()).isInstanceOf(type))
229-
.satisfies(r -> assertThat(r.bodyString()).isEqualTo("hello world"));
234+
.satisfies(r -> assertThat(r.bodyString()).isEqualTo(value));
230235
}
231236

232237
}

0 commit comments

Comments
 (0)