Skip to content

Fix "Invalid state: Controller is already closed"#4770

Open
gaearon wants to merge 2 commits intohonojs:mainfrom
gaearon:streaming-bug-crash
Open

Fix "Invalid state: Controller is already closed"#4770
gaearon wants to merge 2 commits intohonojs:mainfrom
gaearon:streaming-bug-crash

Conversation

@gaearon
Copy link

@gaearon gaearon commented Feb 25, 2026

Fixes #4769
Fixes honojs/node-server#233

The fix is authored by Claude. I steered it to make failing tests first.

All three tests are failing before the fix and passing after the fix.

@yusukebe
Copy link
Member

@gaearon Hey Dan, Thank you for the PR.

The issue #4769 you faced, and honojs/node-server#233 are different problems, and we should think about them separately (right? @usualoma ). But I think this fix is resolvable and acceptable. @usualoma What do you think of this?

@gaearon
Copy link
Author

gaearon commented Feb 25, 2026

Ah maybe that's a different message and I got them confused

@usualoma
Copy link
Member

Hi @gaearon
Thank you for creating the pull request!
I agree with @yusukebe's comment—it seems unrelated to the node-server issue, but I still think the fix in this PR is valid.

Would you please consider the following changes?

Continue processing until the end

Currently, there is an inconsistency in whether “processing is interrupted” or “continues until completion” at the following two points.

To unify this with the latter approach, I would like to implement it as follows.

diff --git i/src/jsx/streaming.ts w/src/jsx/streaming.ts
index 0c0c5b2d..e21d40c7 100644
--- i/src/jsx/streaming.ts
+++ w/src/jsx/streaming.ts
@@ -158,10 +158,9 @@ export const renderToReadableStream = (
           true,
           context
         )
