Skip to content

Commit 726197f

Browse files
committed
Re-implement lockTimeout.
1 parent c01e637 commit 726197f

File tree

2 files changed

+64
-6
lines changed

2 files changed

+64
-6
lines changed

lib/src/connection_pool.dart

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,29 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
8686
return;
8787
}
8888

89-
final nextItem = _queue.removeFirst();
89+
var nextItem = _queue.removeFirst();
90+
while (nextItem.completer.isCompleted) {
91+
// This item already timed out - try the next one if available
92+
if (_queue.isEmpty) {
93+
return;
94+
}
95+
nextItem = _queue.removeFirst();
96+
}
97+
98+
nextItem.lockTimer?.cancel();
99+
90100
nextItem.completer.complete(Future.sync(() async {
91101
final nextConnection = _availableReadConnections.isEmpty
92102
? await _expandPool()
93103
: _availableReadConnections.removeLast();
94104
try {
105+
// At this point the connection is expected to be available immediately.
106+
// No need to calculate a new lockTimeout here.
95107
final result = await nextConnection.readLock(nextItem.callback);
96108
return result;
97109
} finally {
98110
_availableReadConnections.add(nextConnection);
99-
_nextRead();
111+
Timer.run(_nextRead);
100112
}
101113
}));
102114
}
@@ -110,11 +122,11 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
110122
final zone = _getZone(debugContext: debugContext ?? 'get*()');
111123
final item = _PendingItem((ctx) {
112124
return zone.runUnary(callback, ctx);
113-
});
125+
}, lockTimeout: lockTimeout);
114126
_queue.add(item);
115127
_nextRead();
116128

117-
return await item.completer.future;
129+
return (await item.future) as T;
118130
}
119131

120132
@override
@@ -207,6 +219,28 @@ typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
207219
class _PendingItem {
208220
ReadCallback<dynamic> callback;
209221
Completer<dynamic> completer = Completer.sync();
210-
211-
_PendingItem(this.callback);
222+
late Future<dynamic> future = completer.future;
223+
DateTime? deadline;
224+
final Duration? lockTimeout;
225+
late final Timer? lockTimer;
226+
227+
_PendingItem(this.callback, {this.lockTimeout}) {
228+
if (lockTimeout != null) {
229+
deadline = DateTime.now().add(lockTimeout!);
230+
lockTimer = Timer(lockTimeout!, () {
231+
// Note: isCompleted is true when `nextItem.completer.complete` is called, not when the result is available.
232+
// This matches the behavior we need for a timeout on the lock, but not the entire operation.
233+
if (!completer.isCompleted) {
234+
// completer.completeError(
235+
// TimeoutException('Failed to get a read connection', lockTimeout));
236+
completer.complete(Future.sync(() async {
237+
throw TimeoutException(
238+
'Failed to get a read connection', lockTimeout);
239+
}));
240+
}
241+
});
242+
} else {
243+
lockTimer = null;
244+
}
245+
}
212246
}

test/basic_test.dart

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22
import 'dart:math';
3+
import 'dart:typed_data';
34

45
import 'package:sqlite3/sqlite3.dart' as sqlite;
56
import 'package:sqlite_async/mutex.dart';
@@ -399,6 +400,29 @@ void main() {
399400
await future1;
400401
await future2;
401402
});
403+
404+
test('lockTimeout', () async {
405+
final db =
406+
SqliteDatabase.withFactory(testFactory(path: path), maxReaders: 2);
407+
await db.initialize();
408+
409+
final f1 = db.readTransaction((tx) async {
410+
await tx.get('select test_sleep(100)');
411+
}, lockTimeout: const Duration(milliseconds: 200));
412+
413+
final f2 = db.readTransaction((tx) async {
414+
await tx.get('select test_sleep(100)');
415+
}, lockTimeout: const Duration(milliseconds: 200));
416+
417+
// At this point, both read connections are in use
418+
await expectLater(() async {
419+
await db.readLock((tx) async {
420+
await tx.get('select test_sleep(10)');
421+
}, lockTimeout: const Duration(milliseconds: 2));
422+
}, throwsA((e) => e is TimeoutException));
423+
424+
await Future.wait([f1, f2]);
425+
});
402426
});
403427
}
404428

0 commit comments

Comments
 (0)