Skip to content

Commit c01e637

Browse files
committed
Rewrite connection pooling queue.
1 parent 1af625b commit c01e637

File tree

1 file changed

+90
-83
lines changed

1 file changed

+90
-83
lines changed

lib/src/connection_pool.dart

Lines changed: 90 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import 'dart:async';
2+
import 'dart:collection';
23

34
import 'mutex.dart';
45
import 'port_channel.dart';
@@ -12,7 +13,9 @@ import 'update_notification.dart';
1213
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
1314
SqliteConnection? _writeConnection;
1415

15-
final List<SqliteConnectionImpl> _readConnections = [];
16+
final Set<SqliteConnectionImpl> _allReadConnections = {};
17+
final Queue<SqliteConnectionImpl> _availableReadConnections = Queue();
18+
final Queue<_PendingItem> _queue = Queue();
1619

1720
final SqliteOpenFactory _factory;
1821
final SerializedPortClient _upstreamPort;
@@ -58,62 +61,60 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
5861
return await _writeConnection!.getAutoCommit();
5962
}
6063

61-
@override
62-
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
63-
{Duration? lockTimeout, String? debugContext}) async {
64-
await _expandPool();
65-
66-
return _runZoned(() async {
67-
bool haveLock = false;
68-
var completer = Completer<T>();
69-
70-
var futures = _readConnections.sublist(0).map((connection) async {
71-
if (connection.closed) {
72-
_readConnections.remove(connection);
73-
}
74-
try {
75-
return await connection.readLock((ctx) async {
76-
if (haveLock) {
77-
// Already have a different lock - release this one.
78-
return false;
79-
}
80-
haveLock = true;
81-
82-
var future = callback(ctx);
83-
completer.complete(future);
84-
85-
// We have to wait for the future to complete before we can release the
86-
// lock.
87-
try {
88-
await future;
89-
} catch (_) {
90-
// Ignore
91-
}
92-
93-
return true;
94-
}, lockTimeout: lockTimeout, debugContext: debugContext);
95-
} on TimeoutException {
96-
return false;
97-
} on ClosedException {
98-
return false;
99-
}
100-
});
101-
102-
final stream = Stream<bool>.fromFutures(futures);
103-
var gotAny = await stream.any((element) => element);
104-
105-
if (!gotAny) {
106-
// All TimeoutExceptions
107-
throw TimeoutException('Failed to get a read connection', lockTimeout);
64+
void _nextRead() {
65+
if (_queue.isEmpty) {
66+
// Wait for queue item
67+
return;
68+
} else if (closed) {
69+
while (_queue.isNotEmpty) {
70+
final nextItem = _queue.removeFirst();
71+
nextItem.completer.completeError(const ClosedException());
10872
}
73+
return;
74+
}
75+
76+
while (_availableReadConnections.isNotEmpty &&
77+
_availableReadConnections.last.closed) {
78+
// Remove connections that may have errored
79+
final connection = _availableReadConnections.removeLast();
80+
_allReadConnections.remove(connection);
81+
}
82+
83+
if (_availableReadConnections.isEmpty &&
84+
_allReadConnections.length == maxReaders) {
85+
// Wait for available connection
86+
return;
87+
}
10988

89+
final nextItem = _queue.removeFirst();
90+
nextItem.completer.complete(Future.sync(() async {
91+
final nextConnection = _availableReadConnections.isEmpty
92+
? await _expandPool()
93+
: _availableReadConnections.removeLast();
11094
try {
111-
return await completer.future;
112-
} catch (e) {
113-
// throw e;
114-
rethrow;
95+
final result = await nextConnection.readLock(nextItem.callback);
96+
return result;
97+
} finally {
98+
_availableReadConnections.add(nextConnection);
99+
_nextRead();
115100
}
116-
}, debugContext: debugContext ?? 'get*()');
101+
}));
102+
}
103+
104+
@override
105+
Future<T> readLock<T>(ReadCallback<T> callback,
106+
{Duration? lockTimeout, String? debugContext}) async {
107+
if (closed) {
108+
throw ClosedException();
109+
}
110+
final zone = _getZone(debugContext: debugContext ?? 'get*()');
111+
final item = _PendingItem((ctx) {
112+
return zone.runUnary(callback, ctx);
113+
});
114+
_queue.add(item);
115+
_nextRead();
116+
117+
return await item.completer.future;
117118
}
118119

119120
@override
@@ -146,41 +147,38 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
146147
/// connection (with a different lock).
147148
/// 2. Give a more specific error message when it happens.
148149
T _runZoned<T>(T Function() callback, {required String debugContext}) {
150+
return _getZone(debugContext: debugContext).run(callback);
151+
}
152+
153+
Zone _getZone({required String debugContext}) {
149154
if (Zone.current[this] != null) {
150155
throw LockError(
151156
'Recursive lock is not allowed. Use `tx.$debugContext` instead of `db.$debugContext`.');
152157
}
153-
var zone = Zone.current.fork(zoneValues: {this: true});
154-
return zone.run(callback);
158+
return Zone.current.fork(zoneValues: {this: true});
155159
}
156160

157-
Future<void> _expandPool() async {
158-
if (closed || _readConnections.length >= maxReaders) {
159-
return;
160-
}
161-
bool hasCapacity = _readConnections
162-
.any((connection) => !connection.locked && !connection.closed);
163-
if (!hasCapacity) {
164-
var name = debugName == null
165-
? null
166-
: '$debugName-${_readConnections.length + 1}';
167-
var connection = SqliteConnectionImpl(
168-
upstreamPort: _upstreamPort,
169-
primary: false,
170-
updates: updates,
171-
debugName: name,
172-
mutex: mutex,
173-
readOnly: true,
174-
openFactory: _factory);
175-
_readConnections.add(connection);
176-
177-
// Edge case:
178-
// If we don't await here, there is a chance that a different connection
179-
// is used for the transaction, and that it finishes and deletes the database
180-
// while this one is still opening. This is specifically triggered in tests.
181-
// To avoid that, we wait for the connection to be ready.
182-
await connection.ready;
183-
}
161+
Future<SqliteConnectionImpl> _expandPool() async {
162+
var name = debugName == null
163+
? null
164+
: '$debugName-${_allReadConnections.length + 1}';
165+
var connection = SqliteConnectionImpl(
166+
upstreamPort: _upstreamPort,
167+
primary: false,
168+
updates: updates,
169+
debugName: name,
170+
mutex: mutex,
171+
readOnly: true,
172+
openFactory: _factory);
173+
_allReadConnections.add(connection);
174+
175+
// Edge case:
176+
// If we don't await here, there is a chance that a different connection
177+
// is used for the transaction, and that it finishes and deletes the database
178+
// while this one is still opening. This is specifically triggered in tests.
179+
// To avoid that, we wait for the connection to be ready.
180+
await connection.ready;
181+
return connection;
184182
}
185183

186184
@override
@@ -190,7 +188,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
190188
// It is possible that `readLock()` removes connections from the pool while we're
191189
// closing connections, but not possible for new connections to be added.
192190
// Create a copy of the list, to avoid this triggering "Concurrent modification during iteration"
193-
final toClose = _readConnections.sublist(0);
191+
final toClose = _allReadConnections.toList();
194192
for (var connection in toClose) {
195193
// Wait for connection initialization, so that any existing readLock()
196194
// requests go through before closing.
@@ -203,3 +201,12 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
203201
await _writeConnection?.close();
204202
}
205203
}
204+
205+
typedef ReadCallback<T> = Future<T> Function(SqliteReadContext tx);
206+
207+
class _PendingItem {
208+
ReadCallback<dynamic> callback;
209+
Completer<dynamic> completer = Completer.sync();
210+
211+
_PendingItem(this.callback);
212+
}

0 commit comments

Comments
 (0)