Skip to content

Commit 4293746

Browse files
committed
change the change event emitter to be cleaner
1 parent 4bca87f commit 4293746

File tree

4 files changed

+131
-121
lines changed

4 files changed

+131
-121
lines changed

packages/db/src/collection.ts

Lines changed: 73 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { withArrayChangeTracking, withChangeTracking } from "./proxy"
33
import { Transaction, getActiveTransaction } from "./transactions"
44
import { SortedMap } from "./SortedMap"
55
import type {
6+
ChangeListener,
67
ChangeMessage,
78
CollectionConfig,
89
Fn,
@@ -32,24 +33,6 @@ interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
3233
operations: Array<OptimisticChangeMessage<T>>
3334
}
3435

35-
// Event system for collections
36-
type CollectionEventType = `insert` | `update` | `delete`
37-
38-
interface CollectionEvent<T, TKey extends string | number> {
39-
type: CollectionEventType
40-
key: TKey
41-
value: T
42-
previousValue?: T
43-
}
44-
45-
type EventListener<T, TKey extends string | number> = (
46-
event: CollectionEvent<T, TKey>
47-
) => void
48-
type KeyListener<T> = (
49-
value: T | undefined,
50-
previousValue: T | undefined
51-
) => void
52-
5336
/**
5437
* Enhanced Collection interface that includes both data type T and utilities TUtils
5538
* @template T - The type of items in the collection
@@ -229,13 +212,8 @@ export class CollectionImpl<
229212
private _size = 0
230213

231214
// Event system
232-
private eventListeners = new Set<EventListener<T, TKey>>()
233-
private keyListeners = new Map<TKey, Set<KeyListener<T>>>()
234-
235-
// Batching for subscribeChanges
236-
private changesBatchListeners = new Set<
237-
(changes: Array<ChangeMessage<T>>) => void
238-
>()
215+
private changeListeners = new Set<ChangeListener<T, TKey>>()
216+
private changeKeyListeners = new Map<TKey, Set<ChangeListener<T, TKey>>>()
239217

240218
// Utilities namespace
241219
// This is populated by createCollection
@@ -389,7 +367,7 @@ export class CollectionImpl<
389367
this._size = this.calculateSize()
390368

391369
// Collect events for changes
392-
const events: Array<CollectionEvent<T, TKey>> = []
370+
const events: Array<ChangeMessage<T, TKey>> = []
393371
this.collectOptimisticChanges(previousState, previousDeletes, events)
394372

395373
// Emit all events at once
@@ -417,7 +395,7 @@ export class CollectionImpl<
417395
private collectOptimisticChanges(
418396
previousUpserts: Map<TKey, T>,
419397
previousDeletes: Set<TKey>,
420-
events: Array<CollectionEvent<T, TKey>>
398+
events: Array<ChangeMessage<T, TKey>>
421399
): void {
422400
const allKeys = new Set([
423401
...previousUpserts.keys(),
@@ -473,80 +451,32 @@ export class CollectionImpl<
473451
/**
474452
* Emit multiple events at once to all listeners
475453
*/
476-
private emitEvents(events: Array<CollectionEvent<T, TKey>>): void {
477-
// Emit to individual event listeners
478-
for (const event of events) {
479-
this.emitEvent(event)
480-
}
481-
482-
// Convert to ChangeMessage format and emit to subscribeChanges listeners
483-
if (events.length > 0) {
484-
const changeMessages: Array<ChangeMessage<T>> = events.map((event) => {
485-
const changeMessage: ChangeMessage<T> = {
486-
type: event.type,
487-
key: event.key,
488-
value: event.value,
489-
}
490-
491-
if (event.previousValue) {
492-
;(changeMessage as any).previousValue = event.previousValue
493-
}
494-
495-
return changeMessage
496-
})
497-
498-
for (const listener of this.changesBatchListeners) {
499-
listener(changeMessages)
500-
}
501-
}
502-
}
503-
504-
/**
505-
* Emit an event to individual listeners (not batched)
506-
*/
507-
private emitEvent(event: CollectionEvent<T, TKey>): void {
508-
// Emit to general listeners
509-
for (const listener of this.eventListeners) {
510-
listener(event)
511-
}
512-
513-
// Emit to key-specific listeners
514-
const keyListeners = this.keyListeners.get(event.key)
515-
if (keyListeners) {
516-
for (const listener of keyListeners) {
517-
listener(
518-
event.type === `delete` ? undefined : event.value,
519-
event.previousValue
520-
)
454+
private emitEvents(changes: Array<ChangeMessage<T, TKey>>): void {
455+
if (changes.length > 0) {
456+
// Emit to general listeners
457+
for (const listener of this.changeListeners) {
458+
listener(changes)
521459
}
522-
}
523-
}
524460

525-
/**
526-
* Subscribe to collection events
527-
*/
528-
public subscribe(listener: EventListener<T, TKey>): () => void {
529-
this.eventListeners.add(listener)
530-
return () => {
531-
this.eventListeners.delete(listener)
532-
}
533-
}
534-
535-
/**
536-
* Subscribe to changes for a specific key
537-
*/
538-
public subscribeKey(key: TKey, listener: KeyListener<T>): () => void {
539-
if (!this.keyListeners.has(key)) {
540-
this.keyListeners.set(key, new Set())
541-
}
542-
this.keyListeners.get(key)!.add(listener)
461+
// Emit to key-specific listeners
462+
if (this.changeKeyListeners.size > 0) {
463+
// Group changes by key, but only for keys that have listeners
464+
const changesByKey = new Map<TKey, Array<ChangeMessage<T, TKey>>>()
465+
for (const change of changes) {
466+
if (this.changeKeyListeners.has(change.key)) {
467+
if (!changesByKey.has(change.key)) {
468+
changesByKey.set(change.key, [])
469+
}
470+
changesByKey.get(change.key)!.push(change)
471+
}
472+
}
543473

544-
return () => {
545-
const listeners = this.keyListeners.get(key)
546-
if (listeners) {
547-
listeners.delete(listener)
548-
if (listeners.size === 0) {
549-
this.keyListeners.delete(key)
474+
// Emit batched changes to each key's listeners
475+
for (const [key, keyChanges] of changesByKey) {
476+
const keyListeners = this.changeKeyListeners.get(key)!
477+
for (const listener of keyListeners) {
478+
listener(keyChanges)
479+
}
550480
}
551481
}
552482
}
@@ -650,7 +580,7 @@ export class CollectionImpl<
650580
)
651581
) {
652582
const changedKeys = new Set<TKey>()
653-
const events: Array<CollectionEvent<T, TKey>> = []
583+
const events: Array<ChangeMessage<T, TKey>> = []
654584

655585
for (const transaction of this.pendingSyncedTransactions) {
656586
for (const operation of transaction.operations) {
@@ -1315,16 +1245,55 @@ export class CollectionImpl<
13151245
* @returns A function that can be called to unsubscribe from the changes
13161246
*/
13171247
public subscribeChanges(
1318-
callback: (changes: Array<ChangeMessage<T>>) => void
1248+
callback: (changes: Array<ChangeMessage<T>>) => void,
1249+
{ includeInitialState = false }: { includeInitialState?: boolean } = {}
13191250
): () => void {
1320-
// First send the current state as changes
1321-
callback(this.currentStateAsChanges())
1251+
if (includeInitialState) {
1252+
// First send the current state as changes
1253+
callback(this.currentStateAsChanges())
1254+
}
13221255

13231256
// Add to batched listeners
1324-
this.changesBatchListeners.add(callback)
1257+
this.changeListeners.add(callback)
1258+
1259+
return () => {
1260+
this.changeListeners.delete(callback)
1261+
}
1262+
}
1263+
1264+
/**
1265+
* Subscribe to changes for a specific key
1266+
*/
1267+
public subscribeChangesKey(
1268+
key: TKey,
1269+
listener: ChangeListener<T, TKey>,
1270+
{ includeInitialState = false }: { includeInitialState?: boolean } = {}
1271+
): () => void {
1272+
if (!this.changeKeyListeners.has(key)) {
1273+
this.changeKeyListeners.set(key, new Set())
1274+
}
1275+
1276+
if (includeInitialState) {
1277+
// First send the current state as changes
1278+
listener([
1279+
{
1280+
type: `insert`,
1281+
key,
1282+
value: this.get(key)!,
1283+
},
1284+
])
1285+
}
1286+
1287+
this.changeKeyListeners.get(key)!.add(listener)
13251288

13261289
return () => {
1327-
this.changesBatchListeners.delete(callback)
1290+
const listeners = this.changeKeyListeners.get(key)
1291+
if (listeners) {
1292+
listeners.delete(listener)
1293+
if (listeners.size === 0) {
1294+
this.changeKeyListeners.delete(key)
1295+
}
1296+
}
13281297
}
13291298
}
13301299

packages/db/src/query/compiled-query.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -174,18 +174,8 @@ export class CompiledQuery<TResults extends object = Record<string, unknown>> {
174174

175175
// Subscribe to changes
176176
Object.entries(this.inputCollections).forEach(([key, collection]) => {
177-
const unsubscribe = collection.subscribe((event) => {
178-
const change: ChangeMessage = {
179-
type: event.type,
180-
key: event.key,
181-
value: event.value,
182-
}
183-
184-
if (event.previousValue) {
185-
;(change as any).previousValue = event.previousValue
186-
}
187-
188-
this.sendChangesToInput(key, [change], collection.config.getKey)
177+
const unsubscribe = collection.subscribeChanges((changes) => {
178+
this.sendChangesToInput(key, changes, collection.config.getKey)
189179
this.incrementVersion()
190180
this.sendFrontierToAllInputs()
191181
this.runGraph()

packages/db/src/types.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,11 @@ export interface SyncConfig<
104104
getSyncMetadata?: () => Record<string, unknown>
105105
}
106106

107-
export interface ChangeMessage<T extends object = Record<string, unknown>> {
108-
key: string | number
107+
export interface ChangeMessage<
108+
T extends object = Record<string, unknown>,
109+
TKey extends string | number = string | number,
110+
> {
111+
key: TKey
109112
value: T
110113
previousValue?: T
111114
type: OperationType
@@ -216,3 +219,8 @@ export type KeyedNamespacedRow = [unknown, NamespacedRow]
216219
* a `select` clause.
217220
*/
218221
export type NamespacedAndKeyedStream = IStreamBuilder<KeyedNamespacedRow>
222+
223+
export type ChangeListener<
224+
T extends object = Record<string, unknown>,
225+
TKey extends string | number = string | number,
226+
> = (changes: Array<ChangeMessage<T, TKey>>) => void

packages/db/tests/collection-subscribe-changes.test.ts

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ describe(`Collection.subscribeChanges`, () => {
4242
// await waitForChanges()
4343

4444
// Subscribe to changes
45-
const unsubscribe = collection.subscribeChanges(callback)
45+
const unsubscribe = collection.subscribeChanges(callback, {
46+
includeInitialState: true,
47+
})
4648

4749
// Verify that callback was called with initial state
4850
expect(callback).toHaveBeenCalledTimes(1)
@@ -62,6 +64,43 @@ describe(`Collection.subscribeChanges`, () => {
6264
unsubscribe()
6365
})
6466

67+
it(`should not emit initial collection state as insert changes by default`, () => {
68+
const callback = vi.fn()
69+
70+
// Create collection with pre-populated data
71+
const collection = createCollection<{ value: string }>({
72+
id: `initial-state-test`,
73+
getKey: (item) => item.value,
74+
sync: {
75+
sync: ({ begin, write, commit }) => {
76+
// Immediately populate with initial data
77+
begin()
78+
write({
79+
type: `insert`,
80+
value: { value: `value1` },
81+
})
82+
write({
83+
type: `insert`,
84+
value: { value: `value2` },
85+
})
86+
commit()
87+
},
88+
},
89+
})
90+
91+
// Wait for initial sync to complete
92+
// await waitForChanges()
93+
94+
// Subscribe to changes
95+
const unsubscribe = collection.subscribeChanges(callback)
96+
97+
// Verify that callback was called with initial state
98+
expect(callback).toHaveBeenCalledTimes(0)
99+
100+
// Clean up
101+
unsubscribe()
102+
})
103+
65104
it(`should emit changes from synced operations using mitt emitter`, () => {
66105
const emitter = mitt()
67106
const callback = vi.fn()
@@ -516,7 +555,9 @@ describe(`Collection.subscribeChanges`, () => {
516555
}
517556

518557
// Subscribe to changes
519-
const unsubscribe = collection.subscribeChanges(callback)
558+
const unsubscribe = collection.subscribeChanges(callback, {
559+
includeInitialState: true,
560+
})
520561

521562
// First call should have initial state (2 items)
522563
expect(callback).toHaveBeenCalledTimes(1)
@@ -606,7 +647,9 @@ describe(`Collection.subscribeChanges`, () => {
606647
const mutationFn = async () => {}
607648

608649
// Subscribe to changes
609-
const unsubscribe = collection.subscribeChanges(callback)
650+
const unsubscribe = collection.subscribeChanges(callback, {
651+
includeInitialState: true,
652+
})
610653

611654
// Initial state emission
612655
expect(callback).toHaveBeenCalledTimes(1)

0 commit comments

Comments
 (0)