Skip to content

Commit

Permalink
Merge pull request square#1669 from square/jw/unblock
Browse files Browse the repository at this point in the history
Call onOpen on the reader thread and force writing on another thread.
  • Loading branch information
JakeWharton committed May 22, 2015
2 parents ce09d95 + 2cde676 commit f1d137e
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -670,19 +670,8 @@ private void handleWebSocketUpgrade(Socket socket, BufferedSource source, Buffer
.protocol(Protocol.HTTP_1_1)
.build();

// The callback might act synchronously. Give it its own thread.
new Thread(new Runnable() {
@Override public void run() {
try {
listener.onOpen(webSocket, fancyResponse);
} catch (IOException e) {
// TODO try to write close frame?
connectionClose.countDown();
}
}
}, "MockWebServer WebSocket Writer " + request.getPath()).start();
listener.onOpen(webSocket, fancyResponse);

// Use this thread to continuously read messages.
while (webSocket.readMessage()) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ private void runTest(final long number, final long count) throws IOException {
private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
private WebSocket webSocket;

@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
@Override public void onOpen(WebSocket webSocket, Response response) {
System.out.println("Executing test case " + number + "/" + count);
this.webSocket = webSocket;
}
Expand Down Expand Up @@ -118,8 +117,7 @@ private long getTestCount() throws IOException {
final AtomicLong countRef = new AtomicLong();
final AtomicReference<IOException> failureRef = new AtomicReference<>();
newWebSocket("/getCaseCount").enqueue(new WebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
@Override public void onOpen(WebSocket webSocket, Response response) {
}

@Override public void onMessage(BufferedSource payload, WebSocket.PayloadType type)
Expand Down Expand Up @@ -157,8 +155,7 @@ private long getTestCount() throws IOException {
private void updateReports() {
final CountDownLatch latch = new CountDownLatch(1);
newWebSocket("/updateReports?agent=" + Version.userAgent()).enqueue(new WebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
@Override public void onOpen(WebSocket webSocket, Response response) {
}

@Override public void onMessage(BufferedSource payload, WebSocket.PayloadType type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ public final class WebSocketCallTest {

@Test public void serverMessage() throws IOException {
WebSocketListener serverListener = new EmptyWebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
webSocket.sendMessage(TEXT, new Buffer().writeUtf8("Hello, WebSockets!"));
@Override public void onOpen(final WebSocket webSocket, Response response) {
new Thread() {
@Override public void run() {
try {
webSocket.sendMessage(TEXT, new Buffer().writeUtf8("Hello, WebSockets!"));
} catch (IOException e) {
throw new AssertionError(e);
}
}
}.start();
}
};
server.enqueue(new MockResponse().withWebSocketUpgrade(serverListener));
Expand All @@ -96,12 +103,19 @@ public final class WebSocketCallTest {

@Test public void serverStreamingMessage() throws IOException {
WebSocketListener serverListener = new EmptyWebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
BufferedSink sink = webSocket.newMessageSink(TEXT);
sink.writeUtf8("Hello, ").flush();
sink.writeUtf8("WebSockets!").flush();
sink.close();
@Override public void onOpen(final WebSocket webSocket, Response response) {
new Thread() {
@Override public void run() {
try {
BufferedSink sink = webSocket.newMessageSink(TEXT);
sink.writeUtf8("Hello, ").flush();
sink.writeUtf8("WebSockets!").flush();
sink.close();
} catch (IOException e) {
throw new AssertionError(e);
}
}
}.start();
}
};
server.enqueue(new MockResponse().withWebSocketUpgrade(serverListener));
Expand Down Expand Up @@ -235,8 +249,7 @@ private WebSocket awaitWebSocket(Request request) {
final AtomicReference<IOException> failureRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
call.enqueue(new WebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
@Override public void onOpen(WebSocket webSocket, Response response) {
webSocketRef.set(webSocket);
responseRef.set(response);
latch.countDown();
Expand Down Expand Up @@ -274,8 +287,7 @@ private WebSocket awaitWebSocket(Request request) {
}

private static class EmptyWebSocketListener implements WebSocketListener {
@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
@Override public void onOpen(WebSocket webSocket, Response response) {
}

@Override public void onMessage(BufferedSource payload, WebSocket.PayloadType type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.internal.Internal;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.internal.Util;
import com.squareup.okhttp.internal.ws.RealWebSocket;
import com.squareup.okhttp.internal.ws.WebSocketProtocol;
Expand Down Expand Up @@ -169,13 +168,8 @@ private void createWebSocket(Response response, WebSocketListener listener)

listener.onOpen(webSocket, response);

// Start a dedicated thread for reading the web socket.
new Thread(new NamedRunnable("OkHttp WebSocket reader %s", request.urlString()) {
@Override protected void execute() {
while (webSocket.readMessage()) {
}
}
}).start();
while (webSocket.readMessage()) {
}
}

// Keep static so that the WebSocketCall instance can be garbage collected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@
/** Listener for server-initiated messages on a connected {@link WebSocket}. */
public interface WebSocketListener {
/**
* Called when the request has successfully been upgraded to a web socket.
* Called when the request has successfully been upgraded to a web socket. This method is called
* on the message reading thread to allow setting up any state before the
* {@linkplain #onMessage message}, {@linkplain #onPong pong}, and {@link #onClose close}
* callbacks start.
* <p>
* <b>Do not</b> use this callback to write to the web socket. Start a new thread or use
* another thread in your application.
*/
void onOpen(WebSocket webSocket, Response response) throws IOException;
void onOpen(WebSocket webSocket, Response response);

/**
* Called when the transport or protocol layer of this web socket errors during communication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.squareup.okhttp.ws.WebSocketCall;
import com.squareup.okhttp.ws.WebSocketListener;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import okio.Buffer;
import okio.BufferedSource;

Expand All @@ -15,6 +17,8 @@
import static com.squareup.okhttp.ws.WebSocket.PayloadType.TEXT;

public final class WebSocketEcho implements WebSocketListener {
private final Executor writeExecutor = Executors.newSingleThreadExecutor();

private void run() throws IOException {
OkHttpClient client = new OkHttpClient();

Expand All @@ -27,21 +31,28 @@ private void run() throws IOException {
client.getDispatcher().getExecutorService().shutdown();
}

@Override public void onOpen(WebSocket webSocket, Response response)
throws IOException {
webSocket.sendMessage(TEXT, new Buffer().writeUtf8("Hello..."));
webSocket.sendMessage(TEXT, new Buffer().writeUtf8("...World!"));
webSocket.sendMessage(BINARY, new Buffer().writeInt(0xdeadbeef));
webSocket.close(1000, "Goodbye, World!");
@Override public void onOpen(final WebSocket webSocket, Response response) {
writeExecutor.execute(new Runnable() {
@Override public void run() {
try {
webSocket.sendMessage(TEXT, new Buffer().writeUtf8("Hello..."));
webSocket.sendMessage(TEXT, new Buffer().writeUtf8("...World!"));
webSocket.sendMessage(BINARY, new Buffer().writeInt(0xdeadbeef));
webSocket.close(1000, "Goodbye, World!");
} catch (IOException e) {
System.err.println("Unable to send messages: " + e.getMessage());
}
}
});
}

@Override public void onMessage(BufferedSource payload, PayloadType type) throws IOException {
switch (type) {
case TEXT:
System.out.println(payload.readUtf8());
System.out.println("MESSAGE: " + payload.readUtf8());
break;
case BINARY:
System.out.println(payload.readByteString().hex());
System.out.println("MESSAGE: " + payload.readByteString().hex());
break;
default:
throw new IllegalStateException("Unknown payload type: " + type);
Expand Down

0 comments on commit f1d137e

Please sign in to comment.