Skip to content

Commit

Permalink
Close StreamSource when <turbo-stream-source> disconnects
Browse files Browse the repository at this point in the history
Closes [#969][]

When a `<turbo-stream-source>` element is removed, close the connection
that is created during connection.

[#969]: #969
  • Loading branch information
seanpdoyle committed Sep 5, 2023
1 parent 0826b81 commit a225075
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export type Action = "advance" | "replace" | "restore"

export type Position = { x: number; y: number }

export type StreamSource = {
export type StreamSource = (EventSource | WebSocket) & {
addEventListener(
type: "message",
listener: (event: MessageEvent) => void,
Expand Down
2 changes: 2 additions & 0 deletions src/elements/stream_source_element.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export class StreamSourceElement extends HTMLElement {

disconnectedCallback() {
if (this.streamSource) {
this.streamSource.close()

disconnectStreamSource(this.streamSource)
}
}
Expand Down
32 changes: 29 additions & 3 deletions src/tests/functional/stream_tests.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { test } from "@playwright/test"
import { Page, test } from "@playwright/test"
import { assert } from "chai"
import { nextBeat, nextEventNamed, readEventLogs, waitUntilNoSelector, waitUntilText } from "../helpers/page"
import { StreamSourceElement } from "../../elements/stream_source_element"

test.beforeEach(async ({ page }) => {
await page.goto("/src/tests/fixtures/stream.html")
Expand Down Expand Up @@ -104,6 +105,9 @@ test("test receiving a stream message over SSE", async ({ page }) => {
`<turbo-stream-source id="stream-source" src="/__turbo/messages"></turbo-stream-source>`
)
})
await nextBeat()
assert.equal(await getReadyState(page, "stream-source"), await page.evaluate(() => EventSource.OPEN))

const messages = await page.locator("#messages .message")

assert.deepEqual(await messages.allTextContents(), ["First"])
Expand All @@ -113,11 +117,33 @@ test("test receiving a stream message over SSE", async ({ page }) => {
await waitUntilText(page, "Hello world!")
assert.deepEqual(await messages.allTextContents(), ["First", "Hello world!"])

await page.evaluate(() => document.getElementById("stream-source")?.remove())
await nextBeat()
const readyState = await page.evaluate((id) => {
const element = document.getElementById(id) as StreamSourceElement

if (element && element.streamSource) {
element.remove()

return element.streamSource.readyState
} else {
return -1
}
}, "stream-source")
assert.equal(readyState, await page.evaluate(() => EventSource.CLOSED))

await page.click("#async button")
await nextBeat()

assert.deepEqual(await messages.allTextContents(), ["First", "Hello world!"])
})

async function getReadyState(page: Page, id: string): Promise<number> {
return page.evaluate((id) => {
const element = document.getElementById(id) as StreamSourceElement

if (element?.streamSource) {
return element.streamSource.readyState
} else {
return -1
}
}, id)
}

0 comments on commit a225075

Please sign in to comment.