Skip to content

Commit

Permalink
feat: support async iterables for response.body
Browse files Browse the repository at this point in the history
Closes #267
  • Loading branch information
kitsonk committed Jan 25, 2021
1 parent d21efc8 commit ac190df
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 6 deletions.
59 changes: 59 additions & 0 deletions async_iterable_reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2018-2021 the oak authors. All rights reserved. MIT license.

import { copyBytes } from "./deps.ts";

export class AsyncIterableReader<T> implements Deno.Reader {
#asyncIterator: AsyncIterator<T>;
#closed = false;
#current: Uint8Array | undefined;
#processValue: (value: T) => Uint8Array;

constructor(
asyncIterable: AsyncIterable<T>,
processValue: (value: T) => Uint8Array,
) {
this.#asyncIterator = asyncIterable[Symbol.asyncIterator]();
this.#processValue = processValue;
}

#close = () => {
if (this.#asyncIterator.return) {
this.#asyncIterator.return();
}
// deno-lint-ignore no-explicit-any
(this as any).#asyncIterator = undefined;
this.#closed = true;
};

async read(p: Uint8Array): Promise<number | null> {
if (this.#closed) {
return null;
}
if (p.byteLength === 0) {
this.#close();
return 0;
}
if (!this.#current) {
const { value, done } = await this.#asyncIterator.next();
if (done) {
this.#close();
}
if (value !== undefined) {
this.#current = this.#processValue(value);
}
}
if (!this.#current) {
if (!this.#closed) {
this.#close();
}
return null;
}
const len = copyBytes(this.#current, p);
if (len >= this.#current.byteLength) {
this.#current = undefined;
} else {
this.#current = this.#current.slice(len);
}
return len;
}
}
88 changes: 88 additions & 0 deletions async_iterable_reader_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2018-2021 the oak authors. All rights reserved. MIT license.

import { assert, assertEquals, test } from "./test_deps.ts";

import { AsyncIterableReader } from "./async_iterable_reader.ts";

const encoder = new TextEncoder();
const decoder = new TextDecoder();

test({
name: "AsyncIterableReader - basic",
async fn() {
const rs = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello deno");
controller.close();
},
});

const air = new AsyncIterableReader(rs, encoder.encode);

let buf = new Uint8Array(1000);
let bytesRead = await air.read(buf);
assertEquals(bytesRead, 10);
assert(decoder.decode(buf).startsWith("hello deno"));

buf = new Uint8Array(1000);
bytesRead = await air.read(buf);
assertEquals(bytesRead, null);
},
});

test({
name: "AsyncIterableReader - multiple chunks",
async fn() {
const rs = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello");
controller.enqueue("deno");
controller.close();
},
});

const air = new AsyncIterableReader(rs, encoder.encode);

let buf = new Uint8Array(1000);
let bytesRead = await air.read(buf);
assertEquals(bytesRead, 5);
assert(decoder.decode(buf).startsWith("hello"));

buf = new Uint8Array(1000);
bytesRead = await air.read(buf);
assertEquals(bytesRead, 4);
assert(decoder.decode(buf).startsWith("deno"));

buf = new Uint8Array(1000);
bytesRead = await air.read(buf);
assertEquals(bytesRead, null);
},
});

test({
name: "AsyncIterableReader - overflow",
async fn() {
const rs = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello deno");
controller.close();
},
});

const air = new AsyncIterableReader(rs, encoder.encode);

let buf = new Uint8Array(5);
let bytesRead = await air.read(buf);
assertEquals(bytesRead, 5);
assert(decoder.decode(buf).startsWith("hello"));

buf = new Uint8Array(5);
bytesRead = await air.read(buf);
assertEquals(bytesRead, 5);
assert(decoder.decode(buf).startsWith(" deno"));

buf = new Uint8Array(5);
bytesRead = await air.read(buf);
assertEquals(bytesRead, null);
},
});
37 changes: 31 additions & 6 deletions response.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2018-2020 the oak authors. All rights reserved. MIT license.

