Skip to content

Commit

Permalink
Improve eventStream function (sergiodxa#212)
Browse files Browse the repository at this point in the history
- Accept all ResponseInit options, not just headers
- Warn when using headers that needs to be overwritten
- Write tests
  • Loading branch information
sergiodxa authored Jul 3, 2023
1 parent 85b5e30 commit cf030c6
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 7 deletions.
26 changes: 19 additions & 7 deletions src/server/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ interface SendFunctionArgs {
data: string;
}

interface EventStreamOptions {
headers: HeadersInit;
}

interface SendFunction {
(args: SendFunctionArgs): void;
}
Expand All @@ -32,7 +28,11 @@ interface InitFunction {
* @param init The function that will be called to initialize the stream, here you can subscribe to your events
* @returns A Response object that can be returned from a loader
*/
export function eventStream(signal: AbortSignal, init: InitFunction, options: EventStreamOptions = { headers: {} }) {
export function eventStream(
signal: AbortSignal,
init: InitFunction,
options: ResponseInit = {}
) {
let stream = new ReadableStream({
start(controller) {
let encoder = new TextEncoder();
Expand Down Expand Up @@ -60,10 +60,22 @@ export function eventStream(signal: AbortSignal, init: InitFunction, options: Ev
},
});

const headers = new Headers(options.headers);
let headers = new Headers(options.headers);

if (headers.has("Content-Type")) {
console.warn("Overriding Content-Type header to `text/event-stream`");
}

if (headers.has("Cache-Control")) {
console.warn("Overriding Cache-Control header to `no-cache`");
}

if (headers.has("Connection")) {
console.warn("Overriding Connection header to `keep-alive`");
}

headers.set("Content-Type", "text/event-stream");
headers.set("Cache-Control", "no-cache");
headers.set("Cache-Control", "no-cache");
headers.set("Connection", "keep-alive");

return new Response(stream, { headers });
Expand Down
104 changes: 104 additions & 0 deletions test/server/event-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/* eslint-disable @typescript-eslint/no-empty-function */
import { eventStream } from "../../src";

describe(eventStream, () => {
test("returns a response", () => {
let controller = new AbortController();
let response = eventStream(controller.signal, (_, __) => {
return () => {};
});
controller.abort();
expect(response).toBeInstanceOf(Response);
});

test("response is a readable stream", async () => {
let controller = new AbortController();
let response = eventStream(controller.signal, (_, __) => {
return () => {};
});
controller.abort();
if (!response.body) throw new Error("Response body is undefined");
let reader = response.body.getReader();
let { done } = await reader.read();
expect(done).toBe(true);
});

test("can send data to the client with the send function", async () => {
let controller = new AbortController();
let response = eventStream(controller.signal, (send, _) => {
send({ data: "hello" });
return () => {};
});

controller.abort();

if (!response.body) throw new Error("Response body is undefined");

let reader = response.body.getReader();

let { value: event } = await reader.read();
expect(event).toEqual(new TextEncoder().encode("event: message\n"));

let { value: data } = await reader.read();
expect(data).toEqual(new TextEncoder().encode("data: hello\n\n"));

let { done } = await reader.read();
expect(done).toBe(true);
});

describe("Headers Overrides", () => {
test("overrrides Content-Type header", () => {
let spy = jest.spyOn(console, "warn").mockImplementation(() => {});

let response = eventStream(
new AbortController().signal,
(_, abort) => {
return () => abort();
},
{ headers: { "Content-Type": "text/html" } }
);

expect(spy).toHaveBeenCalledWith(
"Overriding Content-Type header to `text/event-stream`"
);

expect(response.headers.get("Content-Type")).toBe("text/event-stream");
});

test("overrides Cache-Control", () => {
let spy = jest.spyOn(console, "warn").mockImplementation(() => {});

let response = eventStream(
new AbortController().signal,
(_, abort) => {
return () => abort();
},
{ headers: { "Cache-Control": "max-age=60" } }
);

expect(spy).toHaveBeenCalledWith(
"Overriding Cache-Control header to `no-cache`"
);

expect(response.headers.get("Content-Type")).toBe("text/event-stream");
});

test("overrides Connection", () => {
let spy = jest.spyOn(console, "warn").mockImplementation(() => {});

let response = eventStream(
new AbortController().signal,
(_, abort) => {
return () => abort();
},
{ headers: { Connection: "close" } }
);

expect(spy).toHaveBeenCalledWith(
"Overriding Connection header to `keep-alive`"
);

expect(response.headers.get("Content-Type")).toBe("text/event-stream");
});
});
});

0 comments on commit cf030c6

Please sign in to comment.