diff --git a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java index 8452b8463404..17a79419613d 100644 --- a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java +++ b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockResponse.java @@ -19,9 +19,9 @@ import java.util.List; import java.util.concurrent.TimeUnit; import okhttp3.Headers; +import okhttp3.NewWebSocket; import okhttp3.internal.Internal; import okhttp3.internal.http2.Settings; -import okhttp3.WebSocketListener; import okio.Buffer; /** A scripted response to be replayed by the mock web server. */ @@ -45,7 +45,7 @@ public final class MockResponse implements Cloneable { private List promises = new ArrayList<>(); private Settings settings; - private WebSocketListener webSocketListener; + private NewWebSocket.Listener webSocketListener; /** Creates a new mock response with an empty body. */ public MockResponse() { @@ -284,7 +284,7 @@ public Settings getSettings() { * Attempts to perform a web socket upgrade on the connection. This will overwrite any previously * set status or body. */ - public MockResponse withWebSocketUpgrade(WebSocketListener listener) { + public MockResponse withWebSocketUpgrade(NewWebSocket.Listener listener) { setStatus("HTTP/1.1 101 Switching Protocols"); setHeader("Connection", "Upgrade"); setHeader("Upgrade", "websocket"); @@ -293,7 +293,7 @@ public MockResponse withWebSocketUpgrade(WebSocketListener listener) { return this; } - public WebSocketListener getWebSocketListener() { + public NewWebSocket.Listener getWebSocketListener() { return webSocketListener; } diff --git a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java index 329a5a7352bd..75fde27f4fe0 100644 --- a/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java +++ b/mockwebserver/src/main/java/okhttp3/mockwebserver/MockWebServer.java @@ -69,7 +69,7 @@ import okhttp3.internal.http2.Http2Stream; import okhttp3.internal.http2.Settings; import okhttp3.internal.platform.Platform; -import okhttp3.internal.ws.RealWebSocket; +import okhttp3.internal.ws.RealNewWebSocket; import okhttp3.internal.ws.WebSocketProtocol; import okio.Buffer; import okio.BufferedSink; @@ -676,14 +676,15 @@ private void handleWebSocketUpgrade(Socket socket, BufferedSource source, Buffer replyExecutor.allowCoreThreadTimeOut(true); final CountDownLatch connectionClose = new CountDownLatch(1); - RealWebSocket webSocket = - new RealWebSocket(false /* is server */, source, sink, new SecureRandom(), replyExecutor, - response.getWebSocketListener(), fancyResponse, name) { - @Override protected void shutdown() { - connectionClose.countDown(); - } - }; - + RealNewWebSocket.Streams streams = new RealNewWebSocket.Streams(false, source, sink) { + @Override public void close() { + connectionClose.countDown(); + } + }; + RealNewWebSocket webSocket = new RealNewWebSocket(fancyRequest, + response.getWebSocketListener(), new SecureRandom()); + response.getWebSocketListener().onOpen(webSocket, fancyResponse); + webSocket.initReaderAndWriter(streams); webSocket.loopReader(); // Even if messages are no longer being read we need to wait for the connection close signal. diff --git a/okhttp-tests/src/main/java/okhttp3/AutobahnTester.java b/okhttp-tests/src/main/java/okhttp3/AutobahnTester.java index af9aed149946..87e99b552d50 100644 --- a/okhttp-tests/src/main/java/okhttp3/AutobahnTester.java +++ b/okhttp-tests/src/main/java/okhttp3/AutobahnTester.java @@ -17,18 +17,12 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import okhttp3.internal.Version; -import okio.BufferedSource; import okio.ByteString; -import static okhttp3.WebSocket.BINARY; -import static okhttp3.WebSocket.TEXT; - /** * Exercises the web socket implementation against the Autobahn Testsuite. @@ -42,9 +36,9 @@ public static void main(String... args) throws IOException { final OkHttpClient client = new OkHttpClient(); - private WebSocketCall newWebSocket(String path) { + private NewWebSocket newWebSocket(String path, NewWebSocket.Listener listener) { Request request = new Request.Builder().url(HOST + path).build(); - return client.newWebSocketCall(request); + return client.newWebSocket(request, listener); } public void run() throws IOException { @@ -65,51 +59,30 @@ public void run() throws IOException { private void runTest(final long number, final long count) { final CountDownLatch latch = new CountDownLatch(1); final AtomicLong startNanos = new AtomicLong(); - newWebSocket("/runCase?case=" + number + "&agent=okhttp") // - .enqueue(new WebSocketListener() { - private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); - private WebSocket webSocket; - - @Override public void onOpen(WebSocket webSocket, Response response) { - this.webSocket = webSocket; - - System.out.println("Executing test case " + number + "/" + count); - startNanos.set(System.nanoTime()); - } - - @Override public void onMessage(final ResponseBody message) throws IOException { - final RequestBody response; - if (message.contentType() == TEXT) { - response = RequestBody.create(TEXT, message.string()); - } else { - BufferedSource source = message.source(); - response = RequestBody.create(BINARY, source.readByteString()); - source.close(); - } - sendExecutor.execute(new Runnable() { - @Override public void run() { - try { - webSocket.message(response); - } catch (IOException e) { - e.printStackTrace(System.out); - } - } - }); - } - - @Override public void onPong(ByteString payload) { - } - - @Override public void onClose(int code, String reason) { - sendExecutor.shutdown(); - latch.countDown(); - } - - @Override public void onFailure(Throwable t, Response response) { - t.printStackTrace(System.out); - latch.countDown(); - } - }); + newWebSocket("/runCase?case=" + number + "&agent=okhttp", new NewWebSocket.Listener() { + @Override public void onOpen(NewWebSocket webSocket, Response response) { + System.out.println("Executing test case " + number + "/" + count); + startNanos.set(System.nanoTime()); + } + + @Override public void onMessage(final NewWebSocket webSocket, final ByteString bytes) { + webSocket.send(bytes); + } + + @Override public void onMessage(final NewWebSocket webSocket, final String text) { + webSocket.send(text); + } + + @Override public void onClosing(NewWebSocket webSocket, int code, String reason) { + webSocket.close(1000, null); + latch.countDown(); + } + + @Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) { + t.printStackTrace(System.out); + latch.countDown(); + } + }); try { if (!latch.await(30, TimeUnit.SECONDS)) { throw new IllegalStateException("Timed out waiting for test " + number + " to finish."); @@ -127,23 +100,17 @@ private long getTestCount() throws IOException { final CountDownLatch latch = new CountDownLatch(1); final AtomicLong countRef = new AtomicLong(); final AtomicReference failureRef = new AtomicReference<>(); - newWebSocket("/getCaseCount").enqueue(new WebSocketListener() { - @Override public void onOpen(WebSocket webSocket, Response response) { - } - - @Override public void onMessage(ResponseBody message) throws IOException { - countRef.set(message.source().readDecimalLong()); - message.close(); + newWebSocket("/getCaseCount", new NewWebSocket.Listener() { + @Override public void onMessage(NewWebSocket webSocket, String text) { + countRef.set(Long.parseLong(text)); } - @Override public void onPong(ByteString payload) { - } - - @Override public void onClose(int code, String reason) { + @Override public void onClosing(NewWebSocket webSocket, int code, String reason) { + webSocket.close(1000, null); latch.countDown(); } - @Override public void onFailure(Throwable t, Response response) { + @Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) { failureRef.set(t); latch.countDown(); } @@ -164,21 +131,13 @@ private long getTestCount() throws IOException { private void updateReports() { final CountDownLatch latch = new CountDownLatch(1); - newWebSocket("/updateReports?agent=" + Version.userAgent()).enqueue(new WebSocketListener() { - @Override public void onOpen(WebSocket webSocket, Response response) { - } - - @Override public void onMessage(ResponseBody message) throws IOException { - } - - @Override public void onPong(ByteString payload) { - } - - @Override public void onClose(int code, String reason) { + newWebSocket("/updateReports?agent=" + Version.userAgent(), new NewWebSocket.Listener() { + @Override public void onClosing(NewWebSocket webSocket, int code, String reason) { + webSocket.close(code, null); latch.countDown(); } - @Override public void onFailure(Throwable t, Response response) { + @Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) { latch.countDown(); } }); diff --git a/okhttp-tests/src/test/java/okhttp3/WebSocketCallTest.java b/okhttp-tests/src/test/java/okhttp3/WebSocketCallTest.java deleted file mode 100644 index 261799a304a8..000000000000 --- a/okhttp-tests/src/test/java/okhttp3/WebSocketCallTest.java +++ /dev/null @@ -1,475 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3; - -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Logger; -import okhttp3.internal.tls.SslClient; -import okhttp3.internal.ws.EmptyWebSocketListener; -import okhttp3.internal.ws.RealWebSocket; -import okhttp3.internal.ws.WebSocketRecorder; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okio.ByteString; -import org.junit.After; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static okhttp3.TestUtil.defaultClient; -import static okhttp3.WebSocket.BINARY; -import static okhttp3.WebSocket.TEXT; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public final class WebSocketCallTest { - @Rule public final MockWebServer webServer = new MockWebServer(); - - private final SslClient sslClient = SslClient.localhost(); - private final WebSocketRecorder clientListener = new WebSocketRecorder("client"); - private final WebSocketRecorder serverListener = new WebSocketRecorder("server"); - private final Random random = new Random(0); - private OkHttpClient client = defaultClient().newBuilder() - .addInterceptor(new Interceptor() { - @Override public Response intercept(Chain chain) throws IOException { - Response response = chain.proceed(chain.request()); - assertNotNull(response.body()); // Ensure application interceptors never see a null body. - return response; - } - }) - .build(); - - @After public void tearDown() { - clientListener.assertExhausted(); - } - - @Test public void textMessage() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - serverListener.assertOpen(); - - client.message(RequestBody.create(TEXT, "Hello, WebSockets!")); - serverListener.assertTextMessage("Hello, WebSockets!"); - } - - @Test public void binaryMessage() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - serverListener.assertOpen(); - - client.message(RequestBody.create(BINARY, "Hello!")); - serverListener.assertBinaryMessage(new byte[] {'H', 'e', 'l', 'l', 'o', '!'}); - } - - @Test public void nullMessageThrows() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - try { - client.message(null); - fail(); - } catch (NullPointerException e) { - assertEquals("message == null", e.getMessage()); - } - } - - @Test public void missingContentTypeThrows() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - try { - client.message(RequestBody.create(null, "Hey!")); - fail(); - } catch (IllegalArgumentException e) { - assertEquals("Message content type was null. Must use WebSocket.TEXT or WebSocket.BINARY.", - e.getMessage()); - } - } - - @Test public void unknownContentTypeThrows() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - try { - client.message(RequestBody.create(MediaType.parse("text/plain"), "Hey!")); - fail(); - } catch (IllegalArgumentException e) { - assertEquals( - "Unknown message content type: text/plain. Must use WebSocket.TEXT or WebSocket.BINARY.", - e.getMessage()); - } - } - - @Test public void pingPong() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - - client.ping(ByteString.encodeUtf8("Hello, WebSockets!")); - clientListener.assertPong(ByteString.encodeUtf8("Hello, WebSockets!")); - } - - @Test public void nullPingPayloadThrows() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - try { - client.ping(null); - fail(); - } catch (NullPointerException e) { - assertEquals("payload == null", e.getMessage()); - } - } - - @Test public void serverMessage() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); - - server.message(RequestBody.create(TEXT, "Hello, WebSockets!")); - clientListener.assertTextMessage("Hello, WebSockets!"); - } - - @Test public void writingOnReaderThreadThrows() throws IOException, InterruptedException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - - Request request = new Request.Builder().get().url(webServer.url("/")).build(); - RealWebSocketCall call = new RealWebSocketCall(client, request, random); - - final AtomicInteger count = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - call.enqueue(new WebSocketListener() { - private WebSocket webSocket; - - @Override public void onOpen(WebSocket webSocket, Response response) { - this.webSocket = webSocket; - - try { - webSocket.close(1000, ""); - fail(); - } catch (IllegalStateException e) { - assertEquals("attempting to write from reader thread", e.getMessage()); - } catch (IOException e) { - throw new AssertionError(e); - } - - count.getAndIncrement(); - } - - @Override public void onMessage(ResponseBody message) throws IOException { - try { - webSocket.message(RequestBody.create(TEXT, "hey")); - fail(); - } catch (IllegalStateException e) { - assertEquals("attempting to write from reader thread", e.getMessage()); - } - - message.close(); - count.getAndIncrement(); - } - - @Override public void onPong(ByteString payload) { - try { - webSocket.ping(ByteString.EMPTY); - fail(); - } catch (IllegalStateException e) { - assertEquals("attempting to write from reader thread", e.getMessage()); - } catch (IOException e) { - throw new AssertionError(e); - } - - count.getAndIncrement(); - } - - @Override public void onClose(int code, String reason) { - latch.countDown(); - } - - @Override public void onFailure(Throwable t, Response response) { - t.printStackTrace(); - } - }); - - WebSocket server = serverListener.assertOpen(); - server.message(RequestBody.create(TEXT, "hi")); - ((RealWebSocket) server).pong(ByteString.EMPTY); - server.close(1000, ""); - - assertTrue(latch.await(10, SECONDS)); - // Verify we hit all three callbacks and attempted to write in them. - assertEquals(3, count.get()); - } - - @Test public void throwingOnOpenClosesAndFails() { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - - final RuntimeException e = new RuntimeException(); - clientListener.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onOpen(WebSocket webSocket, Response response) { - throw e; - } - }); - enqueueClientWebSocket(); - - serverListener.assertOpen(); - serverListener.assertClose(1001, ""); - clientListener.assertFailure(e); - } - - @Ignore("AsyncCall currently lets runtime exceptions propagate.") - @Test public void throwingOnFailLogs() throws InterruptedException { - TestLogHandler logs = new TestLogHandler(); - Logger logger = Logger.getLogger(OkHttpClient.class.getName()); - logger.addHandler(logs); - - webServer.enqueue(new MockResponse().setResponseCode(200).setBody("Body")); - - final RuntimeException e = new RuntimeException(); - clientListener.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onFailure(Throwable t, Response response) { - throw e; - } - }); - - enqueueClientWebSocket(); - - assertEquals("", logs.take()); - logger.removeHandler(logs); - } - - @Test public void throwingOnMessageClosesAndFails() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); - - final RuntimeException e = new RuntimeException(); - clientListener.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onMessage(ResponseBody message) { - throw e; - } - }); - - server.message(RequestBody.create(TEXT, "Hello, WebSockets!")); - clientListener.assertFailure(e); - serverListener.assertClose(1001, ""); - } - - @Test public void throwingOnOnPongClosesAndFails() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - serverListener.assertOpen(); - - final RuntimeException e = new RuntimeException(); - clientListener.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onPong(ByteString payload) { - throw e; - } - }); - - client.ping(ByteString.EMPTY); - clientListener.assertFailure(e); - serverListener.assertClose(1001, ""); - } - - @Test public void throwingOnCloseClosesNormallyAndFails() throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); - - final RuntimeException e = new RuntimeException(); - clientListener.setNextEventDelegate(new EmptyWebSocketListener() { - @Override public void onClose(int code, String reason) { - throw e; - } - }); - - server.close(1000, "bye"); - clientListener.assertFailure(e); - serverListener.assertClose(1000, "bye"); - } - - @Test public void non101RetainsBody() throws IOException { - webServer.enqueue(new MockResponse().setResponseCode(200).setBody("Body")); - enqueueClientWebSocket(); - - clientListener.assertFailure(200, "Body", ProtocolException.class, - "Expected HTTP 101 response but was '200 OK'"); - } - - @Test public void notFound() throws IOException { - webServer.enqueue(new MockResponse().setStatus("HTTP/1.1 404 Not Found")); - enqueueClientWebSocket(); - - clientListener.assertFailure(404, null, ProtocolException.class, - "Expected HTTP 101 response but was '404 Not Found'"); - } - - @Test public void clientTimeoutClosesBody() throws IOException { - webServer.enqueue(new MockResponse().setResponseCode(408)); - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - enqueueClientWebSocket(); - - WebSocket client = clientListener.assertOpen(); - - client.ping(ByteString.encodeUtf8("WebSockets are fun!")); - clientListener.assertPong(ByteString.encodeUtf8("WebSockets are fun!")); - } - - @Test public void missingConnectionHeader() throws IOException { - webServer.enqueue(new MockResponse() - .setResponseCode(101) - .setHeader("Upgrade", "websocket") - .setHeader("Sec-WebSocket-Accept", "ujmZX4KXZqjwy6vi1aQFH5p4Ygk=")); - enqueueClientWebSocket(); - - clientListener.assertFailure(101, null, ProtocolException.class, - "Expected 'Connection' header value 'Upgrade' but was 'null'"); - } - - @Test public void wrongConnectionHeader() throws IOException { - webServer.enqueue(new MockResponse() - .setResponseCode(101) - .setHeader("Upgrade", "websocket") - .setHeader("Connection", "Downgrade") - .setHeader("Sec-WebSocket-Accept", "ujmZX4KXZqjwy6vi1aQFH5p4Ygk=")); - enqueueClientWebSocket(); - - clientListener.assertFailure(101, null, ProtocolException.class, - "Expected 'Connection' header value 'Upgrade' but was 'Downgrade'"); - } - - @Test public void missingUpgradeHeader() throws IOException { - webServer.enqueue(new MockResponse() - .setResponseCode(101) - .setHeader("Connection", "Upgrade") - .setHeader("Sec-WebSocket-Accept", "ujmZX4KXZqjwy6vi1aQFH5p4Ygk=")); - enqueueClientWebSocket(); - - clientListener.assertFailure(101, null, ProtocolException.class, - "Expected 'Upgrade' header value 'websocket' but was 'null'"); - } - - @Test public void wrongUpgradeHeader() throws IOException { - webServer.enqueue(new MockResponse() - .setResponseCode(101) - .setHeader("Connection", "Upgrade") - .setHeader("Upgrade", "Pepsi") - .setHeader("Sec-WebSocket-Accept", "ujmZX4KXZqjwy6vi1aQFH5p4Ygk=")); - enqueueClientWebSocket(); - - clientListener.assertFailure(101, null, ProtocolException.class, - "Expected 'Upgrade' header value 'websocket' but was 'Pepsi'"); - } - - @Test public void missingMagicHeader() throws IOException { - webServer.enqueue(new MockResponse() - .setResponseCode(101) - .setHeader("Connection", "Upgrade") - .setHeader("Upgrade", "websocket")); - enqueueClientWebSocket(); - - clientListener.assertFailure(101, null, ProtocolException.class, - "Expected 'Sec-WebSocket-Accept' header value 'ujmZX4KXZqjwy6vi1aQFH5p4Ygk=' but was 'null'"); - } - - @Test public void wrongMagicHeader() throws IOException { - webServer.enqueue(new MockResponse() - .setResponseCode(101) - .setHeader("Connection", "Upgrade") - .setHeader("Upgrade", "websocket") - .setHeader("Sec-WebSocket-Accept", "magic")); - enqueueClientWebSocket(); - - clientListener.assertFailure(101, null, ProtocolException.class, - "Expected 'Sec-WebSocket-Accept' header value 'ujmZX4KXZqjwy6vi1aQFH5p4Ygk=' but was 'magic'"); - } - - @Test public void wsScheme() throws IOException { - websocketScheme("ws"); - } - - @Test public void wsUppercaseScheme() throws IOException { - websocketScheme("WS"); - } - - @Test public void wssScheme() throws IOException { - webServer.useHttps(sslClient.socketFactory, false); - client = client.newBuilder() - .sslSocketFactory(sslClient.socketFactory, sslClient.trustManager) - .hostnameVerifier(new RecordingHostnameVerifier()) - .build(); - - websocketScheme("wss"); - } - - @Test public void httpsScheme() throws IOException { - webServer.useHttps(sslClient.socketFactory, false); - client = client.newBuilder() - .sslSocketFactory(sslClient.socketFactory, sslClient.trustManager) - .hostnameVerifier(new RecordingHostnameVerifier()) - .build(); - - websocketScheme("https"); - } - - private void websocketScheme(String scheme) throws IOException { - webServer.enqueue(new MockResponse().withWebSocketUpgrade(serverListener)); - - Request request = new Request.Builder() - .url(scheme + "://" + webServer.getHostName() + ":" + webServer.getPort() + "/") - .build(); - - enqueueClientWebSocket(request); - WebSocket webSocket = clientListener.assertOpen(); - serverListener.assertOpen(); - - webSocket.message(RequestBody.create(TEXT, "abc")); - serverListener.assertTextMessage("abc"); - } - - private void enqueueClientWebSocket() { - enqueueClientWebSocket(new Request.Builder().get().url(webServer.url("/")).build()); - } - - private void enqueueClientWebSocket(Request request) { - WebSocketCall call = new RealWebSocketCall(client, request, random); - call.enqueue(clientListener); - } -} diff --git a/okhttp-tests/src/test/java/okhttp3/NewWebSocketTest.java b/okhttp-tests/src/test/java/okhttp3/WebSocketHttpTest.java similarity index 94% rename from okhttp-tests/src/test/java/okhttp3/NewWebSocketTest.java rename to okhttp-tests/src/test/java/okhttp3/WebSocketHttpTest.java index fc4f1bc74c7d..bd4ed9edf1ee 100644 --- a/okhttp-tests/src/test/java/okhttp3/NewWebSocketTest.java +++ b/okhttp-tests/src/test/java/okhttp3/WebSocketHttpTest.java @@ -22,7 +22,6 @@ import okhttp3.internal.tls.SslClient; import okhttp3.internal.ws.NewWebSocketRecorder; import okhttp3.internal.ws.RealNewWebSocket; -import okhttp3.internal.ws.WebSocketRecorder; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okio.ByteString; @@ -32,17 +31,16 @@ import org.junit.Test; import static okhttp3.TestUtil.defaultClient; -import static okhttp3.WebSocket.TEXT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; -public final class NewWebSocketTest { +public final class WebSocketHttpTest { @Rule public final MockWebServer webServer = new MockWebServer(); private final SslClient sslClient = SslClient.localhost(); private final NewWebSocketRecorder clientListener = new NewWebSocketRecorder("client"); - private final WebSocketRecorder serverListener = new WebSocketRecorder("server"); + private final NewWebSocketRecorder serverListener = new NewWebSocketRecorder("server"); private final Random random = new Random(0); private OkHttpClient client = defaultClient().newBuilder() .addInterceptor(new Interceptor() { @@ -111,9 +109,9 @@ public final class NewWebSocketTest { enqueueClientWebSocket(); clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); + NewWebSocket server = serverListener.assertOpen(); - server.message(RequestBody.create(TEXT, "Hello, WebSockets!")); + server.send("Hello, WebSockets!"); clientListener.assertTextMessage("Hello, WebSockets!"); } @@ -159,7 +157,7 @@ public final class NewWebSocketTest { enqueueClientWebSocket(); clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); + NewWebSocket server = serverListener.assertOpen(); final RuntimeException e = new RuntimeException(); clientListener.setNextEventDelegate(new NewWebSocket.Listener() { @@ -168,7 +166,7 @@ public final class NewWebSocketTest { } }); - server.message(RequestBody.create(TEXT, "Hello, WebSockets!")); + server.send("Hello, WebSockets!"); clientListener.assertFailure(e); serverListener.assertExhausted(); } @@ -178,7 +176,7 @@ public final class NewWebSocketTest { enqueueClientWebSocket(); clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); + NewWebSocket server = serverListener.assertOpen(); final RuntimeException e = new RuntimeException(); clientListener.setNextEventDelegate(new NewWebSocket.Listener() { @@ -214,12 +212,12 @@ public final class NewWebSocketTest { RealNewWebSocket client = enqueueClientWebSocket(); clientListener.assertOpen(); - WebSocket server = serverListener.assertOpen(); + NewWebSocket server = serverListener.assertOpen(); client.send("abc"); serverListener.assertTextMessage("abc"); - server.message(RequestBody.create(TEXT, "def")); + server.send("def"); clientListener.assertTextMessage("def"); } @@ -340,8 +338,8 @@ private RealNewWebSocket enqueueClientWebSocket() { } private RealNewWebSocket enqueueClientWebSocket(Request request) { - RealNewWebSocket webSocket = new RealNewWebSocket(client, request, clientListener, random); - webSocket.connnect(); + RealNewWebSocket webSocket = new RealNewWebSocket(request, clientListener, random); + webSocket.connect(client); return webSocket; } } diff --git a/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java b/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java index 7f4ad50f68eb..40cbfaba7b1f 100644 --- a/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java +++ b/okhttp-tests/src/test/java/okhttp3/internal/ws/NewWebSocketRecorder.java @@ -21,8 +21,6 @@ import java.util.concurrent.TimeUnit; import okhttp3.NewWebSocket; import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; import okhttp3.internal.Util; import okhttp3.internal.platform.Platform; import okio.ByteString; @@ -142,14 +140,14 @@ public void assertBinaryMessage(byte[] payload) { assertEquals(new Message(ByteString.of(payload)), actual); } - public void assertPong(ByteString payload) { + public void assertClosing(int code, String reason) { Object actual = nextEvent(); - assertEquals(new Pong(payload), actual); + assertEquals(new Closing(code, reason), actual); } - public void assertClose(int code, String reason) { + public void assertClosed(int code, String reason) { Object actual = nextEvent(); - assertEquals(new Closing(code, reason), actual); + assertEquals(new Closed(code, reason), actual); } public void assertExhausted() { @@ -269,30 +267,6 @@ public Message(String string) { } } - static final class Pong { - public final ByteString payload; - - Pong(ByteString payload) { - this.payload = payload; - } - - @Override public String toString() { - return "Pong[" + payload + "]"; - } - - @Override public int hashCode() { - return payload.hashCode(); - } - - @Override public boolean equals(Object obj) { - if (obj instanceof Pong) { - Pong other = (Pong) obj; - return payload == null ? other.payload == null : payload.equals(other.payload); - } - return false; - } - } - static final class Closing { public final int code; public final String reason; @@ -340,61 +314,4 @@ static final class Closed { && ((Closed) other).reason.equals(reason); } } - - /** Expose this recorder as a frame callback and shim in "ping" events. */ - WebSocketReader.FrameCallback asFrameCallback() { - return new WebSocketReader.FrameCallback() { - @Override public void onReadMessage(ResponseBody body) throws IOException { - if (body.contentType().equals(WebSocket.TEXT)) { - String text = body.source().readUtf8(); - onMessage(null, text); - } else if (body.contentType().equals(WebSocket.BINARY)) { - ByteString bytes = body.source().readByteString(); - onMessage(null, bytes); - } else { - throw new IllegalArgumentException(); - } - } - - @Override public void onReadPing(ByteString payload) { - events.add(new Ping(payload)); - } - - @Override public void onReadPong(ByteString padload) { - } - - @Override public void onReadClose(int code, String reason) { - onClosing(null, code, reason); - } - }; - } - - void assertPing(ByteString payload) { - Object actual = nextEvent(); - assertEquals(new Ping(payload), actual); - } - - static final class Ping { - public final ByteString buffer; - - Ping(ByteString buffer) { - this.buffer = buffer; - } - - @Override public String toString() { - return "Ping[" + buffer + "]"; - } - - @Override public int hashCode() { - return buffer.hashCode(); - } - - @Override public boolean equals(Object obj) { - if (obj instanceof Ping) { - Ping other = (Ping) obj; - return buffer == null ? other.buffer == null : buffer.equals(other.buffer); - } - return false; - } - } } diff --git a/okhttp-tests/src/test/java/okhttp3/internal/ws/RealWebSocketTest.java b/okhttp-tests/src/test/java/okhttp3/internal/ws/RealWebSocketTest.java index 94c20d434b2a..580216b0764e 100644 --- a/okhttp-tests/src/test/java/okhttp3/internal/ws/RealWebSocketTest.java +++ b/okhttp-tests/src/test/java/okhttp3/internal/ws/RealWebSocketTest.java @@ -15,65 +15,58 @@ */ package okhttp3.internal.ws; -import java.io.Closeable; +import java.io.EOFException; import java.io.IOException; import java.net.ProtocolException; import java.util.Random; -import java.util.concurrent.Executor; -import okhttp3.MediaType; import okhttp3.Protocol; import okhttp3.Request; -import okhttp3.RequestBody; import okhttp3.Response; -import okio.Buffer; import okio.BufferedSink; -import okio.BufferedSource; import okio.ByteString; import okio.Okio; -import okio.Sink; -import okio.Source; -import okio.Timeout; +import okio.Pipe; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; -import static okhttp3.WebSocket.TEXT; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public final class RealWebSocketTest { // NOTE: Fields are named 'client' and 'server' for cognitive simplicity. This differentiation has // zero effect on the behavior of the WebSocket API which is why tests are only written once // from the perspective of a single peer. - private final Executor clientExecutor = new SynchronousExecutor(); - private RealWebSocket client; + private RealNewWebSocket client; private boolean clientConnectionCloseThrows; private boolean clientConnectionClosed; - private final MemorySocket client2Server = new MemorySocket(); - private final WebSocketRecorder clientListener = new WebSocketRecorder("client"); + private final Pipe client2Server = new Pipe(1024L); + private final BufferedSink client2ServerSink = Okio.buffer(client2Server.sink()); + private final NewWebSocketRecorder clientListener = new NewWebSocketRecorder("client"); - private final Executor serverExecutor = new SynchronousExecutor(); - private RealWebSocket server; + private RealNewWebSocket server; private boolean serverConnectionClosed; - private final MemorySocket server2client = new MemorySocket(); - private final WebSocketRecorder serverListener = new WebSocketRecorder("server"); + private final Pipe server2client = new Pipe(1024L); + private final BufferedSink server2clientSink = Okio.buffer(server2client.sink()); + private final NewWebSocketRecorder serverListener = new NewWebSocketRecorder("server"); - @Before public void setUp() { + @Before public void setUp() throws IOException { Random random = new Random(0); String url = "http://example.com/websocket"; - Response response = new Response.Builder() + final Response response = new Response.Builder() .code(101) .request(new Request.Builder().url(url).build()) .protocol(Protocol.HTTP_1_1) .build(); - client = new RealWebSocket(true, server2client.source(), client2Server.sink(), random, - clientExecutor, clientListener, response, url) { - @Override protected void shutdown() { + client = new RealNewWebSocket(response.request(), clientListener, random); + client.initReaderAndWriter(new RealNewWebSocket.Streams( + true, Okio.buffer(server2client.source()), client2ServerSink) { + @Override public void close() throws IOException { + source.close(); + sink.close(); if (clientConnectionClosed) { throw new AssertionError("Already closed"); } @@ -83,16 +76,20 @@ public final class RealWebSocketTest { throw new RuntimeException("Oops!"); } } - }; - server = new RealWebSocket(false, client2Server.source(), server2client.sink(), random, - serverExecutor, serverListener, response, url) { - @Override protected void shutdown() { + }); + + server = new RealNewWebSocket(response.request(), serverListener, random); + server.initReaderAndWriter(new RealNewWebSocket.Streams( + false, Okio.buffer(client2Server.source()), server2clientSink) { + @Override public void close() throws IOException { + source.close(); + sink.close(); if (serverConnectionClosed) { throw new AssertionError("Already closed"); } serverConnectionClosed = true; } - }; + }); } @After public void tearDown() { @@ -100,401 +97,202 @@ public final class RealWebSocketTest { serverListener.assertExhausted(); } - @Test public void streamingMessage() throws IOException { - RequestBody message = new RequestBody() { - @Override public MediaType contentType() { - return TEXT; - } - - @Override public void writeTo(BufferedSink sink) throws IOException { - sink.writeUtf8("Hel").flush(); - sink.writeUtf8("lo!").flush(); - sink.close(); - } - }; - client.message(message); - server.processNextFrame(); - serverListener.assertTextMessage("Hello!"); - } - - @Test public void streamingMessageCanInterleavePing() throws IOException { - RequestBody message = new RequestBody() { - @Override public MediaType contentType() { - return TEXT; - } - - @Override public void writeTo(BufferedSink sink) throws IOException { - sink.writeUtf8("Hel").flush(); - client.ping(ByteString.encodeUtf8("Pong?")); - sink.writeUtf8("lo!").flush(); - sink.close(); - } - }; - - client.message(message); - server.processNextFrame(); - serverListener.assertTextMessage("Hello!"); - client.processNextFrame(); - clientListener.assertPong(ByteString.encodeUtf8("Pong?")); - } - - @Test public void pingWritesPong() throws IOException { - client.ping(ByteString.encodeUtf8("Hello!")); - server.processNextFrame(); // Read the ping, write the pong. - client.processNextFrame(); // Read the pong. - clientListener.assertPong(ByteString.encodeUtf8("Hello!")); - } - - @Test public void unsolicitedPong() throws IOException { - client.pong(ByteString.encodeUtf8("Hello!")); - server.processNextFrame(); - serverListener.assertPong(ByteString.encodeUtf8("Hello!")); - } - - @Test public void nullPongPayloadThrows() throws IOException { - try { - client.pong(null); - fail(); - } catch (NullPointerException e) { - assertEquals("payload == null", e.getMessage()); - } - } - @Test public void close() throws IOException { client.close(1000, "Hello!"); assertFalse(server.processNextFrame()); // This will trigger a close response. - serverListener.assertClose(1000, "Hello!"); + serverListener.assertClosing(1000, "Hello!"); + server.close(1000, "Goodbye!"); assertFalse(client.processNextFrame()); - clientListener.assertClose(1000, "Hello!"); + clientListener.assertClosing(1000, "Goodbye!"); + serverListener.assertClosed(1000, "Hello!"); + clientListener.assertClosed(1000, "Goodbye!"); } - @Test public void clientCloseThenMethodsThrow() throws IOException { + @Test public void clientCloseThenMethodsReturnFalse() throws IOException { client.close(1000, "Hello!"); - try { - client.ping(ByteString.encodeUtf8("Pong?")); - fail(); - } catch (IllegalStateException e) { - assertEquals("closed", e.getMessage()); - } - try { - client.close(1000, "Hello!"); - fail(); - } catch (IllegalStateException e) { - assertEquals("closed", e.getMessage()); - } - try { - client.message(RequestBody.create(TEXT, "Hello!")); - fail(); - } catch (IllegalStateException e) { - assertEquals("closed", e.getMessage()); - } + assertFalse(client.close(1000, "Hello!")); + assertFalse(client.send("Hello!")); } - @Test public void socketClosedDuringPingKillsWebSocket() throws IOException { - client2Server.close(); + @Test public void afterSocketClosedPingFailsWebSocket() throws IOException { + client2Server.source().close(); + client.pong(ByteString.encodeUtf8("Ping!")); + clientListener.assertFailure(IOException.class, "source is closed"); - try { - client.ping(ByteString.encodeUtf8("Ping!")); - fail(); - } catch (IOException ignored) { - } - - // A failed write prevents further use of the WebSocket instance. - try { - client.message(RequestBody.create(TEXT, "Hello!")); - fail(); - } catch (IllegalStateException e) { - assertEquals("must call close()", e.getMessage()); - } - try { - client.ping(ByteString.encodeUtf8("Ping!")); - fail(); - } catch (IllegalStateException e) { - assertEquals("must call close()", e.getMessage()); - } + assertFalse(client.send("Hello!")); } @Test public void socketClosedDuringMessageKillsWebSocket() throws IOException { - client2Server.close(); + client2Server.source().close(); - try { - client.message(RequestBody.create(TEXT, "Hello!")); - fail(); - } catch (IOException ignored) { - } + assertTrue(client.send("Hello!")); + clientListener.assertFailure(IOException.class, "source is closed"); // A failed write prevents further use of the WebSocket instance. - try { - client.message(RequestBody.create(TEXT, "Hello!")); - fail(); - } catch (IllegalStateException e) { - assertEquals("must call close()", e.getMessage()); - } - try { - client.ping(ByteString.encodeUtf8("Ping!")); - fail(); - } catch (IllegalStateException e) { - assertEquals("must call close()", e.getMessage()); - } + assertFalse(client.send("Hello!")); + assertFalse(client.pong(ByteString.encodeUtf8("Ping!"))); } - @Test public void serverCloseThenWritingPingThrows() throws IOException { + @Test public void serverCloseThenWritingPingSucceeds() throws IOException { server.close(1000, "Hello!"); client.processNextFrame(); - clientListener.assertClose(1000, "Hello!"); - - try { - client.ping(ByteString.encodeUtf8("Pong?")); - fail(); - } catch (IOException e) { - assertEquals("closed", e.getMessage()); - } + clientListener.assertClosing(1000, "Hello!"); + + assertTrue(client.pong(ByteString.encodeUtf8("Pong?"))); } - @Test public void serverCloseThenWritingMessageThrows() throws IOException { + @Test public void clientCanWriteMessagesAfterServerClose() throws IOException { server.close(1000, "Hello!"); client.processNextFrame(); - clientListener.assertClose(1000, "Hello!"); - - try { - client.message(RequestBody.create(TEXT, "Hi!")); - fail(); - } catch (IOException e) { - assertEquals("closed", e.getMessage()); - } + clientListener.assertClosing(1000, "Hello!"); + + assertTrue(client.send("Hi!")); + server.processNextFrame(); + serverListener.assertTextMessage("Hi!"); } @Test public void serverCloseThenWritingCloseThrows() throws IOException { server.close(1000, "Hello!"); client.processNextFrame(); - clientListener.assertClose(1000, "Hello!"); - - try { - client.close(1000, "Bye!"); - fail(); - } catch (IOException e) { - assertEquals("closed", e.getMessage()); - } - } - - @Test public void serverCloseWhileWritingThrows() throws IOException { - RequestBody message = new RequestBody() { - @Override public MediaType contentType() { - return TEXT; - } - - @Override public void writeTo(BufferedSink sink) throws IOException { - // Start writing data. - sink.writeUtf8("Hel").flush(); - - server.close(1000, "Hello!"); - client.processNextFrame(); - clientListener.assertClose(1000, "Hello!"); - - try { - sink.flush(); // No flushing. - fail(); - } catch (IOException e) { - assertEquals("closed", e.getMessage()); - } - try { - sink.close(); // No closing because this requires writing a frame. - fail(); - } catch (IOException e) { - assertEquals("closed", e.getMessage()); - } - } - }; - client.message(message); + clientListener.assertClosing(1000, "Hello!"); + assertTrue(client.close(1000, "Bye!")); } @Test public void clientCloseClosesConnection() throws IOException { client.close(1000, "Hello!"); assertFalse(clientConnectionClosed); - server.processNextFrame(); // Read client close, send server close. - serverListener.assertClose(1000, "Hello!"); + server.processNextFrame(); // Read client closing, send server close. + serverListener.assertClosing(1000, "Hello!"); - client.processNextFrame(); // Read server close, close connection. + server.close(1000, "Goodbye!"); + client.processNextFrame(); // Read server closing, close connection. assertTrue(clientConnectionClosed); - clientListener.assertClose(1000, "Hello!"); + clientListener.assertClosing(1000, "Goodbye!"); + + // Server and client both finished closing, connection is closed. + serverListener.assertClosed(1000, "Hello!"); + clientListener.assertClosed(1000, "Goodbye!"); } @Test public void serverCloseClosesConnection() throws IOException { server.close(1000, "Hello!"); client.processNextFrame(); // Read server close, send client close, close connection. - assertTrue(clientConnectionClosed); - clientListener.assertClose(1000, "Hello!"); + assertFalse(clientConnectionClosed); + clientListener.assertClosing(1000, "Hello!"); + client.close(1000, "Hello!"); server.processNextFrame(); - serverListener.assertClose(1000, "Hello!"); + serverListener.assertClosing(1000, "Hello!"); + + clientListener.assertClosed(1000, "Hello!"); + serverListener.assertClosed(1000, "Hello!"); } @Test public void clientAndServerCloseClosesConnection() throws IOException { // Send close from both sides at the same time. server.close(1000, "Hello!"); - client.close(1000, "Hi!"); - assertFalse(clientConnectionClosed); - client.processNextFrame(); // Read close, close connection close. - assertTrue(clientConnectionClosed); - clientListener.assertClose(1000, "Hello!"); + assertFalse(clientConnectionClosed); + client.close(1000, "Hi!"); server.processNextFrame(); - serverListener.assertClose(1000, "Hi!"); + + clientListener.assertClosing(1000, "Hello!"); + serverListener.assertClosing(1000, "Hi!"); + clientListener.assertClosed(1000, "Hello!"); + serverListener.assertClosed(1000, "Hi!"); + assertTrue(clientConnectionClosed); serverListener.assertExhausted(); // Client should not have sent second close. clientListener.assertExhausted(); // Server should not have sent second close. } @Test public void serverCloseBreaksReadMessageLoop() throws IOException { - server.message(RequestBody.create(TEXT, "Hello!")); + server.send("Hello!"); server.close(1000, "Bye!"); assertTrue(client.processNextFrame()); clientListener.assertTextMessage("Hello!"); assertFalse(client.processNextFrame()); - clientListener.assertClose(1000, "Bye!"); + clientListener.assertClosing(1000, "Bye!"); } - @Test public void protocolErrorBeforeCloseSendsClose() { - server2client.raw().write(ByteString.decodeHex("0a00")); // Invalid non-final ping frame. + @Test public void protocolErrorBeforeCloseSendsFailure() throws IOException { + server2clientSink.write(ByteString.decodeHex("0a00")).emit(); // Invalid non-final ping frame. client.processNextFrame(); // Detects error, send close, close connection. assertTrue(clientConnectionClosed); clientListener.assertFailure(ProtocolException.class, "Control frames must be final."); server.processNextFrame(); - serverListener.assertClose(1002, ""); + serverListener.assertFailure(EOFException.class, null); } @Test public void protocolErrorInCloseResponseClosesConnection() throws IOException { client.close(1000, "Hello"); + server.processNextFrame(); assertFalse(clientConnectionClosed); // Not closed until close reply is received. // Manually write an invalid masked close frame. - server2client.raw().write(ByteString.decodeHex("888760b420bb635c68de0cd84f")); + server2clientSink.write(ByteString.decodeHex("888760b420bb635c68de0cd84f")).emit(); client.processNextFrame(); // Detects error, closes connection immediately since close already sent. assertTrue(clientConnectionClosed); clientListener.assertFailure(ProtocolException.class, "Server-sent frames must not be masked."); - server.processNextFrame(); - serverListener.assertClose(1000, "Hello"); - + serverListener.assertClosing(1000, "Hello"); serverListener.assertExhausted(); // Client should not have sent second close. } @Test public void protocolErrorAfterCloseDoesNotSendClose() throws IOException { client.close(1000, "Hello!"); + server.processNextFrame(); + assertFalse(clientConnectionClosed); // Not closed until close reply is received. - server2client.raw().write(ByteString.decodeHex("0a00")); // Invalid non-final ping frame. + server2clientSink.write(ByteString.decodeHex("0a00")).emit(); // Invalid non-final ping frame. client.processNextFrame(); // Detects error, closes connection immediately since close already sent. assertTrue(clientConnectionClosed); clientListener.assertFailure(ProtocolException.class, "Control frames must be final."); - server.processNextFrame(); - serverListener.assertClose(1000, "Hello!"); + serverListener.assertClosing(1000, "Hello!"); serverListener.assertExhausted(); // Client should not have sent second close. } - @Test public void networkErrorReportedAsCloseNotFailure() { - server2client.close(); + @Test public void networkErrorReportedAsFailure() throws IOException { + server2clientSink.close(); client.processNextFrame(); - clientListener.assertClose(1006, ""); + clientListener.assertFailure(EOFException.class, null); } - @Test public void closeThrowingClosesConnection() { - client2Server.close(); - - try { - client.close(1000, null); - fail(); - } catch (IOException ignored) { - } - assertTrue(clientConnectionClosed); + @Test public void closeThrowingFailsConnection() throws IOException { + client2Server.source().close(); + client.close(1000, null); + clientListener.assertFailure(IOException.class, "source is closed"); } - @Test public void closeMessageAndConnectionCloseThrowingDoesNotMaskOriginal() { - client2Server.close(); + @Ignore // TODO(jwilson): come up with a way to test unchecked exceptions on the writer thread. + @Test public void closeMessageAndConnectionCloseThrowingDoesNotMaskOriginal() throws IOException { + client2ServerSink.close(); clientConnectionCloseThrows = true; - try { - client.close(1000, "Bye!"); - fail(); - } catch (IOException e) { - assertNotEquals("Oops!", e.getMessage()); - } + client.close(1000, "Bye!"); + clientListener.assertFailure(IOException.class, "failure"); assertTrue(clientConnectionClosed); } - @Test public void peerConnectionCloseThrowingDoesNotPropagate() throws IOException { + @Ignore // TODO(jwilson): come up with a way to test unchecked exceptions on the writer thread. + @Test public void peerConnectionCloseThrowingPropagates() throws IOException { clientConnectionCloseThrows = true; - server.close(1000, "Bye!"); + server.close(1000, "Bye from Server!"); client.processNextFrame(); - assertTrue(clientConnectionClosed); - clientListener.assertClose(1000, "Bye!"); + clientListener.assertClosing(1000, "Bye from Server!"); + client.close(1000, "Bye from Client!"); server.processNextFrame(); - serverListener.assertClose(1000, "Bye!"); - } - - static final class MemorySocket implements Closeable { - private final Buffer buffer = new Buffer(); - private boolean closed; - - @Override public void close() { - closed = true; - } - - Buffer raw() { - return buffer; - } - - BufferedSource source() { - return Okio.buffer(new Source() { - @Override public long read(Buffer sink, long byteCount) throws IOException { - if (closed) throw new IOException("closed"); - return buffer.read(sink, byteCount); - } - - @Override public Timeout timeout() { - return Timeout.NONE; - } - - @Override public void close() { - closed = true; - } - }); - } - - BufferedSink sink() { - return Okio.buffer(new Sink() { - @Override public void write(Buffer source, long byteCount) throws IOException { - if (closed) throw new IOException("closed"); - buffer.write(source, byteCount); - } - - @Override public void flush() { - } - - @Override public Timeout timeout() { - return Timeout.NONE; - } - - @Override public void close() { - closed = true; - } - }); - } - } - - static final class SynchronousExecutor implements Executor { - @Override public void execute(Runnable command) { - command.run(); - } + serverListener.assertClosing(1000, "Bye from Client!"); } } diff --git a/okhttp/src/main/java/okhttp3/OkHttpClient.java b/okhttp/src/main/java/okhttp3/OkHttpClient.java index 91160a56f192..a22e7e4484a9 100644 --- a/okhttp/src/main/java/okhttp3/OkHttpClient.java +++ b/okhttp/src/main/java/okhttp3/OkHttpClient.java @@ -117,8 +117,7 @@ *

OkHttp also uses daemon threads for HTTP/2 connections. These will exit automatically if they * remain idle. */ -public class OkHttpClient - implements Cloneable, Call.Factory, WebSocketCall.Factory, NewWebSocket.Factory { +public class OkHttpClient implements Cloneable, Call.Factory, NewWebSocket.Factory { private static final List DEFAULT_PROTOCOLS = Util.immutableList( Protocol.HTTP_2, Protocol.HTTP_1_1); @@ -391,19 +390,12 @@ public List networkInterceptors() { return new RealCall(this, request, false /* for web socket */); } - /** - * Prepares the {@code request} to create a web socket at some point in the future. - */ - @Override public WebSocketCall newWebSocketCall(Request request) { - return new RealWebSocketCall(this, request); - } - /** * Uses {@code request} to connect a new web socket. */ @Override public NewWebSocket newWebSocket(Request request, NewWebSocket.Listener listener) { - RealNewWebSocket webSocket = new RealNewWebSocket(this, request, listener, new SecureRandom()); - webSocket.connnect(); + RealNewWebSocket webSocket = new RealNewWebSocket(request, listener, new SecureRandom()); + webSocket.connect(this); return webSocket; } diff --git a/okhttp/src/main/java/okhttp3/RealWebSocketCall.java b/okhttp/src/main/java/okhttp3/RealWebSocketCall.java deleted file mode 100644 index da65db1c4bec..000000000000 --- a/okhttp/src/main/java/okhttp3/RealWebSocketCall.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3; - -import java.io.IOException; -import java.net.ProtocolException; -import java.security.SecureRandom; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import okhttp3.internal.Util; -import okhttp3.internal.connection.StreamAllocation; -import okhttp3.internal.ws.RealWebSocket; -import okhttp3.internal.ws.WebSocketProtocol; -import okio.ByteString; - -import static java.util.concurrent.TimeUnit.SECONDS; - -final class RealWebSocketCall implements WebSocketCall { - private static final List ONLY_HTTP1 = Collections.singletonList(Protocol.HTTP_1_1); - - /** The application's original request unadulterated by web socket headers. */ - private final Request originalRequest; - private final RealCall call; - private final Random random; - private final String key; - - RealWebSocketCall(OkHttpClient client, Request request) { - this(client, request, new SecureRandom()); - } - - RealWebSocketCall(OkHttpClient client, Request request, Random random) { - if (!"GET".equals(request.method())) { - throw new IllegalArgumentException("Request must be GET: " + request.method()); - } - this.random = random; - - byte[] nonce = new byte[16]; - random.nextBytes(nonce); - key = ByteString.of(nonce).base64(); - - client = client.newBuilder() - .readTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection. - .writeTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection. - .protocols(ONLY_HTTP1) - .build(); - - originalRequest = request; - request = request.newBuilder() - .header("Upgrade", "websocket") - .header("Connection", "Upgrade") - .header("Sec-WebSocket-Key", key) - .header("Sec-WebSocket-Version", "13") - .build(); - - call = new RealCall(client, request, true /* for web socket */); - } - - @Override public void enqueue(final WebSocketListener listener) { - Callback responseCallback = new Callback() { - @Override public void onResponse(Call call, Response response) { - StreamWebSocket webSocket; - try { - webSocket = create(response, listener); - } catch (IOException e) { - listener.onFailure(e, response); - return; - } - - webSocket.loopReader(); - } - - @Override public void onFailure(Call call, IOException e) { - listener.onFailure(e, null); - } - }; - call.enqueue(responseCallback); - } - - StreamWebSocket create(Response response, WebSocketListener listener) throws IOException { - if (response.code() != 101) { - throw new ProtocolException("Expected HTTP 101 response but was '" - + response.code() - + " " - + response.message() - + "'"); - } - - String headerConnection = response.header("Connection"); - if (!"Upgrade".equalsIgnoreCase(headerConnection)) { - throw new ProtocolException( - "Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'"); - } - String headerUpgrade = response.header("Upgrade"); - if (!"websocket".equalsIgnoreCase(headerUpgrade)) { - throw new ProtocolException( - "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'"); - } - String headerAccept = response.header("Sec-WebSocket-Accept"); - String acceptExpected = WebSocketProtocol.acceptHeader(key); - if (!acceptExpected.equals(headerAccept)) { - throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '" - + acceptExpected - + "' but was '" - + headerAccept - + "'"); - } - - String name = response.request().url().redact().toString(); - ThreadPoolExecutor replyExecutor = - new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque(), - Util.threadFactory(Util.format("OkHttp %s WebSocket Replier", name), true)); - replyExecutor.allowCoreThreadTimeOut(true); - - StreamAllocation streamAllocation = call.streamAllocation(); - streamAllocation.noNewStreams(); // Web socket connections can't be re-used. - return new StreamWebSocket(streamAllocation, random, replyExecutor, listener, response, name); - } - - @Override public Request request() { - return originalRequest; - } - - @Override public void cancel() { - call.cancel(); - } - - @Override public boolean isExecuted() { - return call.isExecuted(); - } - - @Override public boolean isCanceled() { - return call.isCanceled(); - } - - @Override public WebSocketCall clone() { - return new RealWebSocketCall(call.client, originalRequest, random); - } - - // Keep static so that the WebSocketCall instance can be garbage collected. - static final class StreamWebSocket extends RealWebSocket { - private final StreamAllocation streamAllocation; - private final ExecutorService executor; - - StreamWebSocket(StreamAllocation streamAllocation, Random random, ExecutorService executor, - WebSocketListener listener, Response response, String name) { - super(true /* is client */, streamAllocation.connection().source, - streamAllocation.connection().sink, random, executor, listener, response, name); - this.streamAllocation = streamAllocation; - this.executor = executor; - } - - @Override protected void shutdown() { - executor.shutdown(); - streamAllocation.streamFinished(true, streamAllocation.codec()); - } - } -} diff --git a/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java b/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java index 380c9939a2b7..a288e2e7d6cd 100644 --- a/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java +++ b/okhttp/src/main/java/okhttp3/internal/ws/RealNewWebSocket.java @@ -70,11 +70,16 @@ public final class RealNewWebSocket implements NewWebSocket, WebSocketReader.Fra private final Listener listener; private final Random random; private final String key; - private final Call call; + + /** Non-null for client websockets. These can be canceled. */ + private Call call; /** This runnable processes the outgoing queues. Call {@link #runWriter()} to after enqueueing. */ private final NamedRunnable writerRunnable; + /** Null until this web socket is connected. Only accessed by the reader thread. */ + private WebSocketReader reader; + // All mutable web socket state is guarded by this. /** @@ -83,7 +88,7 @@ public final class RealNewWebSocket implements NewWebSocket, WebSocketReader.Fra */ private boolean writerRunning; - /** Null until this web Socket is connected. Note that messages may be enqueued before that. */ + /** Null until this web socket is connected. Note that messages may be enqueued before that. */ private WebSocketWriter writer; /** @@ -114,7 +119,7 @@ public final class RealNewWebSocket implements NewWebSocket, WebSocketReader.Fra /** True if this web socket failed and the listener has been notified. */ private boolean failed; - public RealNewWebSocket(OkHttpClient client, Request request, Listener listener, Random random) { + public RealNewWebSocket(Request request, Listener listener, Random random) { if (!"GET".equals(request.method())) { throw new IllegalArgumentException("Request must be GET: " + request.method()); } @@ -136,19 +141,6 @@ public RealNewWebSocket(OkHttpClient client, Request request, Listener listener, } } }; - - client = client.newBuilder() - .readTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection. - .writeTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection. - .protocols(ONLY_HTTP1) - .build(); - request = request.newBuilder() - .header("Upgrade", "websocket") - .header("Connection", "Upgrade") - .header("Sec-WebSocket-Key", key) - .header("Sec-WebSocket-Version", "13") - .build(); - this.call = Internal.instance.newWebSocketCall(client, request); } @Override public Request request() { @@ -163,7 +155,19 @@ public RealNewWebSocket(OkHttpClient client, Request request, Listener listener, call.cancel(); } - public void connnect() { + public void connect(OkHttpClient client) { + client = client.newBuilder() + .readTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection. + .writeTimeout(0, SECONDS) // i.e., no timeout because this is a long-lived connection. + .protocols(ONLY_HTTP1) + .build(); + Request request = originalRequest.newBuilder() + .header("Upgrade", "websocket") + .header("Connection", "Upgrade") + .header("Sec-WebSocket-Key", key) + .header("Sec-WebSocket-Version", "13") + .build(); + call = Internal.instance.newWebSocketCall(client, request); call.enqueue(new Callback() { @Override public void onResponse(Call call, Response response) { try { @@ -179,8 +183,11 @@ public void connnect() { streamAllocation.noNewStreams(); // Prevent connection pooling! Streams streams = new ClientStreams(streamAllocation); + // Process all websocket messages. try { - readWebsocket(streams, response); + listener.onOpen(RealNewWebSocket.this, response); + initReaderAndWriter(streams); + loopReader(); } catch (Exception e) { failWebSocket(e, null); } @@ -211,14 +218,15 @@ private void checkResponse(Response response) throws ProtocolException { } String headerAccept = response.header("Sec-WebSocket-Accept"); - String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC); + String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC) + .sha1().base64(); if (!acceptExpected.equals(headerAccept)) { throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '" + acceptExpected + "' but was '" + headerAccept + "'"); } } - void readWebsocket(Streams streams, Response response) throws IOException { + public void initReaderAndWriter(Streams streams) throws IOException { synchronized (this) { this.streams = streams; this.writer = new WebSocketWriter(streams.client, streams.sink, random); @@ -227,15 +235,28 @@ void readWebsocket(Streams streams, Response response) throws IOException { } } - // Receive frames until there are no more. - WebSocketReader reader = new WebSocketReader(streams.client, streams.source, this); - listener.onOpen(this, response); + reader = new WebSocketReader(streams.client, streams.source, this); + } + + /** Receive frames until there are no more. */ + public void loopReader() throws IOException { while (receivedCloseCode == -1) { // This method call results in one or more onRead* methods being called on this thread. reader.processNextFrame(); } } + /** Receive a single frame and return true if there are more frames to read. */ + boolean processNextFrame() throws IOException { + try { + reader.processNextFrame(); + return receivedCloseCode == -1; + } catch (Exception e) { + failWebSocket(e, null); + return false; + } + } + @Override public void onReadMessage(ResponseBody body) throws IOException { try { if (body.contentType().equals(WebSocket.TEXT)) { @@ -301,7 +322,7 @@ void readWebsocket(Streams streams, Response response) throws IOException { return send(bytes, OPCODE_BINARY); } - private synchronized boolean send(final ByteString data, final int formatOpcode) { + private synchronized boolean send(ByteString data, int formatOpcode) { // Don't send new frames after we've failed or enqueued a close frame. if (failed || enqueuedClose) return false; @@ -318,6 +339,15 @@ private synchronized boolean send(final ByteString data, final int formatOpcode) return true; } + public synchronized boolean pong(ByteString payload) { + // Don't send pongs after we've failed or sent the close frame. + if (failed || (enqueuedClose && messageAndCloseQueue.isEmpty())) return false; + + pongQueue.add(payload); + runWriter(); + return true; + } + @Override public synchronized boolean close(final int code, final String reason) { // TODO(jwilson): confirm reason is well-formed. (<=123 bytes, etc.) @@ -461,7 +491,7 @@ public Close(int code, String reason) { } } - abstract static class Streams implements Closeable { + public abstract static class Streams implements Closeable { final boolean client; final BufferedSource source; final BufferedSink sink; diff --git a/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java b/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java deleted file mode 100644 index 8c360d8025fd..000000000000 --- a/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Copyright (C) 2014 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package okhttp3.internal.ws; - -import java.io.IOException; -import java.net.ProtocolException; -import java.util.Random; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import okhttp3.MediaType; -import okhttp3.RequestBody; -import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okhttp3.internal.NamedRunnable; -import okhttp3.internal.Util; -import okhttp3.internal.platform.Platform; -import okio.BufferedSink; -import okio.BufferedSource; -import okio.ByteString; -import okio.Okio; - -import static okhttp3.internal.platform.Platform.INFO; -import static okhttp3.internal.ws.WebSocketProtocol.CLOSE_ABNORMAL_TERMINATION; -import static okhttp3.internal.ws.WebSocketProtocol.CLOSE_CLIENT_GOING_AWAY; -import static okhttp3.internal.ws.WebSocketProtocol.CLOSE_PROTOCOL_EXCEPTION; -import static okhttp3.internal.ws.WebSocketProtocol.OPCODE_BINARY; -import static okhttp3.internal.ws.WebSocketProtocol.OPCODE_TEXT; -import static okhttp3.internal.ws.WebSocketReader.FrameCallback; - -/** - * An implementation of {@link WebSocket} which sits on top of {@link WebSocketReader} and - * {@link WebSocketWriter}. - * - *

Threading

- * This class deals with three threads concurrently and care must be taken to only access the - * appropriate resources on each: - * - * Instance variables have prefixes matching the thread names based on the thread on which they can - * be accessed. A prefix of "writer" indicates both "Sender" and "Replier" threads can access. - */ -public abstract class RealWebSocket implements WebSocket, FrameCallback { - private final WebSocketReader reader; - private final WebSocketListener readerListener; - /** True after a close frame was read by the reader. No frames will follow it. */ - private boolean readerSawClose; - - final WebSocketWriter writer; - /** True after calling {@link WebSocketWriter#writeClose(int, String)} to send a close frame. */ - final AtomicBoolean writerClosed = new AtomicBoolean(); - - /** Guarded by itself. Must check {@link #isShutdown} before enqueuing work. */ - private final Executor replier; - - /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */ - private boolean senderSentClose; - /** True after {@link IOException}. {@link #close(int, String)} becomes only valid call. */ - private boolean senderWantsClose; - - private final Response response; - private final String name; - - /** The thread looping the reader. Will become null when looping stops for any reason. */ - private volatile Thread looperThread; - /** Guarded by {@link #replier}. True after calling {@link #shutdown()}. */ - private boolean isShutdown; - - protected RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random, - Executor replier, WebSocketListener readerListener, Response response, String name) { - this.readerListener = readerListener; - this.replier = replier; - this.response = response; - this.name = name; - - reader = new WebSocketReader(isClient, source, this); - writer = new WebSocketWriter(isClient, sink, random); - } - - ////// READER THREAD - - /** Read and process all socket messages delivering callbacks to the supplied listener. */ - public final void loopReader() { - looperThread = Thread.currentThread(); - - try { - try { - readerListener.onOpen(this, response); - } catch (Throwable t) { - Util.throwIfFatal(t); - replyToReaderError(t); - readerListener.onFailure(t, null); - return; - } - - while (processNextFrame()) { - } - } finally { - looperThread = null; - } - } - - /** - * Read a single control frame or all frames of a message from the web socket and deliver any - * notifications to the listener. Returns false when no more messages can be read. - */ - final boolean processNextFrame() { - try { - // This method call results in one or more onRead* methods being called on this thread. - reader.processNextFrame(); - - return !readerSawClose; - } catch (Throwable t) { - Util.throwIfFatal(t); - replyToReaderError(t); - if (t instanceof IOException && !(t instanceof ProtocolException)) { - readerListener.onClose(CLOSE_ABNORMAL_TERMINATION, ""); - } else { - readerListener.onFailure(t, null); - } - return false; - } - } - - @Override public final void onReadMessage(ResponseBody message) throws IOException { - readerListener.onMessage(message); - } - - @Override public final void onReadPing(ByteString buffer) { - replyToPeerPing(buffer); - } - - @Override public final void onReadPong(ByteString buffer) { - readerListener.onPong(buffer); - } - - @Override public final void onReadClose(int code, String reason) { - replyToPeerClose(code, reason); - readerSawClose = true; - readerListener.onClose(code, reason); - } - - ///// REPLIER THREAD (executed on replier, contends with sender thread) - - /** Replies with a pong when a ping frame is read from the peer. */ - private void replyToPeerPing(final ByteString payload) { - Runnable replierPong = new NamedRunnable("OkHttp %s WebSocket Pong Reply", name) { - @Override protected void execute() { - try { - writer.writePong(payload); - } catch (IOException t) { - Platform.get().log(INFO, "Unable to send pong reply in response to peer ping.", t); - } - } - }; - synchronized (replier) { - if (!isShutdown) { - replier.execute(replierPong); - } - } - } - - /** Replies and closes this web socket when a close frame is read from the peer. */ - private void replyToPeerClose(final int code, final String reason) { - Runnable replierClose = new NamedRunnable("OkHttp %s WebSocket Close Reply", name) { - @Override protected void execute() { - if (writerClosed.compareAndSet(false, true)) { - try { - writer.writeClose(code, reason); - } catch (IOException t) { - Platform.get().log(INFO, "Unable to send close reply in response to peer close.", t); - } - } - - quietlyCloseConnection(); - } - }; - synchronized (replier) { - if (!isShutdown) { - replier.execute(replierClose); - } - } - } - - private void replyToReaderError(final Throwable t) { - Runnable replierClose = new NamedRunnable("OkHttp %s WebSocket Fatal Reply", name) { - @Override protected void execute() { - if (writerClosed.compareAndSet(false, true)) { - // For protocol and runtime exceptions, try to inform the server of such. - boolean protocolException = t instanceof ProtocolException; - boolean runtimeException = !(t instanceof IOException); - if (protocolException || runtimeException) { - int code = protocolException ? CLOSE_PROTOCOL_EXCEPTION : CLOSE_CLIENT_GOING_AWAY; - try { - writer.writeClose(code, null); - } catch (IOException inner) { - Platform.get() - .log(INFO, "Unable to send close in response to listener error.", inner); - } - } - } - - quietlyCloseConnection(); - } - }; - synchronized (replier) { - if (!isShutdown) { - replier.execute(replierClose); - } - } - } - - ////// SENDER THREAD (aka user thread) - - @Override public final void message(RequestBody message) throws IOException { - if (message == null) throw new NullPointerException("message == null"); - if (senderSentClose) throw new IllegalStateException("closed"); - if (senderWantsClose) throw new IllegalStateException("must call close()"); - if (Thread.currentThread() == looperThread) { - throw new IllegalStateException("attempting to write from reader thread"); - } - - MediaType contentType = message.contentType(); - if (contentType == null) { - throw new IllegalArgumentException( - "Message content type was null. Must use WebSocket.TEXT or WebSocket.BINARY."); - } - String contentSubtype = contentType.subtype(); - - int formatOpcode; - if (WebSocket.TEXT.subtype().equals(contentSubtype)) { - formatOpcode = OPCODE_TEXT; - } else if (WebSocket.BINARY.subtype().equals(contentSubtype)) { - formatOpcode = OPCODE_BINARY; - } else { - throw new IllegalArgumentException("Unknown message content type: " - + contentType.type() + "/" + contentType.subtype() // Omit any implicitly added charset. - + ". Must use WebSocket.TEXT or WebSocket.BINARY."); - } - - BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode, message.contentLength())); - try { - message.writeTo(sink); - sink.close(); - } catch (IOException e) { - senderWantsClose = true; - throw e; - } - } - - @Override public final void ping(ByteString payload) throws IOException { - if (payload == null) throw new NullPointerException("payload == null"); - if (senderSentClose) throw new IllegalStateException("closed"); - if (senderWantsClose) throw new IllegalStateException("must call close()"); - if (Thread.currentThread() == looperThread) { - throw new IllegalStateException("attempting to write from reader thread"); - } - - try { - writer.writePing(payload); - } catch (IOException e) { - senderWantsClose = true; - throw e; - } - } - - /** Send an unsolicited pong with the specified payload. */ - public final void pong(ByteString payload) throws IOException { - if (payload == null) throw new NullPointerException("payload == null"); - if (senderSentClose) throw new IllegalStateException("closed"); - if (senderWantsClose) throw new IllegalStateException("must call close()"); - if (Thread.currentThread() == looperThread) { - throw new IllegalStateException("attempting to write from reader thread"); - } - - try { - writer.writePong(payload); - } catch (IOException e) { - senderWantsClose = true; - throw e; - } - } - - @Override public final void close(int code, String reason) throws IOException { - if (senderSentClose) throw new IllegalStateException("closed"); - if (Thread.currentThread() == looperThread) { - throw new IllegalStateException("attempting to write from reader thread"); - } - - senderSentClose = true; - - // Not doing a CAS because we want writer to throw if already closed via peer close. - writerClosed.set(true); - - try { - writer.writeClose(code, reason); - } catch (IOException e) { - quietlyCloseConnection(); - throw e; - } - - // NOTE: We do not close the connection here! That will happen when we read the close reply. - } - - ////// ANY THREAD - - void quietlyCloseConnection() { - synchronized (replier) { - if (isShutdown) return; - isShutdown = true; - } - try { - shutdown(); - } catch (Throwable inner) { - Util.throwIfFatal(inner); - Platform.get().log(INFO, "Unable to close web socket connection.", inner); - } - } - - /** Perform any tear-down work (close the connection, shutdown executors). */ - protected abstract void shutdown(); -} diff --git a/samples/guide/src/main/java/okhttp3/recipes/WebSocketEcho.java b/samples/guide/src/main/java/okhttp3/recipes/WebSocketEcho.java index d73717125652..817d768971dd 100644 --- a/samples/guide/src/main/java/okhttp3/recipes/WebSocketEcho.java +++ b/samples/guide/src/main/java/okhttp3/recipes/WebSocketEcho.java @@ -1,24 +1,13 @@ package okhttp3.recipes; -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import okhttp3.NewWebSocket; import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.RequestBody; import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; -import okhttp3.WebSocketListener; import okio.ByteString; -import static okhttp3.WebSocket.BINARY; -import static okhttp3.WebSocket.TEXT; - -public final class WebSocketEcho implements WebSocketListener { - private final ExecutorService writeExecutor = Executors.newSingleThreadExecutor(); - +public final class WebSocketEcho extends NewWebSocket.Listener { private void run() { OkHttpClient client = new OkHttpClient.Builder() .readTimeout(0, TimeUnit.MILLISECONDS) @@ -27,48 +16,34 @@ private void run() { Request request = new Request.Builder() .url("ws://echo.websocket.org") .build(); - client.newWebSocketCall(request).enqueue(this); + client.newWebSocket(request, this); // Trigger shutdown of the dispatcher's executor so this process can exit cleanly. client.dispatcher().executorService().shutdown(); } - @Override public void onOpen(final WebSocket webSocket, Response response) { - writeExecutor.execute(new Runnable() { - @Override public void run() { - try { - webSocket.message(RequestBody.create(TEXT, "Hello...")); - webSocket.message(RequestBody.create(TEXT, "...World!")); - webSocket.message(RequestBody.create(BINARY, ByteString.decodeHex("deadbeef"))); - webSocket.close(1000, "Goodbye, World!"); - } catch (IOException e) { - System.err.println("Unable to send messages: " + e.getMessage()); - } - } - }); + @Override public void onOpen(NewWebSocket webSocket, Response response) { + webSocket.send("Hello..."); + webSocket.send("...World!"); + webSocket.send(ByteString.decodeHex("deadbeef")); + webSocket.close(1000, "Goodbye, World!"); } - @Override public void onMessage(ResponseBody message) throws IOException { - if (message.contentType() == TEXT) { - System.out.println("MESSAGE: " + message.string()); - } else { - System.out.println("MESSAGE: " + message.source().readByteString().hex()); - } - message.close(); + @Override public void onMessage(NewWebSocket webSocket, String text) { + System.out.println("MESSAGE: " + text); } - @Override public void onPong(ByteString payload) { - System.out.println("PONG: " + payload.utf8()); + @Override public void onMessage(NewWebSocket webSocket, ByteString bytes) { + System.out.println("MESSAGE: " + bytes.hex()); } - @Override public void onClose(int code, String reason) { + @Override public void onClosing(NewWebSocket webSocket, int code, String reason) { + webSocket.close(1000, null); System.out.println("CLOSE: " + code + " " + reason); - writeExecutor.shutdown(); } - @Override public void onFailure(Throwable t, Response response) { + @Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) { t.printStackTrace(); - writeExecutor.shutdown(); } public static void main(String... args) { diff --git a/samples/slack/src/main/java/okhttp3/slack/RtmSession.java b/samples/slack/src/main/java/okhttp3/slack/RtmSession.java index 83011d44ca88..17b0a7bed3de 100644 --- a/samples/slack/src/main/java/okhttp3/slack/RtmSession.java +++ b/samples/slack/src/main/java/okhttp3/slack/RtmSession.java @@ -17,73 +17,58 @@ import java.io.Closeable; import java.io.IOException; +import okhttp3.NewWebSocket; import okhttp3.Response; -import okhttp3.ResponseBody; -import okhttp3.WebSocket; -import okhttp3.WebSocketCall; -import okhttp3.WebSocketListener; -import okio.ByteString; /** A realtime messaging session. */ -public final class RtmSession implements WebSocketListener, Closeable { +public final class RtmSession extends NewWebSocket.Listener implements Closeable { private final SlackApi slackApi; - private WebSocketCall webSocketCall; /** Guarded by this. */ - private WebSocket webSocket; + private NewWebSocket webSocket; public RtmSession(SlackApi slackApi) { this.slackApi = slackApi; } public void open(String accessToken) throws IOException { - if (webSocketCall != null) throw new IllegalStateException(); + if (webSocket != null) throw new IllegalStateException(); RtmStartResponse rtmStartResponse = slackApi.rtmStart(accessToken); - webSocketCall = slackApi.rtm(rtmStartResponse.url); - webSocketCall.enqueue(this); + webSocket = slackApi.rtm(rtmStartResponse.url, this); } // TODO(jwilson): can I read the response body? Do I have to? // the body from slack is a 0-byte-buffer - @Override public synchronized void onOpen(WebSocket webSocket, Response response) { + @Override public synchronized void onOpen(NewWebSocket webSocket, Response response) { System.out.println("onOpen: " + response); - this.webSocket = webSocket; } // TOOD(jwilson): decode incoming messages and dispatch them somewhere. - @Override public void onMessage(ResponseBody message) throws IOException { - System.out.println("onMessage: " + message.string()); + @Override public void onMessage(NewWebSocket webSocket, String text) { + System.out.println("onMessage: " + text); } - @Override public void onPong(ByteString payload) { - System.out.println("onPong: " + payload); - } - - @Override public void onClose(int code, String reason) { + @Override public void onClosing(NewWebSocket webSocket, int code, String reason) { + webSocket.close(1000, null); System.out.println("onClose (" + code + "): " + reason); } - // TODO(jwilson): can I read the response body? Do I have to? - @Override public void onFailure(Throwable t, Response response) { + @Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) { + // TODO(jwilson): can I read the response body? Do I have to? System.out.println("onFailure " + response); } @Override public void close() throws IOException { - if (webSocketCall == null) return; + if (webSocket == null) return; - WebSocket webSocket; + NewWebSocket webSocket; synchronized (this) { webSocket = this.webSocket; } - // TODO(jwilson): Racy? Is there an interleaving of events where the websocket is not closed? - // Our docs say we can’t close if we have an active writer: that seems like it - // could cause problems? if (webSocket != null) { webSocket.close(1000, "bye"); - } else { - webSocketCall.cancel(); } } } diff --git a/samples/slack/src/main/java/okhttp3/slack/SlackApi.java b/samples/slack/src/main/java/okhttp3/slack/SlackApi.java index 0d0086b579c0..3bfee53492b0 100644 --- a/samples/slack/src/main/java/okhttp3/slack/SlackApi.java +++ b/samples/slack/src/main/java/okhttp3/slack/SlackApi.java @@ -22,10 +22,10 @@ import java.io.IOException; import okhttp3.Call; import okhttp3.HttpUrl; +import okhttp3.NewWebSocket; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import okhttp3.WebSocketCall; import okio.ByteString; /** @@ -105,10 +105,10 @@ public RtmStartResponse rtmStart(String accessToken) throws IOException { } /** See https://api.slack.com/rtm. */ - public WebSocketCall rtm(HttpUrl url) { - return httpClient.newWebSocketCall(new Request.Builder() + public NewWebSocket rtm(HttpUrl url, NewWebSocket.Listener listener) { + return httpClient.newWebSocket(new Request.Builder() .url(url) - .build()); + .build(), listener); } static final class SlackJsonAdapters {