Skip to content

Commit 53236a8

Browse files
committed
Rust: Trigger CRUD upload on connect
1 parent 48e7456 commit 53236a8

File tree

5 files changed

+67
-7
lines changed

5 files changed

+67
-7
lines changed

.changeset/light-clocks-hang.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': patch
3+
---
4+
5+
Rust client: Properly upload CRUD entries made while offline.

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ The next upload iteration will be delayed.`);
840840
const adapter = this.options.adapter;
841841
const remote = this.options.remote;
842842
let receivingLines: Promise<void> | null = null;
843+
let hadSyncLine = false;
843844

844845
const abortController = new AbortController();
845846
signal.addEventListener('abort', () => abortController.abort());
@@ -884,6 +885,11 @@ The next upload iteration will be delayed.`);
884885
return;
885886
}
886887

888+
if (!hadSyncLine) {
889+
syncImplementation.triggerCrudUpload();
890+
hadSyncLine = true;
891+
}
892+
887893
await control(line.command, line.payload);
888894
}
889895
} finally {

packages/common/src/utils/DataStream.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
3939
protected isClosed: boolean;
4040

4141
protected processingPromise: Promise<void> | null;
42+
protected notifyDataAdded: ((_: null) => void) | null;
4243

4344
protected logger: ILogger;
4445

@@ -90,6 +91,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
9091
}
9192

9293
this.dataQueue.push(data);
94+
this.notifyDataAdded?.(null);
9395

9496
this.processQueue();
9597
}
@@ -151,7 +153,11 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
151153
await this.iterateAsyncErrored(async (l) => l.highWater?.());
152154
}
153155

154-
return (this.processingPromise = this._processQueue());
156+
const promise = (this.processingPromise = this._processQueue());
157+
promise.finally(() => {
158+
return (this.processingPromise = null);
159+
});
160+
return promise;
155161
}
156162

157163
/**
@@ -178,7 +184,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
178184

179185
protected async _processQueue() {
180186
if (this.isClosed || !this.hasDataReader()) {
181-
Promise.resolve().then(() => (this.processingPromise = null));
187+
await Promise.resolve();
182188
return;
183189
}
184190

@@ -188,10 +194,13 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
188194
}
189195

190196
if (this.dataQueue.length <= this.lowWatermark) {
191-
await this.iterateAsyncErrored(async (l) => l.lowWater?.());
192-
}
197+
const dataAdded = new Promise((resolve) => {
198+
this.notifyDataAdded = resolve;
199+
});
193200

194-
this.processingPromise = null;
201+
await Promise.race([this.iterateAsyncErrored(async (l) => l.lowWater?.()), dataAdded]);
202+
this.notifyDataAdded = null;
203+
}
195204

196205
if (this.dataQueue.length) {
197206
// Next tick

packages/node/tests/sync.test.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
424424

425425
mockSyncServiceTest('interrupt and defrag', async ({ syncService }) => {
426426
let database = await syncService.createDatabase();
427-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
427+
database.connect(new TestConnector(), options);
428428
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
429429

430430
syncService.pushLine({
@@ -442,7 +442,7 @@ function defineSyncTests(impl: SyncClientImplementation) {
442442
await database.close();
443443
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
444444
database = await syncService.createDatabase();
445-
database.connect(new TestConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
445+
database.connect(new TestConnector(), options);
446446
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
447447

448448
// A sync rule deploy could reset buckets, making the new bucket smaller than the existing one.
@@ -459,6 +459,39 @@ function defineSyncTests(impl: SyncClientImplementation) {
459459
await waitForSyncStatus(database, (s) => s.downloadProgress == null);
460460
});
461461
});
462+
463+
mockSyncServiceTest('should upload after connecting', async ({ syncService }) => {
464+
let database = await syncService.createDatabase();
465+
466+
database.execute('INSERT INTO lists (id, name) values (uuid(), ?)', ['local write']);
467+
const query = database.watchWithAsyncGenerator('SELECT name FROM lists')[Symbol.asyncIterator]();
468+
let rows = (await query.next()).value.rows._array;
469+
expect(rows).toStrictEqual([{ name: 'local write' }]);
470+
471+
database.connect(new TestConnector(), options);
472+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
473+
474+
syncService.pushLine({ checkpoint: { last_op_id: '1', write_checkpoint: '1', buckets: [bucket('a', 1)] } });
475+
syncService.pushLine({
476+
data: {
477+
bucket: 'a',
478+
data: [
479+
{
480+
checksum: 0,
481+
op_id: '1',
482+
op: 'PUT',
483+
object_id: '1',
484+
object_type: 'lists',
485+
data: '{"name": "from server"}'
486+
}
487+
]
488+
}
489+
});
490+
syncService.pushLine({ checkpoint_complete: { last_op_id: '1' } });
491+
492+
rows = (await query.next()).value.rows._array;
493+
expect(rows).toStrictEqual([{ name: 'from server' }]);
494+
});
462495
}
463496

464497
function bucket(name: string, count: number, options: { priority: number } = { priority: 3 }): BucketChecksum {

packages/node/tests/utils.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
116116
});
117117

118118
return new Response(syncLines.pipeThrough(asLines) as any, { status: 200 });
119+
} else if (request.url.indexOf('/write-checkpoint2.json') != -1) {
120+
return new Response(
121+
JSON.stringify({
122+
data: { write_checkpoint: '1' }
123+
}),
124+
{ status: 200 }
125+
);
119126
} else {
120127
return new Response('Not found', { status: 404 });
121128
}

0 commit comments

Comments
 (0)