5 changes: 5 additions & 0 deletions .changeset/silent-ears-agree.md
@@ -0,0 +1,5 @@
---
"@electric-sql/client": patch
---

Buffer SSE messages until the up-to-date message to prevent duplicate operations from being published on the shape stream.
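
The core idea of the fix: parsed SSE messages are held in a local buffer and only handed to subscribers once an `up-to-date` control message arrives, so an interrupted stream can never publish a partial batch. Below is a minimal standalone sketch of that buffering pattern; `SseMessage`, `SseBatcher`, and `publish` are hypothetical names used for illustration, not part of the client's API.

```ts
// Illustrative sketch only: buffer parsed SSE messages and flush them as one
// batch when an `up-to-date` control message arrives. `SseMessage`,
// `SseBatcher`, and `publish` are hypothetical names, not the client's API.
type SseMessage = {
  headers: { control?: string; operation?: string }
  [key: string]: unknown
}

class SseBatcher {
  #buffer: SseMessage[] = []

  constructor(private publish: (batch: SseMessage[]) => void) {}

  // Called once per SSE `data:` event; each event carries a single JSON object.
  onEvent(data: string): void {
    const message = JSON.parse(data) as SseMessage
    this.#buffer.push(message)

    if (message.headers.control === `up-to-date`) {
      // Flush only complete batches so partial transactions are never published.
      this.publish(this.#buffer)
      this.#buffer = []
    }
  }
}

// Usage: the insert is held back until the up-to-date marker arrives.
const batcher = new SseBatcher((batch) =>
  console.log(`publishing ${batch.length} messages`)
)
batcher.onEvent(`{"headers":{"operation":"insert"},"key":"1"}`)
batcher.onEvent(`{"headers":{"control":"up-to-date"}}`)
```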
43 changes: 21 additions & 22 deletions packages/typescript-client/src/client.ts
@@ -398,22 +398,12 @@ export class ShapeStream<T extends Row<unknown> = Row>
backOffOpts
)

this.#fetchClient = createFetchWithConsumedMessages(
createFetchWithResponseHeadersCheck(
createFetchWithChunkBuffer(fetchWithBackoffClient)
)
)

const sseFetchWithBackoffClient = createFetchWithBackoff(
baseFetchClient,
backOffOpts,
true
)

this.#sseFetchClient = createFetchWithResponseHeadersCheck(
createFetchWithChunkBuffer(sseFetchWithBackoffClient)
createFetchWithChunkBuffer(fetchWithBackoffClient)
)

this.#fetchClient = createFetchWithConsumedMessages(this.#sseFetchClient)

this.#subscribeToVisibilityChanges()
}

@@ -498,7 +488,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
fetchUrl,
requestAbortController,
headers: requestHeaders,
resumingFromPause: true,
resumingFromPause,
})
} catch (e) {
// Handle abort error triggered by refresh
@@ -671,9 +661,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
}

async #onMessages(messages: string, schema: Schema, isSseMessage = false) {
const batch = this.#messageParser.parse(messages, schema)

async #onMessages(batch: Array<Message<T>>, isSseMessage = false) {
// Update isUpToDate
if (batch.length > 0) {
const lastMessage = batch[batch.length - 1]
@@ -738,8 +726,9 @@ export class ShapeStream<T extends Row<unknown> = Row>
const schema = this.#schema! // we know that it is not undefined because it is set by `this.#onInitialResponse`
const res = await response.text()
const messages = res || `[]`
const batch = this.#messageParser.parse<Array<Message<T>>>(messages, schema)

await this.#onMessages(messages, schema)
await this.#onMessages(batch)
}

async #requestShapeSSE(opts: {
@@ -750,6 +739,7 @@
const { fetchUrl, requestAbortController, headers } = opts
const fetch = this.#sseFetchClient
try {
let buffer: Array<Message<T>> = []
await fetchEventSource(fetchUrl.toString(), {
headers,
fetch,
@@ -759,11 +749,20 @@
},
onmessage: (event: EventSourceMessage) => {
if (event.data) {
// Process the SSE message
// The event.data is a single JSON object, so we wrap it in an array
const messages = `[${event.data}]`
// event.data is a single JSON object
const schema = this.#schema! // we know that it is not undefined because it is set in onopen when we call this.#onInitialResponse
this.#onMessages(messages, schema, true)
const message = this.#messageParser.parse<Message<T>>(
event.data,
schema
)
buffer.push(message)

if (isUpToDateMessage(message)) {
// Flush the buffer on up-to-date message.
// Ensures that we only process complete batches of operations.
this.#onMessages(buffer, true)
buffer = []
}
}
},
onerror: (error: Error) => {
9 changes: 1 addition & 8 deletions packages/typescript-client/src/fetch.ts
@@ -38,8 +38,7 @@ export const BackoffDefaults = {

export function createFetchWithBackoff(
fetchClient: typeof fetch,
backoffOptions: BackoffOptions = BackoffDefaults,
sseMode: boolean = false
backoffOptions: BackoffOptions = BackoffDefaults
): typeof fetch {
const {
initialDelay,
@@ -67,12 +66,6 @@
if (result.ok) return result

const err = await FetchError.fromResponse(result, url.toString())
if (err.status === 409 && sseMode) {
// The json body is [ { headers: { control: 'must-refetch' } } ] in normal mode
// and is { headers: { control: 'must-refetch' } } in SSE mode
// So in SSE mode we need to wrap it in an array
err.json = [err.json]
}

throw err
} catch (e) {
7 changes: 4 additions & 3 deletions packages/typescript-client/src/helpers.ts
@@ -58,8 +58,9 @@
* If we are not in SSE mode this function will return undefined.
*/
export function getOffset(message: ControlMessage): Offset | undefined {
const lsn = Number(message.headers.global_last_seen_lsn)
if (lsn && !isNaN(lsn)) {
return `${lsn}_0`
const lsn = message.headers.global_last_seen_lsn
if (!lsn) {
return

}
return `${lsn}_0` as Offset
}
6 changes: 3 additions & 3 deletions packages/typescript-client/src/parser.ts
@@ -1,4 +1,4 @@
import { ColumnInfo, GetExtensions, Message, Row, Schema, Value } from './types'
import { ColumnInfo, GetExtensions, Row, Schema, Value } from './types'
import { ParserNullValueError } from './error'

type NullToken = null | `NULL`
@@ -98,7 +98,7 @@ export class MessageParser<T extends Row<unknown>> {
this.parser = { ...defaultParser, ...parser }
}

parse(messages: string, schema: Schema): Message<T>[] {
parse<Result>(messages: string, schema: Schema): Result {
return JSON.parse(messages, (key, value) => {
// typeof value === `object` && value !== null
// is needed because there could be a column named `value`
@@ -117,7 +117,7 @@ export class MessageParser<T extends Row<unknown>> {
})
}
return value
}) as Message<T>[]
}) as Result
}

// Parses the message values using the provided parser based on the schema information
2 changes: 1 addition & 1 deletion packages/typescript-client/src/types.ts
@@ -17,7 +17,7 @@ export type Row<Extensions = never> = Record<string, Value<Extensions>>
export type GetExtensions<T extends Row<unknown>> =
T extends Row<infer Extensions> ? Extensions : never

export type Offset = `-1` | `${number}_${number}`
export type Offset = `-1` | `${number}_${number}` | `${bigint}_${number}`

interface Header {
[key: Exclude<string, `operation` | `control`>]: Value
202 changes: 201 additions & 1 deletion packages/typescript-client/test/client.test.ts
@@ -2,7 +2,13 @@ import { describe, expect, inject, vi } from 'vitest'
import { v4 as uuidv4 } from 'uuid'
import { setTimeout as sleep } from 'node:timers/promises'
import { testWithIssuesTable as it } from './support/test-context'
import { ShapeStream, Shape, FetchError } from '../src'
import {
ShapeStream,
Shape,
FetchError,
isChangeMessage,
isControlMessage,
} from '../src'
import { Message, Row, ChangeMessage } from '../src/types'
import { MissingHeadersError } from '../src/error'
import { resolveValue } from '../src'
@@ -1099,6 +1105,170 @@ describe.for(fetchAndSse)(
}
)

describe(`Shape - SSE`, () => {
it(`should handle SSE messages in batches`, async ({
issuesTableUrl,
insertIssues,
aborter,
}) => {
// Create some initial data
const [id1] = await insertIssues({ title: `initial title` })

// Track if we've already thrown an error to ensure we only throw once
let hasThrownError = false

let resolveRefresh: () => void = () => {}
const refreshPromise = new Promise<void>((resolve) => {
resolveRefresh = resolve
})

// Custom fetch client that intercepts SSE messages
const customFetchClient = async (
input: string | URL | Request,
init?: RequestInit
) => {
const url = input.toString()

// Only intercept SSE requests (those with experimental_live_sse=true)
if (url.includes(`experimental_live_sse=true`)) {
// Create a custom response that intercepts the SSE stream
const response = await fetch(input, init)

// Create a custom readable stream that intercepts messages
const originalBody = response.body
if (!originalBody) {
throw new Error(`No response body`)
}

const filteredStream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(
createSSEFilterStream((event) => {
const data = event.slice(6) // remove 'data: ' prefix

let message
try {
message = JSON.parse(data)
} catch (parseError) {
// Ignore JSON parse errors for non-JSON lines
}

// Check if this is the first up-to-date message
if (
message.headers?.control === `up-to-date` &&
!hasThrownError
) {
hasThrownError = true

// Force a refresh to interrupt the stream
shapeStream.forceDisconnectAndRefresh().then(resolveRefresh)

// Filter it out
return false
}

return true
})
)
.pipeThrough(new TextEncoderStream())

// Return a new response with our custom stream
return new Response(filteredStream, {
status: response.status,
statusText: response.statusText,
headers: response.headers,
})
}

// For non-SSE requests, just forward to the real fetch
return fetch(input, init)
}

const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: { table: issuesTableUrl },
signal: aborter.signal,
experimentalLiveSse: true,
fetchClient: customFetchClient,
})

// Track received messages to ensure no duplicates
const receivedRows: Array<Row> = []
const messageIds = new Set<string>()

let resolveInitialSync: () => void = () => {}
const initialSyncComplete = new Promise<void>((resolve) => {
resolveInitialSync = resolve
})

// Subscribe to the shape stream
const unsubscribe = shapeStream.subscribe((messages) => {
for (const message of messages) {
if (isChangeMessage(message)) {
// Check for duplicates
const rowId = message.key
if (messageIds.has(rowId)) {
throw new Error(`Duplicate message received for id: ${rowId}`)
}
messageIds.add(rowId)
receivedRows.push(message.value)
}

if (
isControlMessage(message) &&
message.headers.control === `up-to-date`
) {
resolveInitialSync()
}
}
})

// Wait for initial sync
await initialSyncComplete

// Insert another issue to trigger an update
const [id2] = await insertIssues({ title: `second title` })

// Wait for the update to be processed
await vi.waitFor(
() => {
expect(receivedRows.length).toBe(2)
},
{ timeout: 5000 }
)

// Verify we received both messages without duplicates
expect(receivedRows).toEqual([
{
id: id1,
title: `initial title`,
priority: 10,
},
{
id: id2,
title: `second title`,
priority: 10,
},
])

// Check that we interrupted the stream
expect(hasThrownError).toBe(true)

// Await the refresh to complete
await refreshPromise

// Verify that there are no duplicates after the refresh
expect(receivedRows.length).toBe(2)
expect(messageIds.size).toBe(2)

// Verify the stream is connected and up to date
expect(shapeStream.isConnected()).toBe(true)
expect(shapeStream.isUpToDate).toBe(true)

unsubscribe()
})
})

function waitForFetch(stream: ShapeStream): Promise<void> {
let unsub = () => {}
return new Promise<void>((resolve) => {
@@ -1108,3 +1278,33 @@ function waitForFetch(stream: ShapeStream): Promise<void> {
)
}).finally(() => unsub())
}

// Simple SSE parser that buffers lines until an event is complete
// And filters out events that don't pass the filter function
function createSSEFilterStream(filterFn: (event: string) => boolean) {
let buffer = ``
return new TransformStream({
transform(chunk, controller) {
buffer += chunk
const lines = buffer.split(`\n`)
buffer = lines.pop() || `` // Keep the last incomplete line
let currentEvent = ``
for (const line of lines) {
currentEvent += line + `\n`
if (line.trim() === ``) {
// End of event
if (filterFn(currentEvent)) {
controller.enqueue(currentEvent)
}
currentEvent = ``
}
}
},
flush(controller) {
// Emit any remaining buffered event
if (buffer && filterFn(buffer)) {
controller.enqueue(buffer)
}
},
})
}