diff --git a/package-lock.json b/package-lock.json index 040fef5e..27e215d3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@sanity/client", - "version": "6.17.3", + "version": "6.18.0-canary.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@sanity/client", - "version": "6.17.3", + "version": "6.18.0-canary.0", "license": "MIT", "dependencies": { "@sanity/eventsource": "^5.0.2", diff --git a/package.json b/package.json index aa0144cc..19539aa0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sanity/client", - "version": "6.17.3", + "version": "6.18.0-canary.0", "description": "Client for retrieving, creating and patching data from Sanity.io", "keywords": [ "sanity", diff --git a/src/SanityClient.ts b/src/SanityClient.ts index d6eaf0e7..6d94d445 100644 --- a/src/SanityClient.ts +++ b/src/SanityClient.ts @@ -4,6 +4,7 @@ import {AssetsClient, ObservableAssetsClient} from './assets/AssetsClient' import {defaultConfig, initConfig} from './config' import * as dataMethods from './data/dataMethods' import {_listen} from './data/listen' +import {LiveClient} from './data/live' import {ObservablePatch, Patch} from './data/patch' import {ObservableTransaction, Transaction} from './data/transaction' import {DatasetsClient, ObservableDatasetsClient} from './datasets/DatasetsClient' @@ -43,6 +44,7 @@ export type { _listen, AssetsClient, DatasetsClient, + LiveClient, ObservableAssetsClient, ObservableDatasetsClient, ObservableProjectsClient, @@ -55,6 +57,7 @@ export type { export class ObservableSanityClient { assets: ObservableAssetsClient datasets: ObservableDatasetsClient + live: LiveClient projects: ObservableProjectsClient users: ObservableUsersClient @@ -76,6 +79,7 @@ export class ObservableSanityClient { this.assets = new ObservableAssetsClient(this, this.#httpRequest) this.datasets = new ObservableDatasetsClient(this, this.#httpRequest) + this.live = new LiveClient(this) this.projects = new ObservableProjectsClient(this, this.#httpRequest) this.users = new ObservableUsersClient(this, this.#httpRequest) } @@ -695,6 +699,7 @@ export class ObservableSanityClient { export class SanityClient { assets: AssetsClient datasets: DatasetsClient + live: LiveClient projects: ProjectsClient users: UsersClient @@ -721,6 +726,7 @@ export class SanityClient { this.assets = new AssetsClient(this, this.#httpRequest) this.datasets = new DatasetsClient(this, this.#httpRequest) + this.live = new LiveClient(this) this.projects = new ProjectsClient(this, this.#httpRequest) this.users = new UsersClient(this, this.#httpRequest) diff --git a/src/data/dataMethods.ts b/src/data/dataMethods.ts index 8b3ccfcf..21b29c13 100644 --- a/src/data/dataMethods.ts +++ b/src/data/dataMethods.ts @@ -257,7 +257,7 @@ export function _dataRequest( const useGet = !isMutation && strQuery.length < getQuerySizeLimit const stringQuery = useGet ? strQuery : '' const returnFirst = options.returnFirst - const {timeout, token, tag, headers, returnQuery} = options + const {timeout, token, tag, headers, returnQuery, lastLiveEventId} = options const uri = _getDataUrl(client, endpoint, stringQuery) @@ -274,6 +274,7 @@ export function _dataRequest( returnQuery, perspective: options.perspective, resultSourceMap: options.resultSourceMap, + lastLiveEventId: Array.isArray(lastLiveEventId) ? lastLiveEventId[0] : lastLiveEventId, canUseCdn: isQuery, signal: options.signal, fetch: options.fetch, @@ -375,6 +376,10 @@ export function _requestObservable( } } + if (options.lastLiveEventId) { + options.query = {...options.query, lastLiveEventId: options.lastLiveEventId} + } + if (options.returnQuery === false) { options.query = {returnQuery: 'false', ...options.query} } diff --git a/src/data/live.ts b/src/data/live.ts new file mode 100644 index 00000000..bbbd21d5 --- /dev/null +++ b/src/data/live.ts @@ -0,0 +1,126 @@ +import {Observable} from 'rxjs' + +import type {ObservableSanityClient, SanityClient} from '../SanityClient' +import type {Any, LiveEventMessage, LiveEventRestart} from '../types' +import {_getDataUrl} from './dataMethods' + +/** + * @alpha this API is experimental and may change or even be removed + */ +export class LiveClient { + #client: SanityClient | ObservableSanityClient + constructor(client: SanityClient | ObservableSanityClient) { + this.#client = client + } + + events(): Observable { + const path = _getDataUrl(this.#client, 'live/events') + const url = new URL(this.#client.getUrl(path, false)) + + const listenFor = ['restart', 'message'] as const + + return new Observable((observer) => { + let es: InstanceType | undefined + let reconnectTimer: NodeJS.Timeout + let stopped = false + // Unsubscribe differs from stopped in that we will never reopen. + // Once it is`true`, it will never be `false` again. + let unsubscribed = false + + open() + + // EventSource will emit a regular event if it fails to connect, however the API will emit an `error` MessageEvent if the server goes down + // So we need to handle both cases + function onError(evt: MessageEvent | Event) { + if (stopped) { + return + } + + // If the event has a `data` property, then it`s a MessageEvent emitted by the API and we should forward the error and close the connection + if ('data' in evt) { + const event = parseEvent(evt) + observer.error(new Error(event.message, {cause: event})) + } + + // Unless we've explicitly stopped the ES (in which case `stopped` should be true), + // we should never be in a disconnected state. By default, EventSource will reconnect + // automatically, in which case it sets readyState to `CONNECTING`, but in some cases + // (like when a laptop lid is closed), it closes the connection. In these cases we need + // to explicitly reconnect. + if (es!.readyState === es!.CLOSED) { + unsubscribe() + clearTimeout(reconnectTimer) + reconnectTimer = setTimeout(open, 100) + } + } + + function onMessage(evt: Any) { + const event = parseEvent(evt) + return event instanceof Error ? observer.error(event) : observer.next(event) + } + + function unsubscribe() { + if (!es) return + es.removeEventListener('error', onError) + for (const type of listenFor) { + es.removeEventListener(type, onMessage) + } + es.close() + } + + async function getEventSource() { + const EventSourceImplementation: typeof EventSource = + typeof EventSource === 'undefined' + ? ((await import('@sanity/eventsource')).default as typeof EventSource) + : EventSource + + // If the listener has been unsubscribed from before we managed to load the module, + // do not set up the EventSource. + if (unsubscribed) { + return + } + + const evs = new EventSourceImplementation(url.toString()) + evs.addEventListener('error', onError) + for (const type of listenFor) { + evs.addEventListener(type, onMessage) + } + return evs + } + + function open() { + getEventSource() + .then((eventSource) => { + if (eventSource) { + es = eventSource + // Handle race condition where the observer is unsubscribed before the EventSource is set up + if (unsubscribed) { + unsubscribe() + } + } + }) + .catch((reason) => { + observer.error(reason) + stop() + }) + } + + function stop() { + stopped = true + unsubscribe() + unsubscribed = true + } + + return stop + }) + } +} + +function parseEvent(event: MessageEvent) { + try { + const data = (event.data && JSON.parse(event.data)) || {} + return {type: event.type, id: event.lastEventId, ...data} + } catch (err) { + return err + } +} diff --git a/src/types.ts b/src/types.ts index 08ad2f55..a6479f17 100644 --- a/src/types.ts +++ b/src/types.ts @@ -306,6 +306,8 @@ export interface RequestObservableOptions extends Omit { returnQuery?: boolean resultSourceMap?: boolean | 'withKeyArraySelector' perspective?: ClientPerspective + /** @alpha this API is experimental and may change or even be removed */ + lastLiveEventId?: string } /** @public */ @@ -479,6 +481,8 @@ export interface QueryParams { token?: never /** @deprecated you're using a fetch option as a GROQ parameter, this is likely a mistake */ useCdn?: never + /** @deprecated you're using a fetch option as a GROQ parameter, this is likely a mistake */ + lastLiveEventId?: never /* eslint-enable @typescript-eslint/no-explicit-any */ } @@ -743,6 +747,8 @@ export interface ResponseQueryOptions extends RequestOptions { // The `cache` and `next` options are specific to the Next.js App Router integration cache?: 'next' extends keyof RequestInit ? RequestInit['cache'] : never next?: ('next' extends keyof RequestInit ? RequestInit : never)['next'] + /** @alpha this API is experimental and may change or even be removed */ + lastLiveEventId?: string | string[] | null } /** @public */ @@ -785,6 +791,8 @@ export interface RawQueryResponse { ms: number result: R resultSourceMap?: ContentSourceMap + /** @alpha this API is experimental and may change or even be removed */ + syncTags?: SyncTag[] } /** @public */ @@ -999,6 +1007,19 @@ export interface ContentSourceMap { paths: ContentSourceMapPaths } +/** @alpha this API is experimental and may change or even be removed */ +export type SyncTag = `s1:${string}` +/** @alpha this API is experimental and may change or even be removed */ +export interface LiveEventRestart { + type: 'restart' +} +/** @alpha this API is experimental and may change or even be removed */ +export interface LiveEventMessage { + type: 'message' + id: string + tags: SyncTag[] +} + export type { ContentSourceMapParsedPath, ContentSourceMapParsedPathKeyedSegment, diff --git a/test/client.test.ts b/test/client.test.ts index 9731487d..01844331 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -569,6 +569,90 @@ describe('client', async () => { expect(res[0].rating, 'data should match').toBe(5) }) + test.skipIf(isEdge)('can query for documents with last live event ID', async () => { + nock(projectHost()) + .get( + `/vX/data/query/foo?query=*&returnQuery=false&lastLiveEventId=MTA0MDM1Nnx2a2lQY200bnRHQQ`, + ) + .reply(200, { + ms: 123, + result, + }) + + const res = await getClient({apiVersion: 'X'}).fetch( + '*', + {}, + {lastLiveEventId: 'MTA0MDM1Nnx2a2lQY200bnRHQQ'}, + ) + expect(res.length, 'length should match').toBe(1) + expect(res[0].rating, 'data should match').toBe(5) + }) + + test.skipIf(isEdge)( + 'allows passing last live event ID from Next.js style searchParams', + async () => { + nock(projectHost()) + .get( + `/vX/data/query/foo?query=*&returnQuery=false&lastLiveEventId=MTA0MDM1Nnx2a2lQY200bnRHQQ`, + ) + .reply(200, { + ms: 123, + result, + }) + + const res = await getClient({apiVersion: 'X'}).fetch( + '*', + {}, + // searchParams in Next.js will return an arry of strings in some cases, + // as an convenience we allow it, and behave the same way as URLSearchParams.get() when that happens: + // we pick the first value in the array + {lastLiveEventId: ['MTA0MDM1Nnx2a2lQY200bnRHQQ', 'some-other-value']}, + ) + expect(res.length, 'length should match').toBe(1) + expect(res[0].rating, 'data should match').toBe(5) + }, + ) + + test.skipIf(isEdge)( + 'allows passing last live event ID from URLSearchParams that might be null', + async () => { + nock(projectHost()).get(`/vX/data/query/foo?query=*&returnQuery=false`).reply(200, { + ms: 123, + result, + }) + const searchParams = new URLSearchParams('') + + const res = await getClient({apiVersion: 'X'}).fetch( + '*', + {}, + // URLSearchParams.get() will return null if the key is not found, we should handle that + {lastLiveEventId: searchParams.get('lastLiveEventId')}, + ) + expect(res.length, 'length should match').toBe(1) + expect(res[0].rating, 'data should match').toBe(5) + }, + ) + + test.skipIf(isEdge)( + 'allows passing last live event ID from URLSearchParams that might be an empty string', + async () => { + nock(projectHost()).get(`/vX/data/query/foo?query=*&returnQuery=false`).reply(200, { + ms: 123, + result, + }) + const searchParams = new URLSearchParams('lastLiveEventId=') + + const res = await getClient({apiVersion: 'X'}).fetch( + '*', + {}, + // URLSearchParams.get() will return null if the key is not found, we should handle that + {lastLiveEventId: searchParams.get('lastLiveEventId')}, + ) + expect(res.length, 'length should match').toBe(1) + expect(res[0].rating, 'data should match').toBe(5) + }, + ) + test.skipIf(isEdge)( 'can query for documents with resultSourceMap and perspective', async () => { diff --git a/test/helpers/sseServer.ts b/test/helpers/sseServer.ts index 7c62c784..44d07a85 100644 --- a/test/helpers/sseServer.ts +++ b/test/helpers/sseServer.ts @@ -14,6 +14,7 @@ export const createSseServer = (onRequest: OnRequest): Promise => let channel if ( request?.url?.indexOf('/v1/data/listen/') === 0 || + request?.url?.indexOf('/vX/data/live/events/') === 0 || request?.url?.indexOf('/listen/beerns?query=') === 0 ) { channel = new SseChannel({jsonEncode: true}) diff --git a/test/live.test-d.ts b/test/live.test-d.ts new file mode 100644 index 00000000..6b69b659 --- /dev/null +++ b/test/live.test-d.ts @@ -0,0 +1,37 @@ +import {createClient, type SyncTag} from '@sanity/client' +import {describe, expectTypeOf, test} from 'vitest' + +describe('client.live.events', () => { + const client = createClient({}) + test('lastLiveEventId & syncTags', async () => { + const {searchParams} = new URL(location.href) + const {result: initial, syncTags} = await client.fetch( + `count(*[_type == $type])`, + {type: 'post'}, + {filterResponse: false, lastLiveEventId: searchParams.get('lastLiveEventId')}, + ) + expectTypeOf(initial).toMatchTypeOf() + expectTypeOf(syncTags!).toMatchTypeOf() + + client.live.events().subscribe((event) => { + if ( + event.type === 'message' && + Array.isArray(syncTags) && + event.tags.some((tag) => syncTags.includes(tag)) + ) { + searchParams.set('lastLiveEventId', event.id) + } + }) + }) + test('event types', async () => { + const subscription = client.live.events().subscribe((event) => { + if (event.type === 'restart') { + expectTypeOf(event).toMatchTypeOf<{type: 'restart'}>() + } + if (event.type === 'message') { + expectTypeOf(event).toMatchTypeOf<{type: 'message'; id: string; tags: SyncTag[]}>() + } + }) + expectTypeOf(subscription.unsubscribe).toMatchTypeOf<() => void>() + }) +}) diff --git a/test/live.test.ts b/test/live.test.ts new file mode 100644 index 00000000..2b2b6748 --- /dev/null +++ b/test/live.test.ts @@ -0,0 +1,146 @@ +import type {AddressInfo} from 'node:net' + +import {type ClientConfig, createClient} from '@sanity/client' +import {describe, expect, test, vitest} from 'vitest' + +import {createSseServer, type OnRequest} from './helpers/sseServer' + +const getClient = (options: ClientConfig & {port: number}) => + createClient({ + dataset: 'prod', + apiHost: `http://127.0.0.1:${options.port}`, + useProjectHostname: false, + useCdn: false, + apiVersion: 'X', + ...options, + }) + +const testSse = async (onRequest: OnRequest, options: ClientConfig = {}) => { + const server = await createSseServer(onRequest) + const client = getClient({port: (server!.address() as AddressInfo).port, ...options}) + return {server, client} +} + +describe.skipIf(typeof EdgeRuntime === 'string' || typeof document !== 'undefined')( + '.live.events()', + () => { + test('can listen for tags', async () => { + expect.assertions(2) + + const eventData = { + tags: ['tag1', 'tag2'], + } + + const {server, client} = await testSse(({request, channel}) => { + expect(request.url, 'url should be correct').toEqual(`/vX/data/live/events/prod`) + + channel!.send({id: '123', data: eventData}) + process.nextTick(() => channel!.close()) + }) + + try { + await new Promise((resolve, reject) => { + const subscription = client.live.events().subscribe({ + next: (msg) => { + expect(msg, 'event data should be correct').toEqual({ + ...eventData, + id: '123', + type: 'message', + }) + + subscription.unsubscribe() + resolve() + }, + error: (err) => { + subscription.unsubscribe() + reject(err) + }, + }) + }) + } finally { + server.close() + } + }) + + test('supports restart events', async () => { + expect.assertions(1) + + const {server, client} = await testSse(({channel}) => { + channel!.send({event: 'welcome'}) + channel!.send({event: 'restart'}) + process.nextTick(() => channel!.close()) + }) + + try { + await new Promise((resolve, reject) => { + const subscription = client.live.events().subscribe({ + next: (msg) => { + expect(msg.type, 'emits restart events to tell the client to reset local state').toBe( + 'restart', + ) + + subscription.unsubscribe() + resolve() + }, + error: (err) => { + subscription.unsubscribe() + reject(err) + }, + }) + }) + } finally { + server.close() + } + }) + + test('emits errors', async () => { + expect.assertions(1) + + const {server, client} = await testSse(({channel}) => { + channel!.send({event: 'error', data: {status: 500, message: 'Unfortunate error'}}) + channel!.close() + process.nextTick(() => channel!.close()) + }) + try { + await new Promise((resolve) => { + const subscription = client.live.events().subscribe({ + error: (err) => { + expect(err.message, 'should have passed error message').toBe('Unfortunate error') + + subscription.unsubscribe() + resolve() + }, + }) + }) + } finally { + server.close() + } + }) + + test('can immediately unsubscribe, does not connect to server', async () => { + const onMessage = vitest.fn() + const onError = vitest.fn() + const onRequest = vitest.fn(({channel}) => { + channel!.send({id: '123', data: {tags: ['tag1', 'tag2']}}) + process.nextTick(() => channel!.close()) + }) + + const {server, client} = await testSse(onRequest) + + client.live + .events() + .subscribe({ + next: onMessage, + error: onError, + }) + .unsubscribe() + + await new Promise((resolve) => setTimeout(resolve, 100)) + + expect(onMessage).not.toHaveBeenCalled() + expect(onError).not.toHaveBeenCalled() + expect(onRequest).not.toHaveBeenCalled() + server.close() + }) + }, +)