Skip to content

Commit 711a96a

Browse files
committed
POC of exposing direct op-sqlite connections.
1 parent 47a2302 commit 711a96a

File tree

4 files changed

+98
-5
lines changed

4 files changed

+98
-5
lines changed

packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
5858
});
5959
}
6060

61+
hasUpdates(): boolean {
62+
return this.updateBuffer.length > 0;
63+
}
64+
6165
flushUpdates() {
6266
if (!this.updateBuffer.length) {
6367
return;

packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ANDROID_DATABASE_PATH, getDylibPath, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite';
1+
import { getDylibPath, open, type DB } from '@op-engineering/op-sqlite';
22
import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common';
33
import Lock from 'async-lock';
44
import { Platform } from 'react-native';
@@ -158,6 +158,10 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
158158

159159
async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
160160
await this.initialized;
161+
if (this.readConnections?.length == 0) {
162+
// When opened with no read connections, use the write connection for reads.
163+
return this.writeLock(fn, options);
164+
}
161165
return new Promise(async (resolve, reject) => {
162166
const execute = async () => {
163167
// Find an available connection that is not busy
@@ -231,11 +235,11 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
231235
}
232236

233237
readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
234-
return this.readLock((ctx) => this.internalTransaction(ctx, fn));
238+
return this.readLock((ctx) => this.internalTransaction(ctx, 'BEGIN', fn));
235239
}
236240

237241
writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
238-
return this.writeLock((ctx) => this.internalTransaction(ctx, fn));
242+
return this.writeLock((ctx) => this.internalTransaction(ctx, 'BEGIN EXCLUSIVE', fn));
239243
}
240244

241245
getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
@@ -264,6 +268,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
264268

265269
protected async internalTransaction<T>(
266270
connection: OPSQLiteConnection,
271+
transactionType: 'BEGIN' | 'BEGIN EXCLUSIVE',
267272
fn: (tx: Transaction) => Promise<T>
268273
): Promise<T> {
269274
let finalized = false;
@@ -282,7 +287,7 @@ export class OPSQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
282287
return connection.execute('ROLLBACK');
283288
};
284289
try {
285-
await connection.execute('BEGIN');
290+
await connection.execute(transactionType);
286291
const result = await fn({
287292
execute: (query, params) => connection.execute(query, params),
288293
executeRaw: (query, params) => connection.executeRaw(query, params),

packages/powersync-op-sqlite/src/db/OPSqliteDBOpenFactory.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
import { DBAdapter, SQLOpenFactory, SQLOpenOptions } from '@powersync/common';
22
import { OPSQLiteDBAdapter } from './OPSqliteAdapter';
33
import { DEFAULT_SQLITE_OPTIONS, SqliteOptions } from './SqliteOptions';
4+
import { OPSQLiteConnection } from './OPSQLiteConnection';
5+
import { DB } from '@op-engineering/op-sqlite';
46

57
export interface OPSQLiteOpenFactoryOptions extends SQLOpenOptions {
68
sqliteOptions?: SqliteOptions;
79
}
810
export class OPSqliteOpenFactory implements SQLOpenFactory {
911
private sqliteOptions: Required<SqliteOptions>;
12+
private adapters: Set<OPSQLiteDBAdapter> = new Set();
1013

1114
constructor(protected options: OPSQLiteOpenFactoryOptions) {
1215
this.sqliteOptions = {
@@ -16,10 +19,85 @@ export class OPSqliteOpenFactory implements SQLOpenFactory {
1619
}
1720

1821
openDB(): DBAdapter {
19-
return new OPSQLiteDBAdapter({
22+
const adapter = new OPSQLiteDBAdapter({
2023
name: this.options.dbFilename,
2124
dbLocation: this.options.dbLocation,
2225
sqliteOptions: this.sqliteOptions
2326
});
27+
this.adapters.add(adapter);
28+
(adapter as any).abortController.signal.addEventListener('abort', () => {
29+
this.adapters.delete(adapter);
30+
});
31+
32+
return adapter;
33+
}
34+
35+
/**
36+
* Opens a direct op-sqlite DB connection. This can be used concurrently with PowerSyncDatabase.
37+
*
38+
* This can be used to execute synchronous queries, or to access other op-sqlite functionality directly.
39+
*
40+
* Update notifications are propagated to any other PowerSyncDatabase opened with this factory.
41+
*
42+
* If a write statement or transaction is currently open on any of the other adapters, any
43+
* write statements on this connection will block until the others are done. This may create a deadlock,
44+
* since this also blocks the JavaScript thread. For that reason, do any write statements in a
45+
* writeLock() on the PowerSyncDatabase.
46+
*
47+
* Read statements can execute concurrently with write statements, so does not have the same risk.
48+
*
49+
* This is not recommended for most use cases, as synchronous queries block the JavaScript thread,
50+
* and the code is not portable to other platforms.
51+
*/
52+
async openDirectConnection(): Promise<DB> {
53+
const adapter = new OPSQLiteDBAdapter({
54+
name: this.options.dbFilename,
55+
dbLocation: this.options.dbLocation,
56+
sqliteOptions: {
57+
...this.sqliteOptions,
58+
readConnections: 0,
59+
// Configure the BUSY_TIMEOUT to be very short, since this is a direct connection.
60+
// In general, we should not wait for a lock when using any synchronous queries,
61+
// since any locks won't be released while we lock the JS thread.
62+
lockTimeoutMs: 50
63+
}
64+
});
65+
await (adapter as any).initialized;
66+
67+
const connection = (adapter as any).writeConnection as OPSQLiteConnection;
68+
connection.registerListener({
69+
tablesUpdated: (updateNotification) => {
70+
// Pass on to all other adapters.
71+
this.adapters.forEach((adapter) => {
72+
adapter.iterateListeners((listener) => {
73+
listener.tablesUpdated?.(updateNotification);
74+
});
75+
});
76+
}
77+
});
78+
const database = (connection as any).DB as DB;
79+
80+
database.commitHook(() => {
81+
// This is effectively a "pre-commit" hook, so changes may not actually reflect yet.
82+
// To make sure the changes reflect, we first get start a new write transaction (not just a
83+
// write lock, since we need to get a lock on the actual SQLite file).
84+
const firstAdapter = [...this.adapters][0];
85+
if (firstAdapter != null && connection.hasUpdates()) {
86+
firstAdapter
87+
.writeLock(async (tx) => {
88+
// Slightly less overhead than writeTransaction().
89+
await tx.execute('BEGIN EXCLUSIVE; ROLLBACK;');
90+
})
91+
.catch((e) => {
92+
// Ignore
93+
})
94+
.finally(() => {
95+
// This triggers the listeners registered above
96+
connection.flushUpdates();
97+
});
98+
}
99+
});
100+
101+
return database;
24102
}
25103
}

packages/powersync-op-sqlite/src/db/SqliteOptions.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ export interface SqliteOptions {
5353
path: string;
5454
entryPoint?: string;
5555
}>;
56+
57+
/**
58+
* Number of read-only connections to use. Defaults to 5.
59+
*/
60+
readConnections?: number;
5661
}
5762

5863
export enum TemporaryStorageOption {
@@ -89,5 +94,6 @@ export const DEFAULT_SQLITE_OPTIONS: Required<SqliteOptions> = {
8994
temporaryStorage: TemporaryStorageOption.MEMORY,
9095
lockTimeoutMs: 30000,
9196
encryptionKey: null,
97+
readConnections: 5,
9298
extensions: []
9399
};

0 commit comments

Comments
 (0)