Skip to content

Commit 3d899b0

Browse files
committed
Refactor update notifications; avoid global locks where we can.
1 parent af6d485 commit 3d899b0

File tree

4 files changed

+23
-34
lines changed

4 files changed

+23
-34
lines changed

lib/src/connection_pool.dart

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,8 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
160160
return readLock((ctx) => callback(ctx as SqliteWriteContext),
161161
lockTimeout: lockTimeout, debugContext: debugContext);
162162
} else {
163-
// FIXME:
164-
// This should avoid using global locks, but then we need to fix
165-
// update notifications to only fire after commit.
166163
return _writeLock(callback,
167-
lockTimeout: lockTimeout, debugContext: debugContext, global: true);
164+
lockTimeout: lockTimeout, debugContext: debugContext, global: false);
168165
}
169166
}
170167

lib/src/sqlite_connection_impl.dart

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,21 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
105105
return _connectionMutex.locked;
106106
}
107107

108+
@override
109+
Future<T> lock<T>(Future<T> Function(SqliteWriteContext tx) callback,
110+
{bool? readOnly, Duration? lockTimeout, String? debugContext}) async {
111+
// Private lock to synchronize this with other statements on the same connection,
112+
// to ensure that transactions aren't interleaved.
113+
return _connectionMutex.lock(() async {
114+
final ctx = _TransactionContext(_isolateClient);
115+
try {
116+
return await callback(ctx);
117+
} finally {
118+
await ctx.close();
119+
}
120+
}, timeout: lockTimeout);
121+
}
122+
108123
@override
109124
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
110125
{Duration? lockTimeout, String? debugContext}) async {
@@ -262,7 +277,6 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
262277
final server = params.portServer;
263278
final commandPort = ReceivePort();
264279

265-
Timer? updateDebouncer;
266280
Set<String> updatedTables = {};
267281
int? txId;
268282
Object? txError;
@@ -271,25 +285,18 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
271285
if (updatedTables.isNotEmpty) {
272286
client.fire(UpdateNotification(updatedTables));
273287
updatedTables.clear();
274-
updateDebouncer?.cancel();
275-
updateDebouncer = null;
276288
}
277289
}
278290

279291
db.updates.listen((event) {
280292
updatedTables.add(event.tableName);
281-
282-
// This handles two cases:
283-
// 1. Update arrived after _SqliteIsolateClose (not sure if this could happen).
284-
// 2. Long-running _SqliteIsolateClosure that should fire updates while running.
285-
updateDebouncer ??=
286-
Timer(const Duration(milliseconds: 10), maybeFireUpdates);
287293
});
288294

289295
server.open((data) async {
290296
if (data is _SqliteIsolateClose) {
291297
if (txId != null) {
292298
if (!db.autocommit) {
299+
updatedTables.clear();
293300
db.execute('ROLLBACK');
294301
}
295302
txId = null;

lib/src/sqlite_database.dart

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -117,26 +117,11 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
117117
}
118118

119119
void _listenForEvents() {
120-
UpdateNotification? updates;
121-
122120
Map<SendPort, StreamSubscription> subscriptions = {};
123121

124122
_eventsPort = PortServer((message) async {
125123
if (message is UpdateNotification) {
126-
if (updates == null) {
127-
updates = message;
128-
// Use the mutex to only send updates after the current transaction.
129-
// Do take care to avoid getting a lock for each individual update -
130-
// that could add massive performance overhead.
131-
mutex.lock(() async {
132-
if (updates != null) {
133-
_updatesController.add(updates!);
134-
updates = null;
135-
}
136-
});
137-
} else {
138-
updates!.tables.addAll(message.tables);
139-
}
124+
_updatesController.add(message);
140125
return null;
141126
} else if (message is InitDb) {
142127
await _initialized;

lib/src/sqlite_queries.dart

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
164164
return readTransaction((ctx) => callback(ctx as SqliteWriteContext),
165165
lockTimeout: lockTimeout);
166166
} else {
167-
// FIXME:
168-
// This should avoid using global locks, but then we need to fix
169-
// update notifications to only fire after commit.
170-
// ignore: deprecated_member_use_from_same_package
171-
return writeTransaction(callback, lockTimeout: lockTimeout);
167+
// Uses connection-level lock, unlike writeTransaction which uses a
168+
// database-level lock.
169+
return lock((ctx) async {
170+
return await internalWriteTransaction(ctx, callback);
171+
}, lockTimeout: lockTimeout, debugContext: 'writeTransaction()');
172172
}
173173
}
174174
}

0 commit comments

Comments
 (0)