Skip to content

Commit 562af9a

Browse files
Merge branch 'main' into chore/add-kysely-test-example
2 parents 1ce6d53 + 96f1a87 commit 562af9a

File tree

6 files changed

+86
-28
lines changed

6 files changed

+86
-28
lines changed

.changeset/perfect-zebras-marry.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/common': patch
3+
'@powersync/web': patch
4+
'@powersync/react-native': patch
5+
---
6+
7+
Improved `getCrudBatch` to use a default limit of 100 CRUD entries.

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ export const DEFAULT_POWERSYNC_DB_OPTIONS = {
132132
crudUploadThrottleMs: DEFAULT_CRUD_UPLOAD_THROTTLE_MS
133133
};
134134

135+
export const DEFAULT_CRUD_BATCH_LIMIT = 100;
136+
135137
/**
136138
* Requesting nested or recursive locks can block the application in some circumstances.
137139
* This default lock timeout will act as a failsafe to throw an error if a lock cannot
@@ -492,7 +494,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
492494
* data by transaction. One batch may contain data from multiple transactions,
493495
* and a single transaction may be split over multiple batches.
494496
*/
495-
async getCrudBatch(limit: number): Promise<CrudBatch | null> {
497+
async getCrudBatch(limit: number = DEFAULT_CRUD_BATCH_LIMIT): Promise<CrudBatch | null> {
496498
const result = await this.getAll<CrudEntryJSON>(
497499
`SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT ?`,
498500
[limit + 1]

packages/powersync-op-sqlite/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# @powersync/op-sqlite
22

3+
## 0.0.6
4+
5+
### Patch Changes
6+
7+
- 2b614bc: Improved queueing for read connections
8+
39
## 0.0.5
410

511
### Patch Changes

packages/powersync-op-sqlite/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@powersync/op-sqlite",
3-
"version": "0.0.5",
3+
"version": "0.0.6",
44
"description": "PowerSync - sync Postgres or MongoDB with SQLite in your React Native app for offline-first and real-time data",
55
"source": "./src/index.ts",
66
"main": "./lib/commonjs/index.js",

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
3535

3636
protected initialized: Promise<void>;
3737

38-
protected readConnections: OPSQLiteConnection[] | null;
38+
protected readConnections: Array<{ busy: boolean; connection: OPSQLiteConnection }> | null;
3939

4040
protected writeConnection: OPSQLiteConnection | null;
4141

42+
private readQueue: Array<() => void> = [];
43+
4244
constructor(protected options: OPSQLiteAdapterOptions) {
4345
super();
4446
this.name = this.options.name;
@@ -88,7 +90,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
8890
let dbName = './'.repeat(i + 1) + dbFilename;
8991
const conn = await this.openConnection(dbName);
9092
await conn.execute('PRAGMA query_only = true');
91-
this.readConnections.push(conn);
93+
this.readConnections.push({ busy: false, connection: conn });
9294
}
9395
}
9496

@@ -145,36 +147,46 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
145147
close() {
146148
this.initialized.then(() => {
147149
this.writeConnection!.close();
148-
this.readConnections!.forEach((c) => c.close());
150+
this.readConnections!.forEach((c) => c.connection.close());
149151
});
150152
}
151153

152154
async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
153155
await this.initialized;
154-
// TODO: Use async queues to handle multiple read connections
155-
const sortedConnections = this.readConnections!.map((connection, index) => ({
156-
lockKey: `${LockType.READ}-${index}`,
157-
connection
158-
})).sort((a, b) => {
159-
const aBusy = this.locks.isBusy(a.lockKey);
160-
const bBusy = this.locks.isBusy(b.lockKey);
161-
// Sort by ones which are not busy
162-
return aBusy > bBusy ? 1 : 0;
156+
return new Promise(async (resolve, reject) => {
157+
const execute = async () => {
158+
// Find an available connection that is not busy
159+
const availableConnection = this.readConnections!.find((conn) => !conn.busy);
160+
161+
// If we have an available connection, use it
162+
if (availableConnection) {
163+
availableConnection.busy = true;
164+
try {
165+
resolve(await fn(availableConnection.connection));
166+
} catch (error) {
167+
reject(error);
168+
} finally {
169+
availableConnection.busy = false;
170+
// After query execution, process any queued tasks
171+
this.processQueue();
172+
}
173+
} else {
174+
// If no available connections, add to the queue
175+
this.readQueue.push(execute);
176+
}
177+
};
178+
179+
execute();
163180
});
181+
}
164182

165-
return new Promise(async (resolve, reject) => {
166-
try {
167-
await this.locks.acquire(
168-
sortedConnections[0].lockKey,
169-
async () => {
170-
resolve(await fn(sortedConnections[0].connection));
171-
},
172-
{ timeout: options?.timeoutMs }
173-
);
174-
} catch (ex) {
175-
reject(ex);
183+
private async processQueue(): Promise<void> {
184+
if (this.readQueue.length > 0) {
185+
const next = this.readQueue.shift();
186+
if (next) {
187+
next();
176188
}
177-
});
189+
}
178190
}
179191

180192
async writeLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {

packages/web/tests/crud.test.ts

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
21
import { AbstractPowerSyncDatabase, Column, ColumnType, CrudEntry, Schema, Table, UpdateType } from '@powersync/common';
32
import { PowerSyncDatabase } from '@powersync/web';
3+
import pDefer from 'p-defer';
44
import { v4 as uuid } from 'uuid';
5+
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
56
import { generateTestDb } from './utils/testDb';
6-
import pDefer from 'p-defer';
77

88
const testId = '2290de4f-0488-4e50-abed-f8e8eb1d0b42';
99

@@ -311,4 +311,35 @@ describe('CRUD Tests', () => {
311311
await txPromise;
312312
expect(await r).toEqual(null);
313313
});
314+
315+
it('CRUD Batch Limits', async () => {
316+
const initialBatch = await powersync.getCrudBatch();
317+
expect(initialBatch, 'Initial CRUD batch should be null').null;
318+
319+
/**
320+
* Create some items. Use Multiple transactions to demonstrate getCrudBatch does not
321+
* group by transaction.
322+
*/
323+
for (let i = 0; i < 2; i++) {
324+
await powersync.writeTransaction(async (tx) => {
325+
for (let j = 0; j < 51; j++) {
326+
await tx.execute(`INSERT INTO assets(id, description) VALUES(uuid(), ?)`, [`test-${i}-${j}`]);
327+
}
328+
});
329+
}
330+
331+
// This should contain CRUD entries for the first and second transaction
332+
const smallBatch = await powersync.getCrudBatch(55);
333+
expect(smallBatch, 'CRUD items should be present').exist;
334+
expect(smallBatch?.crud.length, 'Should only be 55 CRUD items').eq(55);
335+
expect(smallBatch?.haveMore, 'There should be more CRUD items pending').true;
336+
337+
const defaultBatch = await powersync.getCrudBatch();
338+
expect(defaultBatch?.crud.length, 'Should use default limit').eq(100);
339+
expect(defaultBatch?.haveMore, 'There should be more CRUD items pending').true;
340+
341+
const maxBatch = await powersync.getCrudBatch(1000);
342+
expect(maxBatch?.crud.length, 'Should contain all created CRUD items').eq(102);
343+
expect(maxBatch?.haveMore, 'There should not be any more pending CRUD items').false;
344+
});
314345
});

0 commit comments

Comments
 (0)