import { AsyncIterableReader } from "./async_iterable_reader.ts";
import { contentType, Status } from "./deps.ts";
import type { Request } from "./request.ts";
import type { ServerResponse } from "./types.d.ts";
Expand Down Expand Up @@ -40,11 +41,29 @@ const BODY_TYPES = ["string", "number", "bigint", "boolean", "symbol"];

const encoder = new TextEncoder();

/** Guard for `Deno.Reader`. */
/** Guard for Async Iterables */
// deno-lint-ignore no-explicit-any
function isReader(value: any): value is Deno.Reader {
return value && typeof value === "object" && "read" in value &&
typeof value.read === "function";
function isAsyncIterable(value: unknown): value is AsyncIterable<any> {
return typeof value === "object" && value !== null &&
Symbol.asyncIterator in value &&
// deno-lint-ignore no-explicit-any
typeof (value as any)[Symbol.asyncIterator] === "function";
}

/** Guard for `Deno.Reader`. */
function isReader(value: unknown): value is Deno.Reader {
return typeof value === "object" && value !== null && "read" in value &&
typeof (value as Record<string, unknown>).read === "function";
}

function toUint8Array(body: Body): Uint8Array {
let bodyText: string;
if (BODY_TYPES.includes(typeof body)) {
bodyText = String(body);
} else {
bodyText = JSON.stringify(body);
}
return encoder.encode(bodyText);
}

async function convertBody(
Expand All @@ -58,6 +77,8 @@ async function convertBody(
type = type ?? (isHtml(bodyText) ? "html" : "text/plain");
} else if (body instanceof Uint8Array || isReader(body)) {
result = body;
} else if (isAsyncIterable(body)) {
result = new AsyncIterableReader(body, toUint8Array);
} else if (body && typeof body === "object") {
result = encoder.encode(JSON.stringify(body));
type = type ?? "json";
Expand Down Expand Up @@ -99,14 +120,18 @@ export class Response {

/** The body of the response. The body will be automatically processed when
* the response is being sent and converted to a `Uint8Array` or a
* `Deno.Reader`. */
* `Deno.Reader`.
*
* Automatic conversion to a `Deno.Reader` occurs for async iterables. */
get body(): Body | BodyFunction {
return this.#body;
}

/** The body of the response. The body will be automatically processed when
* the response is being sent and converted to a `Uint8Array` or a
* `Deno.Reader`. */
* `Deno.Reader`.
*
* Automatic conversion to a `Deno.Reader` occurs for async iterables. */
set body(value: Body | BodyFunction) {
if (!this.#writable) {
throw new Error("The response is not writable.");
Expand Down
25 changes: 25 additions & 0 deletions response_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ function decodeBody(body: Uint8Array | Deno.Reader | undefined): string {
return decoder.decode(body as Uint8Array);
}

function isReader(value: unknown): value is Deno.Reader {
return typeof value === "object" && value !== null && "read" in value &&
typeof (value as Record<string, unknown>).read === "function";
}

function createMockRequest({
headers,
accepts = (_contentType: string) => {
Expand Down Expand Up @@ -144,6 +149,26 @@ test({
},
});

test({
name: "response.body as async iterator",
async fn() {
const response = new Response(createMockRequest());
response.body = new ReadableStream<string>({
start(controller) {
controller.enqueue("hello deno");
controller.close();
},
});
const serverResponse = await response.toServerResponse();
assert(isReader(serverResponse.body));
const p = new Uint8Array(1000);
const len = await serverResponse.body.read(p);
assert(decoder.decode(p).startsWith("hello deno"));
assertEquals(len, 10);
assertEquals(serverResponse.status, 200);
},
});

test({
name: "response.body as async function",
async fn() {
Expand Down

0 comments on commit ac190df

Please sign in to comment.