Skip to content

Commit 45d605b

Browse files
committed
fix #4201: changing how the jetty session is set
adding missing test timeouts correcting the wait in HttpClientReadableByteChannel
1 parent c741a07 commit 45d605b

File tree

5 files changed

+13
-14
lines changed

5 files changed

+13
-14
lines changed

httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocket.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class JettyWebSocket implements WebSocket, WebSocketListener {
3636
private final Condition backPressure;
3737
private final AtomicBoolean closed;
3838
private boolean moreMessages;
39-
private Session webSocketSession;
39+
private volatile Session webSocketSession;
4040

4141
public JettyWebSocket(WebSocket.Listener listener) {
4242
this.listener = listener;
@@ -115,6 +115,7 @@ public void onWebSocketClose(int statusCode, String reason) {
115115

116116
@Override
117117
public void onWebSocketConnect(Session session) {
118+
this.webSocketSession = session;
118119
listener.onOpen(this);
119120
}
120121

@@ -133,11 +134,6 @@ public void onWebSocketError(Throwable cause) {
133134
listener.onError(this, cause);
134135
}
135136

136-
public JettyWebSocket setWebSocketSession(Session webSocketSession) {
137-
this.webSocketSession = webSocketSession;
138-
return this;
139-
}
140-
141137
private void backPressure() {
142138
try {
143139
lock.lock();

httpclient-jetty/src/main/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public CompletableFuture<WebSocket> buildAsync(WebSocket.Listener listener) {
5858
final CompletableFuture<WebSocket> future = new CompletableFuture<>();
5959
final var webSocket = new JettyWebSocket(listener);
6060
return webSocketClient.connect(webSocket, Objects.requireNonNull(WebSocket.toWebSocketUri(getUri())), cur)
61-
.thenApply(webSocket::setWebSocketSession)
61+
.thenApply(s -> webSocket)
6262
.exceptionally(ex -> {
6363
if (ex instanceof CompletionException && ex.getCause() instanceof UpgradeException) {
6464
future.completeExceptionally(toHandshakeException((UpgradeException) ex.getCause()));

httpclient-jetty/src/test/java/io/fabric8/kubernetes/client/jetty/JettyWebSocketTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ void sendCloseWhenConnectionIsOpen() {
182182
// Given
183183
final var jws = new JettyWebSocket(new Listener());
184184
final var session = mock(Session.class);
185-
jws.setWebSocketSession(session);
185+
jws.onWebSocketConnect(session);
186186
when(session.isOpen()).thenReturn(true);
187187
// When
188188
jws.sendClose(1000, "Closing");
@@ -196,7 +196,7 @@ void sendCloseIgnoredWhenConnectionIsClosed() {
196196
// Given
197197
final var jws = new JettyWebSocket(new Listener());
198198
final var session = mock(Session.class);
199-
jws.setWebSocketSession(session);
199+
jws.onWebSocketConnect(session);
200200
when(session.isOpen()).thenReturn(false);
201201
// When
202202
jws.sendClose(1000, "Closing");
@@ -210,7 +210,7 @@ void sendCloseIgnoredWhenAlreadyClosed() {
210210
// Given
211211
final var jws = new JettyWebSocket(new Listener());
212212
final var session = mock(Session.class);
213-
jws.setWebSocketSession(session);
213+
jws.onWebSocketConnect(session);
214214
when(session.isOpen()).thenReturn(true);
215215
jws.sendClose(1000, "Closing");
216216
// When
@@ -226,7 +226,7 @@ void sendIncreasesQueueSize() {
226226
// Given
227227
final var jws = new JettyWebSocket(new Listener());
228228
final var session = mock(Session.class, RETURNS_DEEP_STUBS);
229-
jws.setWebSocketSession(session);
229+
jws.onWebSocketConnect(session);
230230
when(session.isOpen()).thenReturn(true);
231231
// When
232232
jws.send(ByteBuffer.wrap(new byte[] { 1, 3, 3, 7 }));

httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ void testWebsocketHandshakeFailure() {
8787

8888
assertThrows(WebSocketHandshakeException.class, () -> {
8989
try {
90-
startedFuture.get();
90+
startedFuture.get(10, TimeUnit.SECONDS);
9191
} catch (ExecutionException e) {
9292
throw e.getCause();
9393
}
@@ -144,7 +144,7 @@ public void onMessage(WebSocket webSocket, String text) {
144144
assertFalse(latch.await(10, TimeUnit.SECONDS));
145145
assertEquals(1, latch.getCount());
146146

147-
startedFuture.get().request();
147+
startedFuture.get(10, TimeUnit.SECONDS).request();
148148

149149
assertTrue(latch.await(10, TimeUnit.SECONDS));
150150
}

kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/HttpClientReadableByteChannel.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,13 @@ public synchronized int read(ByteBuffer arg0) throws IOException {
106106
consumeRequested = true;
107107
this.asyncBody.consume();
108108
// the consume call may actually trigger result deliver
109-
// if it did, then just start the loop over
109+
// if it did, then just start the loop over or be done
110110
if (!consumeRequested) {
111111
continue;
112112
}
113+
if (done) {
114+
return -1;
115+
}
113116
}
114117
try {
115118
this.wait(); // block until more buffers are delivered

0 commit comments

Comments
 (0)