Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 44 additions & 41 deletions src/adapters/_node/send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,53 +70,56 @@ export function streamBody(
stream: ReadableStream,
nodeRes: NodeServerResponse,
): Promise<void> | void {
// stream is already destroyed
if (nodeRes.destroyed) {
stream.cancel();
return;
}
try {
// stream is already destroyed
if (nodeRes.destroyed) {
stream.cancel();
return;
}

const reader = stream.getReader();
const reader = stream.getReader();

// Cancel the stream and destroy the response
function streamCancel(error?: Error) {
reader.cancel(error).catch(() => {});
if (error) {
nodeRes.destroy(error);
// Cancel the stream and destroy the response
function streamCancel(error?: Error) {
reader.cancel(error).catch(() => {});
if (error) {
nodeRes.destroy(error);
}
}
}

function streamHandle({
done,
value,
}: ReadableStreamReadResult<Uint8Array>): void | Promise<void> {
try {
if (done) {
// End the response
nodeRes.end();
} else if ((nodeRes as NodeHttp.ServerResponse).write(value)) {
// Continue reading recursively
reader.read().then(streamHandle, streamCancel);
} else {
// Wait for the drain event to continue reading
nodeRes.once("drain", () =>
reader.read().then(streamHandle, streamCancel),
);
function streamHandle({
done,
value,
}: ReadableStreamReadResult<Uint8Array>): void | Promise<void> {
try {
if (done) {
// End the response
nodeRes.end();
} else if ((nodeRes as NodeHttp.ServerResponse).write(value)) {
// Continue reading recursively
reader.read().then(streamHandle, streamCancel);
} else {
// Wait for the drain event to continue reading
nodeRes.once("drain", () =>
reader.read().then(streamHandle, streamCancel),
);
}
} catch (error) {
streamCancel(error instanceof Error ? error : undefined);
}
} catch (error) {
streamCancel(error instanceof Error ? error : undefined);
}
}

// Listen for close and error events to cancel the stream
nodeRes.on("close", streamCancel);
nodeRes.on("error", streamCancel);
reader.read().then(streamHandle, streamCancel);
// Listen for close and error events to cancel the stream
nodeRes.on("close", streamCancel);
nodeRes.on("error", streamCancel);
reader.read().then(streamHandle, streamCancel);

// Return a promise that resolves when the stream is closed
return reader.closed.catch(streamCancel).finally(() => {
// cleanup listeners
nodeRes.off("close", streamCancel);
nodeRes.off("error", streamCancel);
});
// Return a promise that resolves when the stream is closed
return reader.closed.catch(streamCancel).finally(() => {
// cleanup listeners
nodeRes.off("close", streamCancel);
nodeRes.off("error", streamCancel);
});
// eslint-disable-next-line no-empty
} catch {}
}
Loading