Skip to content

Commit

Permalink
feat: add experimental new live events API (#797)
Browse files Browse the repository at this point in the history
* feat: CLDX-2062

* revert: fix in `listen`, will open dedicated PR
  • Loading branch information
stipsan authored May 7, 2024
1 parent 53552df commit de0cec7
Show file tree
Hide file tree
Showing 10 changed files with 430 additions and 4 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@sanity/client",
"version": "6.17.3",
"version": "6.18.0-canary.0",
"description": "Client for retrieving, creating and patching data from Sanity.io",
"keywords": [
"sanity",
Expand Down
6 changes: 6 additions & 0 deletions src/SanityClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {AssetsClient, ObservableAssetsClient} from './assets/AssetsClient'
import {defaultConfig, initConfig} from './config'
import * as dataMethods from './data/dataMethods'
import {_listen} from './data/listen'
import {LiveClient} from './data/live'
import {ObservablePatch, Patch} from './data/patch'
import {ObservableTransaction, Transaction} from './data/transaction'
import {DatasetsClient, ObservableDatasetsClient} from './datasets/DatasetsClient'
Expand Down Expand Up @@ -43,6 +44,7 @@ export type {
_listen,
AssetsClient,
DatasetsClient,
LiveClient,
ObservableAssetsClient,
ObservableDatasetsClient,
ObservableProjectsClient,
Expand All @@ -55,6 +57,7 @@ export type {
export class ObservableSanityClient {
assets: ObservableAssetsClient
datasets: ObservableDatasetsClient
live: LiveClient
projects: ObservableProjectsClient
users: ObservableUsersClient

Expand All @@ -76,6 +79,7 @@ export class ObservableSanityClient {

this.assets = new ObservableAssetsClient(this, this.#httpRequest)
this.datasets = new ObservableDatasetsClient(this, this.#httpRequest)
this.live = new LiveClient(this)
this.projects = new ObservableProjectsClient(this, this.#httpRequest)
this.users = new ObservableUsersClient(this, this.#httpRequest)
}
Expand Down Expand Up @@ -695,6 +699,7 @@ export class ObservableSanityClient {
export class SanityClient {
assets: AssetsClient
datasets: DatasetsClient
live: LiveClient
projects: ProjectsClient
users: UsersClient

Expand All @@ -721,6 +726,7 @@ export class SanityClient {

this.assets = new AssetsClient(this, this.#httpRequest)
this.datasets = new DatasetsClient(this, this.#httpRequest)
this.live = new LiveClient(this)
this.projects = new ProjectsClient(this, this.#httpRequest)
this.users = new UsersClient(this, this.#httpRequest)

Expand Down
7 changes: 6 additions & 1 deletion src/data/dataMethods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ export function _dataRequest(
const useGet = !isMutation && strQuery.length < getQuerySizeLimit
const stringQuery = useGet ? strQuery : ''
const returnFirst = options.returnFirst
const {timeout, token, tag, headers, returnQuery} = options
const {timeout, token, tag, headers, returnQuery, lastLiveEventId} = options

const uri = _getDataUrl(client, endpoint, stringQuery)

Expand All @@ -274,6 +274,7 @@ export function _dataRequest(
returnQuery,
perspective: options.perspective,
resultSourceMap: options.resultSourceMap,
lastLiveEventId: Array.isArray(lastLiveEventId) ? lastLiveEventId[0] : lastLiveEventId,
canUseCdn: isQuery,
signal: options.signal,
fetch: options.fetch,
Expand Down Expand Up @@ -375,6 +376,10 @@ export function _requestObservable<R>(
}
}

if (options.lastLiveEventId) {
options.query = {...options.query, lastLiveEventId: options.lastLiveEventId}
}

if (options.returnQuery === false) {
options.query = {returnQuery: 'false', ...options.query}
}
Expand Down
126 changes: 126 additions & 0 deletions src/data/live.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {Observable} from 'rxjs'

import type {ObservableSanityClient, SanityClient} from '../SanityClient'
import type {Any, LiveEventMessage, LiveEventRestart} from '../types'
import {_getDataUrl} from './dataMethods'

/**
* @alpha this API is experimental and may change or even be removed
*/
export class LiveClient {
#client: SanityClient | ObservableSanityClient
constructor(client: SanityClient | ObservableSanityClient) {
this.#client = client
}

events(): Observable<LiveEventMessage | LiveEventRestart> {
const path = _getDataUrl(this.#client, 'live/events')
const url = new URL(this.#client.getUrl(path, false))

const listenFor = ['restart', 'message'] as const

return new Observable((observer) => {
let es: InstanceType<typeof EventSource> | undefined
let reconnectTimer: NodeJS.Timeout
let stopped = false
// Unsubscribe differs from stopped in that we will never reopen.
// Once it is`true`, it will never be `false` again.
let unsubscribed = false

open()

// EventSource will emit a regular event if it fails to connect, however the API will emit an `error` MessageEvent if the server goes down
// So we need to handle both cases
function onError(evt: MessageEvent | Event) {
if (stopped) {
return
}

// If the event has a `data` property, then it`s a MessageEvent emitted by the API and we should forward the error and close the connection
if ('data' in evt) {
const event = parseEvent(evt)
observer.error(new Error(event.message, {cause: event}))
}

// Unless we've explicitly stopped the ES (in which case `stopped` should be true),
// we should never be in a disconnected state. By default, EventSource will reconnect
// automatically, in which case it sets readyState to `CONNECTING`, but in some cases
// (like when a laptop lid is closed), it closes the connection. In these cases we need
// to explicitly reconnect.
if (es!.readyState === es!.CLOSED) {
unsubscribe()
clearTimeout(reconnectTimer)
reconnectTimer = setTimeout(open, 100)
}
}

function onMessage(evt: Any) {
const event = parseEvent(evt)
return event instanceof Error ? observer.error(event) : observer.next(event)
}

function unsubscribe() {
if (!es) return
es.removeEventListener('error', onError)
for (const type of listenFor) {
es.removeEventListener(type, onMessage)
}
es.close()
}

async function getEventSource() {
const EventSourceImplementation: typeof EventSource =
typeof EventSource === 'undefined'
? ((await import('@sanity/eventsource')).default as typeof EventSource)
: EventSource

// If the listener has been unsubscribed from before we managed to load the module,
// do not set up the EventSource.
if (unsubscribed) {
return
}

const evs = new EventSourceImplementation(url.toString())
evs.addEventListener('error', onError)
for (const type of listenFor) {
evs.addEventListener(type, onMessage)
}
return evs
}

function open() {
getEventSource()
.then((eventSource) => {
if (eventSource) {
es = eventSource
// Handle race condition where the observer is unsubscribed before the EventSource is set up
if (unsubscribed) {
unsubscribe()
}
}
})
.catch((reason) => {
observer.error(reason)
stop()
})
}

function stop() {
stopped = true
unsubscribe()
unsubscribed = true
}

return stop
})
}
}

function parseEvent(event: MessageEvent) {
try {
const data = (event.data && JSON.parse(event.data)) || {}
return {type: event.type, id: event.lastEventId, ...data}
} catch (err) {
return err
}
}
21 changes: 21 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ export interface RequestObservableOptions extends Omit<RequestOptions, 'url'> {
returnQuery?: boolean
resultSourceMap?: boolean | 'withKeyArraySelector'
perspective?: ClientPerspective
/** @alpha this API is experimental and may change or even be removed */
lastLiveEventId?: string
}

/** @public */
Expand Down Expand Up @@ -479,6 +481,8 @@ export interface QueryParams {
token?: never
/** @deprecated you're using a fetch option as a GROQ parameter, this is likely a mistake */
useCdn?: never
/** @deprecated you're using a fetch option as a GROQ parameter, this is likely a mistake */
lastLiveEventId?: never
/* eslint-enable @typescript-eslint/no-explicit-any */
}

Expand Down Expand Up @@ -743,6 +747,8 @@ export interface ResponseQueryOptions extends RequestOptions {
// The `cache` and `next` options are specific to the Next.js App Router integration
cache?: 'next' extends keyof RequestInit ? RequestInit['cache'] : never
next?: ('next' extends keyof RequestInit ? RequestInit : never)['next']
/** @alpha this API is experimental and may change or even be removed */
lastLiveEventId?: string | string[] | null
}

/** @public */
Expand Down Expand Up @@ -785,6 +791,8 @@ export interface RawQueryResponse<R> {
ms: number
result: R
resultSourceMap?: ContentSourceMap
/** @alpha this API is experimental and may change or even be removed */
syncTags?: SyncTag[]
}

/** @public */
Expand Down Expand Up @@ -999,6 +1007,19 @@ export interface ContentSourceMap {
paths: ContentSourceMapPaths
}

/** @alpha this API is experimental and may change or even be removed */
export type SyncTag = `s1:${string}`
/** @alpha this API is experimental and may change or even be removed */
export interface LiveEventRestart {
type: 'restart'
}
/** @alpha this API is experimental and may change or even be removed */
export interface LiveEventMessage {
type: 'message'
id: string
tags: SyncTag[]
}

export type {
ContentSourceMapParsedPath,
ContentSourceMapParsedPathKeyedSegment,
Expand Down
84 changes: 84 additions & 0 deletions test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,90 @@ describe('client', async () => {
expect(res[0].rating, 'data should match').toBe(5)
})

test.skipIf(isEdge)('can query for documents with last live event ID', async () => {
nock(projectHost())
.get(
`/vX/data/query/foo?query=*&returnQuery=false&lastLiveEventId=MTA0MDM1Nnx2a2lQY200bnRHQQ`,
)
.reply(200, {
ms: 123,
result,
})

const res = await getClient({apiVersion: 'X'}).fetch(
'*',
{},
{lastLiveEventId: 'MTA0MDM1Nnx2a2lQY200bnRHQQ'},
)
expect(res.length, 'length should match').toBe(1)
expect(res[0].rating, 'data should match').toBe(5)
})

test.skipIf(isEdge)(
'allows passing last live event ID from Next.js style searchParams',
async () => {
nock(projectHost())
.get(
`/vX/data/query/foo?query=*&returnQuery=false&lastLiveEventId=MTA0MDM1Nnx2a2lQY200bnRHQQ`,
)
.reply(200, {
ms: 123,
result,
})

const res = await getClient({apiVersion: 'X'}).fetch(
'*',
{},
// searchParams in Next.js will return an arry of strings in some cases,
// as an convenience we allow it, and behave the same way as URLSearchParams.get() when that happens:
// we pick the first value in the array
{lastLiveEventId: ['MTA0MDM1Nnx2a2lQY200bnRHQQ', 'some-other-value']},
)
expect(res.length, 'length should match').toBe(1)
expect(res[0].rating, 'data should match').toBe(5)
},
)

test.skipIf(isEdge)(
'allows passing last live event ID from URLSearchParams that might be null',
async () => {
nock(projectHost()).get(`/vX/data/query/foo?query=*&returnQuery=false`).reply(200, {
ms: 123,
result,
})
const searchParams = new URLSearchParams('')

const res = await getClient({apiVersion: 'X'}).fetch(
'*',
{},
// URLSearchParams.get() will return null if the key is not found, we should handle that
{lastLiveEventId: searchParams.get('lastLiveEventId')},
)
expect(res.length, 'length should match').toBe(1)
expect(res[0].rating, 'data should match').toBe(5)
},
)

test.skipIf(isEdge)(
'allows passing last live event ID from URLSearchParams that might be an empty string',
async () => {
nock(projectHost()).get(`/vX/data/query/foo?query=*&returnQuery=false`).reply(200, {
ms: 123,
result,
})
const searchParams = new URLSearchParams('lastLiveEventId=')

const res = await getClient({apiVersion: 'X'}).fetch(
'*',
{},
// URLSearchParams.get() will return null if the key is not found, we should handle that
{lastLiveEventId: searchParams.get('lastLiveEventId')},
)
expect(res.length, 'length should match').toBe(1)
expect(res[0].rating, 'data should match').toBe(5)
},
)

test.skipIf(isEdge)(
'can query for documents with resultSourceMap and perspective',
async () => {
Expand Down
1 change: 1 addition & 0 deletions test/helpers/sseServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const createSseServer = (onRequest: OnRequest): Promise<http.Server> =>
let channel
if (
request?.url?.indexOf('/v1/data/listen/') === 0 ||
request?.url?.indexOf('/vX/data/live/events/') === 0 ||
request?.url?.indexOf('/listen/beerns?query=') === 0
) {
channel = new SseChannel({jsonEncode: true})
Expand Down
Loading

0 comments on commit de0cec7

Please sign in to comment.