Skip to content

Commit 2be1ff2

Browse files
authored
Merge pull request #4 from journeyapps/recursive-lock-errors
Improve recursive lock errors
2 parents 7cf6db9 + db3618e commit 2be1ff2

11 files changed

+168
-75
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.3.0
2+
3+
- Better error messages for recursive transactions.
4+
- Breaking change: Error by default when starting a read transaction within a write transaction.
5+
16
## 0.2.1
27

38
- Fix update notifications missing the first update.

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ query access.
2323
* Automatically convert query args to JSON where applicable, making JSON1 operations simple.
2424
* Direct SQL queries - no wrapper classes or code generation required.
2525

26+
See this [blog post](https://www.powersync.co/blog/sqlite-optimizations-for-ultra-high-performance),
27+
explaining why these features are important for using SQLite.
28+
2629
## Installation
2730

2831
```sh
@@ -65,8 +68,8 @@ void main() async {
6568
// 1. Atomic persistence (all updates are either applied or rolled back).
6669
// 2. Improved throughput.
6770
await db.writeTransaction((tx) async {
68-
await db.execute('INSERT INTO test_data(data) values(?)', ['Test3']);
69-
await db.execute('INSERT INTO test_data(data) values(?)', ['Test4']);
71+
await tx.execute('INSERT INTO test_data(data) values(?)', ['Test3']);
72+
await tx.execute('INSERT INTO test_data(data) values(?)', ['Test4']);
7073
});
7174
7275
await db.close();

example/basic_example.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ void main() async {
2929
// 1. Atomic persistence (all updates are either applied or rolled back).
3030
// 2. Improved throughput.
3131
await db.writeTransaction((tx) async {
32-
await db.execute('INSERT INTO test_data(data) values(?)', ['Test3']);
33-
await db.execute('INSERT INTO test_data(data) values(?)', ['Test4']);
32+
await tx.execute('INSERT INTO test_data(data) values(?)', ['Test3']);
33+
await tx.execute('INSERT INTO test_data(data) values(?)', ['Test4']);
3434
});
3535

3636
// Close database to release resources

lib/src/connection_pool.dart

Lines changed: 64 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -50,58 +50,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5050

5151
@override
5252
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
53-
{Duration? lockTimeout}) async {
53+
{Duration? lockTimeout, String? debugContext}) async {
5454
await _expandPool();
5555

56-
bool haveLock = false;
57-
var completer = Completer<T>();
56+
return _runZoned(() async {
57+
bool haveLock = false;
58+
var completer = Completer<T>();
59+
60+
var futures = _readConnections.sublist(0).map((connection) async {
61+
try {
62+
return await connection.readLock((ctx) async {
63+
if (haveLock) {
64+
// Already have a different lock - release this one.
65+
return false;
66+
}
67+
haveLock = true;
68+
69+
var future = callback(ctx);
70+
completer.complete(future);
71+
72+
// We have to wait for the future to complete before we can release the
73+
// lock.
74+
try {
75+
await future;
76+
} catch (_) {
77+
// Ignore
78+
}
79+
80+
return true;
81+
}, lockTimeout: lockTimeout, debugContext: debugContext);
82+
} on TimeoutException {
83+
return false;
84+
}
85+
});
86+
87+
final stream = Stream<bool>.fromFutures(futures);
88+
var gotAny = await stream.any((element) => element);
89+
90+
if (!gotAny) {
91+
// All TimeoutExceptions
92+
throw TimeoutException('Failed to get a read connection', lockTimeout);
93+
}
5894

59-
var futures = _readConnections.sublist(0).map((connection) async {
6095
try {
61-
return await connection.readLock((ctx) async {
62-
if (haveLock) {
63-
// Already have a different lock - release this one.
64-
return false;
65-
}
66-
haveLock = true;
67-
68-
var future = callback(ctx);
69-
completer.complete(future);
70-
71-
// We have to wait for the future to complete before we can release the
72-
// lock.
73-
try {
74-
await future;
75-
} catch (_) {
76-
// Ignore
77-
}
78-
79-
return true;
80-
}, lockTimeout: lockTimeout);
81-
} on TimeoutException {
82-
return false;
96+
return await completer.future;
97+
} catch (e) {
98+
// throw e;
99+
rethrow;
83100
}
84-
});
85-
86-
final stream = Stream<bool>.fromFutures(futures);
87-
var gotAny = await stream.any((element) => element);
88-
89-
if (!gotAny) {
90-
// All TimeoutExceptions
91-
throw TimeoutException('Failed to get a read connection', lockTimeout);
92-
}
93-
94-
try {
95-
return await completer.future;
96-
} catch (e) {
97-
// throw e;
98-
rethrow;
99-
}
101+
}, debugContext: debugContext ?? 'get*()');
100102
}
101103

102104
@override
103105
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
104-
{Duration? lockTimeout}) {
106+
{Duration? lockTimeout, String? debugContext}) {
105107
if (closed) {
106108
throw AssertionError('Closed');
107109
}
@@ -113,7 +115,25 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
113115
mutex: mutex,
114116
readOnly: false,
115117
openFactory: _factory);
116-
return _writeConnection!.writeLock(callback, lockTimeout: lockTimeout);
118+
return _runZoned(() {
119+
return _writeConnection!.writeLock(callback,
120+
lockTimeout: lockTimeout, debugContext: debugContext);
121+
}, debugContext: debugContext ?? 'execute()');
122+
}
123+
124+
/// The [Mutex] on individual connections do already error in recursive locks.
125+
///
126+
/// We duplicate the same check here, to:
127+
/// 1. Also error when the recursive transaction is handled by a different
128+
/// connection (with a different lock).
129+
/// 2. Give a more specific error message when it happens.
130+
T _runZoned<T>(T Function() callback, {required String debugContext}) {
131+
if (Zone.current[this] != null) {
132+
throw LockError(
133+
'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
134+
}
135+
var zone = Zone.current.fork(zoneValues: {this: true});
136+
return zone.run(callback);
117137
}
118138

119139
Future<void> _expandPool() async {

lib/src/mutex.dart

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class SimpleMutex implements Mutex {
4040
@override
4141
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
4242
if (Zone.current[this] != null) {
43-
throw AssertionError('Recursive lock is not allowed');
43+
throw LockError('Recursive lock is not allowed');
4444
}
4545
var zone = Zone.current.fork(zoneValues: {this: true});
4646

@@ -132,7 +132,7 @@ class SharedMutex implements Mutex {
132132
@override
133133
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
134134
if (Zone.current[this] != null) {
135-
throw AssertionError('Recursive lock is not allowed');
135+
throw LockError('Recursive lock is not allowed');
136136
}
137137
return runZoned(() async {
138138
await _acquire(timeout: timeout);
@@ -223,3 +223,14 @@ class _AcquireMessage {
223223
class _UnlockMessage {
224224
const _UnlockMessage();
225225
}
226+
227+
class LockError extends Error {
228+
final String message;
229+
230+
LockError(this.message);
231+
232+
@override
233+
String toString() {
234+
return 'LockError: $message';
235+
}
236+
}

lib/src/sqlite_connection.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ abstract class SqliteConnection extends SqliteWriteContext {
9494
///
9595
/// In most cases, [readTransaction] should be used instead.
9696
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
97-
{Duration? lockTimeout});
97+
{Duration? lockTimeout, String? debugContext});
9898

9999
/// Takes a global lock, without starting a transaction.
100100
///
101101
/// In most cases, [writeTransaction] should be used instead.
102102
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
103-
{Duration? lockTimeout});
103+
{Duration? lockTimeout, String? debugContext});
104104

105105
Future<void> close();
106106
}

lib/src/sqlite_connection_impl.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
7979

8080
@override
8181
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
82-
{Duration? lockTimeout}) async {
82+
{Duration? lockTimeout, String? debugContext}) async {
8383
// Private lock to synchronize this with other statements on the same connection,
8484
// to ensure that transactions aren't interleaved.
8585
return _connectionMutex.lock(() async {
@@ -94,7 +94,7 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
9494

9595
@override
9696
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
97-
{Duration? lockTimeout}) async {
97+
{Duration? lockTimeout, String? debugContext}) async {
9898
final stopWatch = lockTimeout == null ? null : (Stopwatch()..start());
9999
// Private lock to synchronize this with other statements on the same connection,
100100
// to ensure that transactions aren't interleaved.

lib/src/sqlite_database.dart

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,15 @@ class SqliteDatabase with SqliteQueries implements SqliteConnection {
209209

210210
@override
211211
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
212-
{Duration? lockTimeout}) {
213-
return _pool.readLock(callback, lockTimeout: lockTimeout);
212+
{Duration? lockTimeout, String? debugContext}) {
213+
return _pool.readLock(callback,
214+
lockTimeout: lockTimeout, debugContext: debugContext);
214215
}
215216

216217
@override
217218
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
218-
{Duration? lockTimeout}) {
219-
return _pool.writeLock(callback, lockTimeout: lockTimeout);
219+
{Duration? lockTimeout, String? debugContext}) {
220+
return _pool.writeLock(callback,
221+
lockTimeout: lockTimeout, debugContext: debugContext);
220222
}
221223
}

lib/src/sqlite_queries.dart

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,30 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
1717
[List<Object?> parameters = const []]) async {
1818
return writeLock((ctx) async {
1919
return ctx.execute(sql, parameters);
20-
});
20+
}, debugContext: 'execute()');
2121
}
2222

2323
@override
2424
Future<sqlite.ResultSet> getAll(String sql,
2525
[List<Object?> parameters = const []]) {
2626
return readLock((ctx) async {
2727
return ctx.getAll(sql, parameters);
28-
});
28+
}, debugContext: 'getAll()');
2929
}
3030

3131
@override
3232
Future<sqlite.Row> get(String sql, [List<Object?> parameters = const []]) {
3333
return readLock((ctx) async {
3434
return ctx.get(sql, parameters);
35-
});
35+
}, debugContext: 'get()');
3636
}
3737

3838
@override
3939
Future<sqlite.Row?> getOptional(String sql,
4040
[List<Object?> parameters = const []]) {
4141
return readLock((ctx) async {
4242
return ctx.getOptional(sql, parameters);
43-
});
43+
}, debugContext: 'getOptional()');
4444
}
4545

4646
@override
@@ -103,7 +103,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
103103
{Duration? lockTimeout}) async {
104104
return readLock((ctx) async {
105105
return await internalReadTransaction(ctx, callback);
106-
}, lockTimeout: lockTimeout);
106+
}, lockTimeout: lockTimeout, debugContext: 'readTransaction()');
107107
}
108108

109109
@override
@@ -112,7 +112,7 @@ mixin SqliteQueries implements SqliteWriteContext, SqliteConnection {
112112
{Duration? lockTimeout}) async {
113113
return writeLock((ctx) async {
114114
return await internalWriteTransaction(ctx, callback);
115-
}, lockTimeout: lockTimeout);
115+
}, lockTimeout: lockTimeout, debugContext: 'writeTransaction()');
116116
}
117117

118118
/// See [SqliteReadContext.computeWithDatabase].

pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: sqlite_async
22
description: High-performance asynchronous interface for SQLite on Dart and Flutter.
3-
version: 0.2.1
3+
version: 0.3.0
44
repository: https://github.com/journeyapps/sqlite_async.dart
55
environment:
66
sdk: '>=2.19.1 <3.0.0'

0 commit comments

Comments
 (0)