Skip to content

feat(common, node): best effort to emit BatchedUpdateNotification so clients know the rowid that changed, not just which table #646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
c740eb1
Refactor onAbortPromise to use addEventListener
douglascayers Jun 27, 2025
599a603
Add changeset entry
douglascayers Jun 27, 2025
a0b8b61
Update .changeset/moody-hats-sleep.md
douglascayers Jun 29, 2025
3808477
Remove test that doesn't test the SDK. The logic is explained in the …
douglascayers Jun 29, 2025
01e3403
Avoid memory leaks by removing event listener after race.
douglascayers Jun 29, 2025
fcd53c9
Update changeset entry
douglascayers Jun 29, 2025
7a6407d
Avoid memory leak by ensuring abort handler is removed
douglascayers Jun 29, 2025
d08cc08
Consolidate logic by calling abortHandler() if signal is already aborted
douglascayers Jun 29, 2025
f9561d2
Remove unused import of 'abort'
douglascayers Jun 29, 2025
b1fe7a3
Add functions to convert between batched update notifications and ind…
douglascayers Jun 29, 2025
f20f6c7
Add DisposeManager class for centralized management of disposers
douglascayers Jun 29, 2025
699ae01
Ensure queued updates aren't skipped if occur quicker than Controlled…
douglascayers Jun 29, 2025
fa64ce5
Emit the data for the BatchedUpdateNotification event that's availabl…
douglascayers Jun 29, 2025
ce67260
Add changeset entry
douglascayers Jun 29, 2025
6bb8457
Merge remote-tracking branch 'origin/main' into node-collect-committe…
douglascayers Jul 2, 2025
6a68d52
Merge branch 'common-async-onabortpromise' into node-collect-committe…
douglascayers Jul 2, 2025
29f6253
Refactor to not need setInterval when flushing updates
douglascayers Jul 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/giant-camels-speak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/common': minor
'@powersync/node': minor
'@powersync/web': minor
---

PowerSyncDatabase.onChange does a best effort to provide the table name, DML operation, and rowid in the change event. Previously, only table names were emitted.
5 changes: 5 additions & 0 deletions .changeset/moody-hats-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Use addEventListener instead of overwriting the onabort property, preventing interference with outside users also setting the property on the same signal. Remove event listener when race settles to avoid memory leak.
144 changes: 91 additions & 53 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import {
QueryResult,
Transaction,
UpdateNotification,
convertToBatchedUpdateNotification,
convertToUpdateNotifications,
isBatchedUpdateNotification
} from '../db/DBAdapter.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { throttleTrailing } from '../utils/async.js';
import { DisposeManager } from '../utils/DisposeManager.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { ConnectionManager } from './ConnectionManager.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
Expand All @@ -34,6 +35,7 @@ import {
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { sleep } from '../utils/async.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -86,7 +88,8 @@ export interface SQLWatchOptions {
}

export interface WatchOnChangeEvent {
changedTables: string[];
changedTables: string[]; // kept for backwards compatibility
update: BatchedUpdateNotification;
}

