Skip to content

Commit

Permalink
feat(ts-client): Start stream after subscribe (#2188)
Browse files Browse the repository at this point in the history
Addresses part of #1997

Had to change some tests around - obviously this is not a full move
towards the desired behaviour, e.g. we might even want for the stream to
stop when it has _no_ subscribers, but I think this provides some
beneficial properties to the stream (e.g. if subscribing to it after
async gap no messages lost)

If we think this just complicates the client and that if we want to
address the issue we should do it in one comprehensive go, I'm happy to
close this as well
  • Loading branch information
msfstef authored Dec 19, 2024
1 parent 88c77cd commit 1c28aee
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/mean-humans-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Start streaming only after at least one subscriber is present.
8 changes: 6 additions & 2 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
]
>()

#started = false
#lastOffset: Offset
#liveCacheBuster: string // Seconds since our Electric Epoch 😎
#lastSyncedAt?: number // unix time
Expand Down Expand Up @@ -290,8 +291,6 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#fetchClient = createFetchWithResponseHeadersCheck(
createFetchWithChunkBuffer(fetchWithBackoffClient)
)

this.#start()
}

get shapeHandle() {
Expand All @@ -311,6 +310,9 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

async #start() {
if (this.#started) throw new Error(`Cannot start stream twice`)
this.#started = true

try {
while (
(!this.options.signal?.aborted && !this.#isUpToDate) ||
Expand Down Expand Up @@ -462,6 +464,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

// Restart
this.#started = false
this.#start()
}
return
Expand All @@ -481,6 +484,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
const subscriptionId = Math.random()

this.#subscribers.set(subscriptionId, [callback, onError])
if (!this.#started) this.#start()

return () => {
this.#subscribers.delete(subscriptionId)
Expand Down
39 changes: 28 additions & 11 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ describe(`Shape`, () => {
signal: aborter.signal,
})

await sleep(100) // give some time for the initial fetch to complete
// give some time for the initial fetch to complete
await waitForFetch(shapeStream)
expect(shapeStream.isConnected()).true

const shape = new Shape(shapeStream)
Expand Down Expand Up @@ -328,6 +329,8 @@ describe(`Shape`, () => {
},
})

const unsubscribe = shapeStream.subscribe(() => unsubscribe())

await sleep(100) // give some time for the initial fetch to complete
expect(shapeStream.isConnected()).true

Expand All @@ -347,7 +350,7 @@ describe(`Shape`, () => {
issuesTableUrl,
}) => {
const mockErrorHandler = vi.fn()
new ShapeStream({
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
Expand All @@ -360,7 +363,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(10) // give some time for the initial fetch to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})
Expand All @@ -383,7 +386,7 @@ describe(`Shape`, () => {
}
})

new ShapeStream({
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
Expand All @@ -402,7 +405,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(50) // give some time for the fetches to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})
Expand All @@ -425,7 +428,7 @@ describe(`Shape`, () => {
}
})

new ShapeStream({
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
Expand All @@ -446,7 +449,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(50) // give some time for the fetches to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
})
Expand Down Expand Up @@ -489,7 +492,7 @@ describe(`Shape`, () => {
onError: mockErrorHandler,
})

await sleep(50) // give some time for the first fetch to complete
await waitForFetch(shapeStream)
expect(mockErrorHandler.mock.calls.length).toBe(1)
expect(mockErrorHandler.mock.calls[0][0]).toBeInstanceOf(FetchError)
expect(shapeStream.isConnected()).toBe(false)
Expand Down Expand Up @@ -521,7 +524,9 @@ describe(`Shape`, () => {
},
})

await sleep(10) // give some time for the initial fetch to complete
const unsub = shapeStream.subscribe(() => unsub())
await sleep(10) // give sometime for fetch to fail

expect(shapeStream.isConnected()).false

const expectedErrorMessage = new MissingHeadersError(url, [
Expand Down Expand Up @@ -549,6 +554,7 @@ describe(`Shape`, () => {
},
})

const unsubLive = shapeStreamLive.subscribe(() => unsubLive())
await sleep(10) // give some time for the initial fetch to complete
expect(shapeStreamLive.isConnected()).false

Expand All @@ -573,7 +579,8 @@ describe(`Shape`, () => {
subscribe: false,
})

await sleep(100) // give some time for the fetch to complete
await waitForFetch(shapeStream)
await sleep(50)

// We should no longer be connected because
// the initial fetch finished and we've not subscribed to changes
Expand All @@ -594,7 +601,7 @@ describe(`Shape`, () => {

expect(shapeStream.isLoading()).true

await sleep(200) // give some time for the initial fetch to complete
await waitForFetch(shapeStream)

expect(shapeStream.isLoading()).false
})
Expand Down Expand Up @@ -665,3 +672,13 @@ describe(`Shape`, () => {
}
})
})

function waitForFetch(stream: ShapeStream): Promise<void> {
let unsub = () => {}
return new Promise<void>((resolve) => {
unsub = stream.subscribe(
() => resolve(),
() => resolve()
)
}).finally(() => unsub())
}
44 changes: 42 additions & 2 deletions packages/typescript-client/test/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe(`ShapeStream`, () => {
}

const aborter = new AbortController()
new ShapeStream({
const stream = new ShapeStream({
url: shapeUrl,
params: {
table: `foo`,
Expand All @@ -35,6 +35,7 @@ describe(`ShapeStream`, () => {
'X-Custom-Header': `my-value`,
},
})
const unsub = stream.subscribe(() => unsub())

await new Promise((resolve) =>
eventTarget.addEventListener(`fetch`, resolve, { once: true })
Expand All @@ -60,7 +61,7 @@ describe(`ShapeStream`, () => {
}

const aborter = new AbortController()
new ShapeStream({
const stream = new ShapeStream({
url: shapeUrl,
params: {
table: `foo`,
Expand All @@ -72,6 +73,8 @@ describe(`ShapeStream`, () => {
fetchClient: fetchWrapper,
})

const unsub = stream.subscribe(() => unsub())

await new Promise((resolve) =>
eventTarget.addEventListener(`fetch`, resolve, { once: true })
)
Expand All @@ -80,4 +83,41 @@ describe(`ShapeStream`, () => {
`columns=id&handle=potato&offset=-1&table=foo&where=a%3D1`
)
})

it(`should start requesting only after first subscription`, async () => {
const eventTarget = new EventTarget()
const fetchWrapper = (): Promise<Response> => {
eventTarget.dispatchEvent(new Event(`fetch`))
return Promise.resolve(Response.error())
}

const aborter = new AbortController()
const stream = new ShapeStream({
url: shapeUrl,
params: {
table: `foo`,
where: `a=1`,
columns: [`id`],
},
handle: `potato`,
signal: aborter.signal,
fetchClient: fetchWrapper,
})

// should not fire any fetch requests
await new Promise<void>((resolve, reject) => {
eventTarget.addEventListener(`fetch`, reject, { once: true })
setTimeout(() => resolve(), 100)
})

// should fire fetch immediately after subbing
const startedStreaming = new Promise<void>((resolve, reject) => {
eventTarget.addEventListener(`fetch`, () => resolve(), {
once: true,
})
setTimeout(() => reject(`timed out`), 100)
})
const unsub = stream.subscribe(() => unsub())
await startedStreaming
})
})

0 comments on commit 1c28aee

Please sign in to comment.