Skip to content

Commit

Permalink
Additional error checking for SSE (square#4082)
Browse files Browse the repository at this point in the history
* Additional error checking for SSE
* move to sse package, more tests, kotlin null friendly
  • Loading branch information
yschimke authored Jun 21, 2018
1 parent 67bb8b2 commit f6502e8
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 24 deletions.
12 changes: 12 additions & 0 deletions okhttp-sse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@
</links>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>okhttp3.sse</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
50 changes: 35 additions & 15 deletions okhttp-sse/src/main/java/okhttp3/internal/sse/RealEventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.EventListener;
import okhttp3.EventSource;
import okhttp3.EventSourceListener;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.internal.Util;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okio.BufferedSource;

public final class RealEventSource
Expand All @@ -50,24 +51,43 @@ public void connect(OkHttpClient client) {
}

@Override public void onResponse(Call call, Response response) {
//noinspection ConstantConditions main body is never null
BufferedSource source = response.body().source();
ServerSentEventReader reader = new ServerSentEventReader(source, this);
try {
//noinspection ConstantConditions main body is never null
BufferedSource source = response.body().source();
ServerSentEventReader reader = new ServerSentEventReader(source, this);

response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
if (!response.isSuccessful()) {
listener.onFailure(this, null, response);
return;
}

try {
listener.onOpen(this, response);
while (reader.processNextEvent()) {
MediaType contentType = response.body().contentType();
if (!isEventStream(contentType)) {
listener.onFailure(this,
new IllegalStateException("Invalid content-type: " + contentType), response);
return;
}
} catch (Exception e) {
listener.onFailure(this, e, response);
return;

response = response.newBuilder().body(Util.EMPTY_RESPONSE).build();

try {
listener.onOpen(this, response);
while (reader.processNextEvent()) {
}
} catch (Exception e) {
listener.onFailure(this, e, response);
return;
}

listener.onClosed(this);
} finally {
response.close();
}
}

listener.onClosed(this);
private static boolean isEventStream(@Nullable MediaType contentType) {
return contentType != null && contentType.type().equals("text") && contentType.subtype()
.equals("event-stream");
}

@Override public void onFailure(Call call, IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/** Private support classes for server-sent events. */
@javax.annotation.ParametersAreNonnullByDefault
package okhttp3.internal.sse;
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
package okhttp3.sse;

import okhttp3.Request;

public interface EventSource {
/** Returns the original request that initiated this event source. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
package okhttp3.sse;

import javax.annotation.Nullable;
import okhttp3.Response;

public abstract class EventSourceListener {
/**
Expand Down Expand Up @@ -43,6 +44,7 @@ public void onClosed(EventSource eventSource) {
* Invoked when an event source has been closed due to an error reading from or writing to the
* network. Incoming events may have been lost. No further calls to this listener will be made.
*/
public void onFailure(EventSource eventSource, Throwable t, @Nullable Response response) {
public void onFailure(EventSource eventSource, @Nullable Throwable t,
@Nullable Response response) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okhttp3;
package okhttp3.sse;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.internal.sse.RealEventSource;

public final class EventSources {
Expand Down
3 changes: 3 additions & 0 deletions okhttp-sse/src/main/java/okhttp3/sse/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/** Support for server-sent events. */
@javax.annotation.ParametersAreNonnullByDefault
package okhttp3.sse;
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
*/
package okhttp3.internal.sse;

import okhttp3.EventSource;
import okhttp3.EventSources;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSources;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public final class EventSourceHttpTest {
@Rule public final MockWebServer server = new MockWebServer();

Expand All @@ -38,14 +40,35 @@ public final class EventSourceHttpTest {
@Test public void event() {
server.enqueue(new MockResponse().setBody(""
+ "data: hey\n"
+ "\n"));
+ "\n").setHeader("content-type", "text/event-stream"));

EventSource source = newEventSource();

assertEquals("/", source.request().url().encodedPath());

listener.assertOpen();
listener.assertEvent(null, null, "hey");
listener.assertClose();
}

@Test public void badContentType() {
server.enqueue(new MockResponse().setBody(""
+ "data: hey\n"
+ "\n").setHeader("content-type", "text/plain"));

EventSource source = newEventSource();
listener.assertFailure("Invalid content-type: text/plain");
}

@Test public void badResponseCode() {
server.enqueue(new MockResponse().setBody(""
+ "data: hey\n"
+ "\n").setHeader("content-type", "text/event-stream").setResponseCode(401));

EventSource source = newEventSource();
listener.assertFailure(null);
}

private EventSource newEventSource() {
Request request = new Request.Builder()
.url(server.url("/"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import javax.annotation.Nullable;
import okhttp3.EventSource;
import okhttp3.EventSourceListener;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.Response;
import okhttp3.internal.platform.Platform;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public final class EventSourceRecorder extends EventSourceListener {
Expand Down Expand Up @@ -89,6 +90,18 @@ public void assertClose() {
}
}

public void assertFailure(@Nullable String message) {
Object event = nextEvent();
if (!(event instanceof Failure)) {
throw new AssertionError("Expected Failure but was " + event);
}
if (message != null) {
assertEquals(message, ((Failure) event).t.getMessage());
} else {
assertNull(((Failure) event).t);
}
}

static final class Open {
final EventSource eventSource;
final Response response;
Expand Down

0 comments on commit f6502e8

Please sign in to comment.