export interface WatchHandler {
Expand Down Expand Up @@ -1038,7 +1041,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* @returns A dispose function to stop watching for changes
*/
onChangeWithCallback(handler?: WatchOnChangeHandler, options?: SQLWatchOptions): () => void {
const { onChange, onError = (e: Error) => this.options.logger?.error(e) } = handler ?? {};
const { onChange, onError = (error: Error) => this.options.logger?.error(error) } = handler ?? {};
if (!onChange) {
throw new Error('onChange is required');
}
Expand All @@ -1047,40 +1050,102 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const watchedTables = new Set<string>(
(resolvedOptions?.tables ?? []).flatMap((table) => [table, `ps_data__${table}`, `ps_data_local__${table}`])
);

const changedTables = new Set<string>();
const updatedTables = new Array<UpdateNotification>();
const throttleMs = resolvedOptions.throttleMs ?? DEFAULT_WATCH_THROTTLE_MS;

const executor = new ControlledExecutor(async (e: WatchOnChangeEvent) => {
await onChange(e);
});
const disposeManager = new DisposeManager();

const flushTableUpdates = throttleTrailing(
() =>
this.handleTableChanges(changedTables, watchedTables, (intersection) => {
if (resolvedOptions?.signal?.aborted) return;
executor.schedule({ changedTables: intersection });
}),
throttleMs
);
const dispose = () => disposeManager.dispose();

if (resolvedOptions.signal?.aborted || this.closed) {
return dispose;
}

const dispose = this.database.registerListener({
tablesUpdated: async (update) => {
let isFlushing = false;
let lastFlushTime = 0;

// Don't flush more often than the throttle interval.
const throttleFlush = async () => {
const timeSinceLastFlush = Date.now() - lastFlushTime;
if (timeSinceLastFlush < throttleMs) {
await sleep(throttleMs - timeSinceLastFlush);
}
lastFlushTime = Date.now();
};

// Periodically flush the accumulated updates from the db listener.
const triggerFlush = async () => {
// Skip if we're already flushing.
// Will retry when more updates arrive.
if (isFlushing) {
return;
}
try {
isFlushing = true;
// Keep flushing until no more changes are pending
while (updatedTables.length > 0) {
await throttleFlush();
await executeFlush();
}
} catch (error) {
onError?.(error);
} finally {
// Allow future flush attempts.
isFlushing = false;
}
};

const executeFlush = async () => {
// Get snapshot of the updated tables to avoid race conditions
// between async operations here and the listener that adds updates.
const updatesToFlush = [...updatedTables];
// Reset the queue to begin collecting new updates by the listener.
updatedTables.length = 0;
// Skip if we're already disposed.
if (disposeManager.isDisposed()) {
return;
}
// Dispose then skip if we're closed.
if (this.closed) {
disposeManager.dispose();
return;
}
// Broadcast the updates.
const update = convertToBatchedUpdateNotification(updatesToFlush);
if (update.tables.length > 0) {
await onChange({ changedTables: update.tables, update });
}
};

const disposeListener = this.database.registerListener({
tablesUpdated: (update) => {
try {
this.processTableUpdates(update, changedTables);
flushTableUpdates();
if (isBatchedUpdateNotification(update)) {
const rawUpdates = convertToUpdateNotifications(update);
for (const rawUpdate of rawUpdates) {
if (watchedTables.has(rawUpdate.table)) {
updatedTables.push(rawUpdate);
}
}
} else {
if (watchedTables.has(update.table)) {
updatedTables.push(update);
}
}
triggerFlush();
} catch (error) {
onError?.(error);
}
}
});

resolvedOptions.signal?.addEventListener('abort', () => {
executor.dispose();
dispose();
});
disposeManager.add(() => disposeListener());

if (resolvedOptions.signal) {
disposeManager.disposeOnAbort(resolvedOptions.signal);
}

return () => dispose();
return dispose;
}

/**
Expand Down Expand Up @@ -1119,33 +1184,6 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
});
}

private handleTableChanges(
changedTables: Set<string>,
watchedTables: Set<string>,
onDetectedChanges: (changedTables: string[]) => void
): void {
if (changedTables.size > 0) {
const intersection = Array.from(changedTables.values()).filter((change) => watchedTables.has(change));
if (intersection.length) {
onDetectedChanges(intersection);
}
}
changedTables.clear();
}

private processTableUpdates(
updateNotification: BatchedUpdateNotification | UpdateNotification,
changedTables: Set<string>
): void {
const tables = isBatchedUpdateNotification(updateNotification)
? updateNotification.tables
: [updateNotification.table];

for (const table of tables) {
changedTables.add(table);
}
}

/**
* @ignore
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { FULL_SYNC_PRIORITY, InternalProgressInformation } from '../../../db/cru
import * as sync_status from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { onAbortPromise, throttleLeadingTrailing } from '../../../utils/async.js';
import { resolveEarlyOnAbort, throttleLeadingTrailing } from '../../../utils/async.js';
import {
BucketChecksum,
BucketDescription,
Expand Down Expand Up @@ -1062,7 +1062,7 @@ The next upload iteration will be delayed.`);
});
}

private async applyCheckpoint(checkpoint: Checkpoint, abort: AbortSignal) {
private async applyCheckpoint(checkpoint: Checkpoint, signal: AbortSignal) {
let result = await this.options.adapter.syncLocalDatabase(checkpoint);
const pending = this.pendingCrudUpload;

Expand All @@ -1079,9 +1079,9 @@ The next upload iteration will be delayed.`);
this.logger.debug(
'Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying.'
);
await Promise.race([pending, onAbortPromise(abort)]);
await resolveEarlyOnAbort(pending, signal);

if (abort.aborted) {
if (signal.aborted) {
return { applied: false, endIteration: true };
}

Expand Down
36 changes: 36 additions & 0 deletions packages/common/src/db/DBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ export enum RowUpdateType {
SQLITE_DELETE = 9,
SQLITE_UPDATE = 23
}

export interface TableUpdateOperation {
opType: RowUpdateType;
rowId: number;
}

/**
* Notification of an update to one or more tables, for the purpose of realtime change notifications.
*/
Expand Down Expand Up @@ -129,6 +131,40 @@ export function isBatchedUpdateNotification(
return 'tables' in update;
}

export function convertToBatchedUpdateNotification(updates: UpdateNotification[]): BatchedUpdateNotification {
const groupedUpdates: BatchedUpdateNotification['groupedUpdates'] = {};

for (const update of updates) {
groupedUpdates[update.table] ??= [];
groupedUpdates[update.table].push(update);
}

return {
tables: Object.keys(groupedUpdates),
rawUpdates: updates,
groupedUpdates
};
}

export function convertToUpdateNotifications(update: BatchedUpdateNotification): UpdateNotification[] {
// Not all implementations emit a complete batched update.
// Some only emit the table names, or not even that.
if (update.rawUpdates?.length) {
return update.rawUpdates;
}
if (Object.keys(update.groupedUpdates ?? {}).length) {
return Object.entries(update.groupedUpdates).flatMap(([table, updates]) =>
updates.map((update) => ({ ...update, table }))
);
}
if (update.tables?.length) {
return update.tables.map((table) => {
return { table } as unknown as UpdateNotification;
});
}
return [];
}

export function extractTableUpdates(update: BatchedUpdateNotification | UpdateNotification) {
return isBatchedUpdateNotification(update) ? update.tables : [update.table];
}
Loading