Skip to content

feat: refetchMode replace for streamedQuery #9092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions docs/reference/streamedQuery.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ const query = queryOptions({
- **Required**
- The function that returns a Promise of an AsyncIterable of data to stream in.
- Receives a [QueryFunctionContext](../guides/query-functions.md#queryfunctioncontext)
- `refetchMode?: 'append' | 'reset'`
- optional
- when set to `'reset'`, the query will erase all data and go back into `pending` state when a refetch occurs.
- when set to `'append'`, data will be appended on a refetch.
- defaults to `'reset'`
- `refetchMode?: 'append' | 'reset' | 'replace`
- Optional
- Defines how refetches are handled.
- Defaults to `'reset'`
- When set to `'reset'`, the query will erase all data and go back into `pending` state.
- When set to `'append'`, data will be appended to existing data.
- When set to `'replace'`, data will be written to the cache at the end of the stream.
119 changes: 116 additions & 3 deletions packages/query-core/src/__tests__/streamedQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ describe('streamedQuery', () => {
vi.useRealTimers()
})

function createAsyncNumberGenerator(amount: number) {
function createAsyncNumberGenerator(amount: number, start = 0) {
return {
async *[Symbol.asyncIterator]() {
let num = 0
while (num < amount) {
let num = start
while (num < amount + start) {
await sleep(50)
yield num++
}
Expand Down Expand Up @@ -74,6 +74,61 @@ describe('streamedQuery', () => {
unsubscribe()
})

test('should allow Arrays to be returned from the stream', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: async function* () {
for await (const num of createAsyncNumberGenerator(3)) {
yield [num, num] as const
}
},
}),
})

const unsubscribe = observer.subscribe(vi.fn())

expect(observer.getCurrentResult()).toMatchObject({
status: 'pending',
fetchStatus: 'fetching',
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [[0, 0]],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [
[0, 0],
[1, 1],
],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [
[0, 0],
[1, 1],
[2, 2],
],
})

unsubscribe()
})

test('should replace on refetch', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
Expand Down Expand Up @@ -183,6 +238,64 @@ describe('streamedQuery', () => {
unsubscribe()
})

test('should support refetchMode replace', async () => {
const key = queryKey()
let offset = 0
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(2, offset),
refetchMode: 'replace',
}),
})

const unsubscribe = observer.subscribe(vi.fn())

expect(observer.getCurrentResult()).toMatchObject({
status: 'pending',
fetchStatus: 'fetching',
data: undefined,
})

await vi.advanceTimersByTimeAsync(100)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [0, 1],
})

offset = 100

void observer.refetch()

await vi.advanceTimersByTimeAsync(10)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(40)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [100, 101],
})

unsubscribe()
})

test('should abort ongoing stream when refetch happens', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
Expand Down
59 changes: 38 additions & 21 deletions packages/query-core/src/streamedQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,63 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
* The query will be in a 'pending' state until the first chunk of data is received, but will go to 'success' after that.
* The query will stay in fetchStatus 'fetching' until the stream ends.
* @param queryFn - The function that returns an AsyncIterable to stream data from.
* @param refetchMode - Defaults to 'reset', which replaces data when a refetch happens. Set to 'append' to append new data to the existing data.
* @param refetchMode - Defines how re-fetches are handled.
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
* Set to `'append'` to append new data to the existing data.
* Set to `'replace'` to write the data to the cache at the end of the stream.
*/
export function streamedQuery<
TQueryFnData = unknown,
TQueryKey extends QueryKey = QueryKey,
>({
queryFn,
refetchMode,
refetchMode = 'reset',
}: {
queryFn: (
context: QueryFunctionContext<TQueryKey>,
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
refetchMode?: 'append' | 'reset'
refetchMode?: 'append' | 'reset' | 'replace'
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
return async (context) => {
if (refetchMode !== 'append') {
const query = context.client
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true })
if (query && query.state.data !== undefined) {
query.setState({
status: 'pending',
data: undefined,
error: null,
fetchStatus: 'fetching',
})
}
const query = context.client
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true })
const isRefetch = !!query && query.state.data !== undefined

if (isRefetch && refetchMode === 'reset') {
query.setState({
status: 'pending',
data: undefined,
error: null,
fetchStatus: 'fetching',
})
}

const result: Array<TQueryFnData> = []
const stream = await queryFn(context)

for await (const chunk of stream) {
if (context.signal.aborted) {
break
}
context.client.setQueryData<Array<TQueryFnData>>(
context.queryKey,
(prev = []) => {
return prev.concat(chunk)
},
)

// don't append to the cache directly when replace-refetching
if (!isRefetch || refetchMode !== 'replace') {
context.client.setQueryData<Array<TQueryFnData>>(
context.queryKey,
(prev = []) => {
return prev.concat([chunk])
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- return prev.concat(chunk)
+ return prev.concat([chunk])

this tiny change is hard to see, but it fixes an issue where TQueryFnData is in itself an array, which got flattened implicitly by concat

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a test for this, too: 0de0222

},
)
}
result.push(chunk)
}

// finalize result: replace-refetching needs to write to the cache
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result)
}

return context.client.getQueryData(context.queryKey)!
}
}