Skip to content

Commit

Permalink
Migrate MockWebServer to the async web sockets API.
Browse files Browse the repository at this point in the history
  • Loading branch information
squarejesse committed Nov 19, 2016
1 parent aa09514 commit 40dddb9
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 1,605 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import okhttp3.Headers;
import okhttp3.NewWebSocket;
import okhttp3.internal.Internal;
import okhttp3.internal.http2.Settings;
import okhttp3.WebSocketListener;
import okio.Buffer;

/** A scripted response to be replayed by the mock web server. */
Expand All @@ -45,7 +45,7 @@ public final class MockResponse implements Cloneable {

private List<PushPromise> promises = new ArrayList<>();
private Settings settings;
private WebSocketListener webSocketListener;
private NewWebSocket.Listener webSocketListener;

/** Creates a new mock response with an empty body. */
public MockResponse() {
Expand Down Expand Up @@ -284,7 +284,7 @@ public Settings getSettings() {
* Attempts to perform a web socket upgrade on the connection. This will overwrite any previously
* set status or body.
*/
public MockResponse withWebSocketUpgrade(WebSocketListener listener) {
public MockResponse withWebSocketUpgrade(NewWebSocket.Listener listener) {
setStatus("HTTP/1.1 101 Switching Protocols");
setHeader("Connection", "Upgrade");
setHeader("Upgrade", "websocket");
Expand All @@ -293,7 +293,7 @@ public MockResponse withWebSocketUpgrade(WebSocketListener listener) {
return this;
}

public WebSocketListener getWebSocketListener() {
public NewWebSocket.Listener getWebSocketListener() {
return webSocketListener;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import okhttp3.internal.http2.Http2Stream;
import okhttp3.internal.http2.Settings;
import okhttp3.internal.platform.Platform;
import okhttp3.internal.ws.RealWebSocket;
import okhttp3.internal.ws.RealNewWebSocket;
import okhttp3.internal.ws.WebSocketProtocol;
import okio.Buffer;
import okio.BufferedSink;
Expand Down Expand Up @@ -676,14 +676,15 @@ private void handleWebSocketUpgrade(Socket socket, BufferedSource source, Buffer
replyExecutor.allowCoreThreadTimeOut(true);

final CountDownLatch connectionClose = new CountDownLatch(1);
RealWebSocket webSocket =
new RealWebSocket(false /* is server */, source, sink, new SecureRandom(), replyExecutor,
response.getWebSocketListener(), fancyResponse, name) {
@Override protected void shutdown() {
connectionClose.countDown();
}
};

RealNewWebSocket.Streams streams = new RealNewWebSocket.Streams(false, source, sink) {
@Override public void close() {
connectionClose.countDown();
}
};
RealNewWebSocket webSocket = new RealNewWebSocket(fancyRequest,
response.getWebSocketListener(), new SecureRandom());
response.getWebSocketListener().onOpen(webSocket, fancyResponse);
webSocket.initReaderAndWriter(streams);
webSocket.loopReader();

// Even if messages are no longer being read we need to wait for the connection close signal.
Expand Down
113 changes: 36 additions & 77 deletions okhttp-tests/src/main/java/okhttp3/AutobahnTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.internal.Version;
import okio.BufferedSource;
import okio.ByteString;

import static okhttp3.WebSocket.BINARY;
import static okhttp3.WebSocket.TEXT;

/**
* Exercises the web socket implementation against the <a
* href="http://autobahn.ws/testsuite/">Autobahn Testsuite</a>.
Expand All @@ -42,9 +36,9 @@ public static void main(String... args) throws IOException {

final OkHttpClient client = new OkHttpClient();

private WebSocketCall newWebSocket(String path) {
private NewWebSocket newWebSocket(String path, NewWebSocket.Listener listener) {
Request request = new Request.Builder().url(HOST + path).build();
return client.newWebSocketCall(request);
return client.newWebSocket(request, listener);
}

public void run() throws IOException {
Expand All @@ -65,51 +59,30 @@ public void run() throws IOException {
private void runTest(final long number, final long count) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong startNanos = new AtomicLong();
newWebSocket("/runCase?case=" + number + "&agent=okhttp") //
.enqueue(new WebSocketListener() {
private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
private WebSocket webSocket;

@Override public void onOpen(WebSocket webSocket, Response response) {
this.webSocket = webSocket;

System.out.println("Executing test case " + number + "/" + count);
startNanos.set(System.nanoTime());
}

@Override public void onMessage(final ResponseBody message) throws IOException {
final RequestBody response;
if (message.contentType() == TEXT) {
response = RequestBody.create(TEXT, message.string());
} else {
BufferedSource source = message.source();
response = RequestBody.create(BINARY, source.readByteString());
source.close();
}
sendExecutor.execute(new Runnable() {
@Override public void run() {
try {
webSocket.message(response);
} catch (IOException e) {
e.printStackTrace(System.out);
}
}
});
}

@Override public void onPong(ByteString payload) {
}

@Override public void onClose(int code, String reason) {
sendExecutor.shutdown();
latch.countDown();
}

@Override public void onFailure(Throwable t, Response response) {
t.printStackTrace(System.out);
latch.countDown();
}
});
newWebSocket("/runCase?case=" + number + "&agent=okhttp", new NewWebSocket.Listener() {
@Override public void onOpen(NewWebSocket webSocket, Response response) {
System.out.println("Executing test case " + number + "/" + count);
startNanos.set(System.nanoTime());
}

@Override public void onMessage(final NewWebSocket webSocket, final ByteString bytes) {
webSocket.send(bytes);
}

@Override public void onMessage(final NewWebSocket webSocket, final String text) {
webSocket.send(text);
}

@Override public void onClosing(NewWebSocket webSocket, int code, String reason) {
webSocket.close(1000, null);
latch.countDown();
}

@Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) {
t.printStackTrace(System.out);
latch.countDown();
}
});
try {
if (!latch.await(30, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out waiting for test " + number + " to finish.");
Expand All @@ -127,23 +100,17 @@ private long getTestCount() throws IOException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicLong countRef = new AtomicLong();
final AtomicReference<Throwable> failureRef = new AtomicReference<>();
newWebSocket("/getCaseCount").enqueue(new WebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response) {
}

@Override public void onMessage(ResponseBody message) throws IOException {
countRef.set(message.source().readDecimalLong());
message.close();
newWebSocket("/getCaseCount", new NewWebSocket.Listener() {
@Override public void onMessage(NewWebSocket webSocket, String text) {
countRef.set(Long.parseLong(text));
}

@Override public void onPong(ByteString payload) {
}

@Override public void onClose(int code, String reason) {
@Override public void onClosing(NewWebSocket webSocket, int code, String reason) {
webSocket.close(1000, null);
latch.countDown();
}

@Override public void onFailure(Throwable t, Response response) {
@Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) {
failureRef.set(t);
latch.countDown();
}
Expand All @@ -164,21 +131,13 @@ private long getTestCount() throws IOException {

private void updateReports() {
final CountDownLatch latch = new CountDownLatch(1);
newWebSocket("/updateReports?agent=" + Version.userAgent()).enqueue(new WebSocketListener() {
@Override public void onOpen(WebSocket webSocket, Response response) {
}

@Override public void onMessage(ResponseBody message) throws IOException {
}

@Override public void onPong(ByteString payload) {
}

@Override public void onClose(int code, String reason) {
newWebSocket("/updateReports?agent=" + Version.userAgent(), new NewWebSocket.Listener() {
@Override public void onClosing(NewWebSocket webSocket, int code, String reason) {
webSocket.close(code, null);
latch.countDown();
}

@Override public void onFailure(Throwable t, Response response) {
@Override public void onFailure(NewWebSocket webSocket, Throwable t, Response response) {
latch.countDown();
}
});
Expand Down
Loading

0 comments on commit 40dddb9

Please sign in to comment.