-        if (cancelled) {
-          return
+        if (!cancelled) {
+          controller.enqueue(textEncoder.encode(resolved))
         }
-        controller.enqueue(textEncoder.encode(resolved))
 
         let resolvedCount = 0
         const callbacks: Promise<void>[] = []

Simplify Testing

To minimize the use of try blocks within test code and delays via setTimeout(), I want to change it as follows.

diff --git i/src/jsx/streaming.test.tsx w/src/jsx/streaming.test.tsx
index 3c7f53b7..75c70cf8 100644
--- i/src/jsx/streaming.test.tsx
+++ w/src/jsx/streaming.test.tsx
@@ -887,55 +887,38 @@ d.replaceWith(c.content)
     const onRejection = (e: unknown) => unhandled.push(e)
     process.on('unhandledRejection', onRejection)
 
-    try {
-      const SubContent = () => {
-        const content = new Promise<HtmlEscapedString>((resolve) =>
-          setTimeout(() => resolve(<h2>World</h2>), 50)
-        )
-        return content
-      }
+    const SubContent = async () => <h2>World</h2>
+    const Content = async () => (
+      <>
+        <h1>Hello</h1>
+        <Suspense fallback={<p>Loading sub...</p>}>
+          <SubContent />
+        </Suspense>
+      </>
+    )
 
-      const Content = () => {
-        const content = new Promise<HtmlEscapedString>((resolve) =>
-          setTimeout(
-            () =>
-              resolve(
-                <>
-                  <h1>Hello</h1>
-                  <Suspense fallback={<p>Loading sub...</p>}>
-                    <SubContent />
-                  </Suspense>
-                </>
-              ),
-            20
-          )
-        )
-        return content
-      }
+    const onError = vi.fn()
+    const stream = renderToReadableStream(
+      <Suspense fallback={<p>Loading...</p>}>
+        <Content />
+      </Suspense>,
+      onError
+    )
 
-      const onError = vi.fn()
-      const stream = renderToReadableStream(
-        <Suspense fallback={<p>Loading...</p>}>
-          <Content />
-        </Suspense>,
-        onError
-      )
+    const reader = stream.getReader()
+    const firstChunk = await reader.read()
+    expect(firstChunk.done).toBe(false)
 
-      const reader = stream.getReader()
-      const firstChunk = await reader.read()
-      expect(firstChunk.done).toBe(false)
+    // Simulate client disconnect
+    await reader.cancel()
 
-      // Simulate client disconnect
-      await reader.cancel()
+    // Wait for nested Suspense callbacks to fire against the closed controller
+    await new Promise((resolve) => setTimeout(resolve))
 
-      // Wait for nested Suspense callbacks to fire against the closed controller
-      await new Promise((resolve) => setTimeout(resolve, 200))
+    expect(unhandled).toHaveLength(0)
+    expect(onError).not.toHaveBeenCalled()
 
-      expect(unhandled).toHaveLength(0)
-    } finally {
-      process.off('unhandledRejection', onRejection)
-      suspenseCounter++
-    }
+    process.off('unhandledRejection', onRejection)
   })
 
   it('should not call onError when reader is cancelled during a slow callback resolution', async () => {
@@ -943,51 +926,42 @@ d.replaceWith(c.content)
     const onRejection = (e: unknown) => unhandled.push(e)
     process.on('unhandledRejection', onRejection)
 
-    try {
-      let signalCallbackStarted!: () => void
-      const callbackStarted = new Promise<void>((r) => {
-        signalCallbackStarted = r
-      })
+    let signalCallbackStarted!: () => void
+    const callbackStarted = new Promise<void>((r) => {
+      signalCallbackStarted = r
+    })
 
-      const Content = () => {
-        return new Promise<HtmlEscapedString>((resolve) => {
-          setTimeout(() => {
-            const html = raw('<p>content</p>', [
-              ((opts: any) => {
-                if (opts.phase === HtmlEscapedCallbackPhase.BeforeStream) {
-                  signalCallbackStarted()
-                  return new Promise<string>((r) => setTimeout(() => r(''), 50))
-                }
-                return undefined
-              }) as any,
-            ])
-            resolve(html as unknown as HtmlEscapedString)
-          }, 10)
-        })
-      }
+    const Content = async () =>
+      raw('<p>content</p>', [
+        ((opts: any) => {
+          if (opts.phase === HtmlEscapedCallbackPhase.BeforeStream) {
+            signalCallbackStarted()
+            return new Promise<string>((r) => setTimeout(() => r('')))
+          }
+          return undefined
+        }) as any,
+      ])
 
-      const onError = vi.fn()
-      const stream = renderToReadableStream(
-        <Suspense fallback={<p>Loading...</p>}>
-          <Content />
-        </Suspense>,
-        onError
-      )
+    const onError = vi.fn()
+    const stream = renderToReadableStream(
+      <Suspense fallback={<p>Loading...</p>}>
+        <Content />
+      </Suspense>,
+      onError
+    )
 
-      const reader = stream.getReader()
-      await reader.read()
+    const reader = stream.getReader()
+    await reader.read()
 
-      await callbackStarted
-      await reader.cancel()
+    await callbackStarted
+    await reader.cancel()
 
-      await new Promise((resolve) => setTimeout(resolve, 200))
+    await new Promise((resolve) => setTimeout(resolve))
 
-      expect(onError).not.toHaveBeenCalled()
-      expect(unhandled).toHaveLength(0)
-    } finally {
-      process.off('unhandledRejection', onRejection)
-      suspenseCounter++
-    }
+    expect(unhandled).toHaveLength(0)
+    expect(onError).not.toHaveBeenCalled()
+
+    process.off('unhandledRejection', onRejection)
   })
 
   it('should not throw when cancelled before initial content resolves', async () => {
@@ -995,24 +969,20 @@ d.replaceWith(c.content)
     const onRejection = (e: unknown) => unhandled.push(e)
     process.on('unhandledRejection', onRejection)
 
-    try {
-      const onError = vi.fn()
-      const stream = renderToReadableStream(
-        new Promise<HtmlEscapedString>((resolve) =>
-          setTimeout(() => resolve(raw('<p>slow content</p>') as HtmlEscapedString), 50)
-        ),
-        onError
-      )
+    const onError = vi.fn()
+    const stream = renderToReadableStream(
+      Promise.resolve(raw('<p>slow content</p>') as HtmlEscapedString),
+      onError
+    )
 
-      const reader = stream.getReader()
-      await reader.cancel()
+    const reader = stream.getReader()
+    await reader.cancel()
 
-      await new Promise((resolve) => setTimeout(resolve, 200))
+    await new Promise((resolve) => setTimeout(resolve))
 
-      expect(onError).not.toHaveBeenCalled()
-      expect(unhandled).toHaveLength(0)
-    } finally {
-      process.off('unhandledRejection', onRejection)
-    }
+    expect(unhandled).toHaveLength(0)
+    expect(onError).not.toHaveBeenCalled()
+
+    process.off('unhandledRejection', onRejection)
   })
 })

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ERR_INVALID_STATE when canceling during nested Suspense Invalid state: ReadableStream is already closed

3 participants