Skip to content

Commit 79b0ff9

Browse files
[Alpha WIP] Open Factory SqliteConnections (#30)
Return SqliteConnection from open factory for web
1 parent 96400b3 commit 79b0ff9

24 files changed

+505
-493
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ assets
1111

1212
.idea
1313
.vscode
14+
.devcontainer
1415
*.db
1516
*.db-*
1617
test-db

lib/src/common/abstract_open_factory.dart

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ import 'dart:async';
22
import 'package:meta/meta.dart';
33

44
import 'package:sqlite_async/sqlite3_common.dart' as sqlite;
5+
import 'package:sqlite_async/src/common/mutex.dart';
6+
import 'package:sqlite_async/src/sqlite_connection.dart';
57
import 'package:sqlite_async/src/sqlite_options.dart';
8+
import 'package:sqlite_async/src/update_notification.dart';
69

710
/// Factory to create new SQLite database connections.
811
///
@@ -11,7 +14,11 @@ import 'package:sqlite_async/src/sqlite_options.dart';
1114
abstract class SqliteOpenFactory<Database extends sqlite.CommonDatabase> {
1215
String get path;
1316

17+
/// Opens a direct connection to the SQLite database
1418
FutureOr<Database> open(SqliteOpenOptions options);
19+
20+
/// Opens an asynchronous [SqliteConnection]
21+
FutureOr<SqliteConnection> openConnection(SqliteOpenOptions options);
1522
}
1623

1724
class SqliteOpenOptions {
@@ -21,8 +28,21 @@ class SqliteOpenOptions {
2128
/// Whether this connection is read-only.
2229
final bool readOnly;
2330

31+
/// Mutex to use in [SqliteConnection]s
32+
final Mutex? mutex;
33+
34+
/// Name used in debug logs
35+
final String? debugName;
36+
37+
/// Stream of external update notifications
38+
final Stream<UpdateNotification>? updates;
39+
2440
const SqliteOpenOptions(
25-
{required this.primaryConnection, required this.readOnly});
41+
{required this.primaryConnection,
42+
required this.readOnly,
43+
this.mutex,
44+
this.debugName,
45+
this.updates});
2646

2747
sqlite.OpenMode get openMode {
2848
if (primaryConnection) {
@@ -55,9 +75,14 @@ abstract class AbstractDefaultSqliteOpenFactory<
5575
List<String> pragmaStatements(SqliteOpenOptions options);
5676

5777
@protected
78+
79+
/// Opens a direct connection to a SQLite database connection
5880
FutureOr<Database> openDB(SqliteOpenOptions options);
5981

6082
@override
83+
84+
/// Opens a direct connection to a SQLite database connection
85+
/// and executes setup pragma statements to initialize the DB
6186
FutureOr<Database> open(SqliteOpenOptions options) async {
6287
var db = await openDB(options);
6388

@@ -66,4 +91,10 @@ abstract class AbstractDefaultSqliteOpenFactory<
6691
}
6792
return db;
6893
}
94+
95+
@override
96+
97+
/// Opens an asynchronous [SqliteConnection] to a SQLite database
98+
/// and executes setup pragma statements to initialize the DB
99+
FutureOr<SqliteConnection> openConnection(SqliteOpenOptions options);
69100
}

lib/src/common/isolate_connection_factory.dart

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@ abstract class IsolateConnectionFactory<Database extends sqlite.CommonDatabase>
3232
factory IsolateConnectionFactory(
3333
{required openFactory,
3434
required mutex,
35-
SerializedPortClient? upstreamPort}) {
35+
required SerializedPortClient upstreamPort}) {
3636
return IsolateConnectionFactoryImpl(
37-
openFactory: openFactory,
38-
mutex: mutex,
39-
upstreamPort: upstreamPort as SerializedPortClient)
40-
as IsolateConnectionFactory<Database>;
37+
openFactory: openFactory,
38+
mutex: mutex,
39+
upstreamPort: upstreamPort) as IsolateConnectionFactory<Database>;
4140
}
4241

4342
/// Open a new SqliteConnection.

lib/src/common/sqlite_database.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22

3+
import 'package:meta/meta.dart';
34
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
45
import 'package:sqlite_async/src/common/isolate_connection_factory.dart';
56
import 'package:sqlite_async/src/impl/sqlite_database_impl.dart';
@@ -26,6 +27,7 @@ mixin SqliteDatabaseMixin implements SqliteConnection, SqliteQueries {
2627
final StreamController<UpdateNotification> updatesController =
2728
StreamController.broadcast();
2829

30+
@protected
2931
Future<void> get isInitialized;
3032

3133
/// Wait for initialization to complete.

lib/src/impl/stub_sqlite_database.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import 'package:meta/meta.dart';
12
import 'package:sqlite_async/src/common/isolate_connection_factory.dart';
23
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
34
import 'package:sqlite_async/src/common/sqlite_database.dart';
@@ -31,6 +32,7 @@ class SqliteDatabaseImpl
3132
}
3233

3334
@override
35+
@protected
3436
Future<void> get isInitialized => throw UnimplementedError();
3537

3638
@override

lib/src/impl/stub_sqlite_open_factory.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import 'dart:async';
2+
13
import 'package:sqlite_async/sqlite3_common.dart';
24
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
5+
import 'package:sqlite_async/src/sqlite_connection.dart';
36
import 'package:sqlite_async/src/sqlite_options.dart';
47

58
class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory {
@@ -16,4 +19,9 @@ class DefaultSqliteOpenFactory extends AbstractDefaultSqliteOpenFactory {
1619
List<String> pragmaStatements(SqliteOpenOptions options) {
1720
throw UnimplementedError();
1821
}
22+
23+
@override
24+
FutureOr<SqliteConnection> openConnection(SqliteOpenOptions options) {
25+
throw UnimplementedError();
26+
}
1927
}

lib/src/native/database/connection_pool.dart

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
11
import 'dart:async';
22

3-
import 'package:sqlite_async/src/common/abstract_open_factory.dart';
4-
import 'package:sqlite_async/src/common/mutex.dart';
5-
import 'package:sqlite_async/src/common/port_channel.dart';
3+
import 'package:sqlite_async/sqlite_async.dart';
64
import 'package:sqlite_async/src/native/database/native_sqlite_connection_impl.dart';
75
import 'package:sqlite_async/src/native/native_isolate_mutex.dart';
8-
import 'package:sqlite_async/src/sqlite_connection.dart';
9-
import 'package:sqlite_async/src/sqlite_queries.dart';
10-
import 'package:sqlite_async/src/update_notification.dart';
116

127
/// A connection pool with a single write connection and multiple read connections.
138
class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
14-
SqliteConnection? _writeConnection;
9+
final StreamController<UpdateNotification> updatesController =
10+
StreamController.broadcast();
11+
12+
@override
13+
14+
/// The write connection might be recreated if it's closed
15+
/// This will allow the update stream remain constant even
16+
/// after using a new write connection.
17+
late final Stream<UpdateNotification> updates = updatesController.stream;
18+
19+
SqliteConnectionImpl? _writeConnection;
1520

1621
final List<SqliteConnectionImpl> _readConnections = [];
1722

1823
final AbstractDefaultSqliteOpenFactory _factory;
19-
final SerializedPortClient _upstreamPort;
20-
21-
@override
22-
final Stream<UpdateNotification>? updates;
2324

2425
final int maxReaders;
2526

@@ -41,14 +42,14 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
4142
/// Read connections are opened in read-only mode, and will reject any statements
4243
/// that modify the database.
4344
SqliteConnectionPool(this._factory,
44-
{this.updates,
45-
this.maxReaders = 5,
46-
SqliteConnection? writeConnection,
45+
{this.maxReaders = 5,
46+
SqliteConnectionImpl? writeConnection,
4747
this.debugName,
48-
required this.mutex,
49-
required SerializedPortClient upstreamPort})
50-
: _writeConnection = writeConnection,
51-
_upstreamPort = upstreamPort;
48+
required this.mutex})
49+
: _writeConnection = writeConnection {
50+
// Use the write connection's updates
51+
_writeConnection?.updates?.forEach(updatesController.add);
52+
}
5253

5354
/// Returns true if the _write_ connection is currently in autocommit mode.
5455
@override
@@ -117,21 +118,24 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
117118

118119
@override
119120
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
120-
{Duration? lockTimeout, String? debugContext}) {
121+
{Duration? lockTimeout, String? debugContext}) async {
121122
if (closed) {
122123
throw AssertionError('Closed');
123124
}
124125
if (_writeConnection?.closed == true) {
125126
_writeConnection = null;
126127
}
127-
_writeConnection ??= SqliteConnectionImpl(
128-
upstreamPort: _upstreamPort,
129-
primary: false,
130-
updates: updates,
131-
debugName: debugName != null ? '$debugName-writer' : null,
132-
mutex: mutex,
133-
readOnly: false,
134-
openFactory: _factory);
128+
129+
if (_writeConnection == null) {
130+
_writeConnection = (await _factory.openConnection(SqliteOpenOptions(
131+
primaryConnection: true,
132+
debugName: debugName != null ? '$debugName-writer' : null,
133+
mutex: mutex,
134+
readOnly: false))) as SqliteConnectionImpl;
135+
// Expose the new updates on the connection pool
136+
_writeConnection!.updates?.forEach(updatesController.add);
137+
}
138+
135139
return _runZoned(() {
136140
return _writeConnection!.writeLock(callback,
137141
lockTimeout: lockTimeout, debugContext: debugContext);
@@ -163,7 +167,7 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
163167
? null
164168
: '$debugName-${_readConnections.length + 1}';
165169
var connection = SqliteConnectionImpl(
166-
upstreamPort: _upstreamPort,
170+
upstreamPort: _writeConnection?.upstreamPort,
167171
primary: false,
168172
updates: updates,
169173
debugName: name,
@@ -181,6 +185,10 @@ class SqliteConnectionPool with SqliteQueries implements SqliteConnection {
181185
}
182186
}
183187

188+
SerializedPortClient? get upstreamPort {
189+
return _writeConnection?.upstreamPort;
190+
}
191+
184192
@override
185193
Future<void> close() async {
186194
closed = true;

lib/src/native/database/native_sqlite_connection_impl.dart

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,22 @@ import 'package:sqlite_async/src/sqlite_queries.dart';
1212
import 'package:sqlite_async/src/update_notification.dart';
1313
import 'package:sqlite_async/src/utils/shared_utils.dart';
1414

15+
import 'upstream_updates.dart';
16+
1517
typedef TxCallback<T> = Future<T> Function(CommonDatabase db);
1618

1719
/// Implements a SqliteConnection using a separate isolate for the database
1820
/// operations.
19-
class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
21+
class SqliteConnectionImpl
22+
with SqliteQueries, UpStreamTableUpdates
23+
implements SqliteConnection {
2024
/// Private to this connection
2125
final SimpleMutex _connectionMutex = SimpleMutex();
2226
final Mutex _writeMutex;
2327

2428
/// Must be a broadcast stream
2529
@override
26-
final Stream<UpdateNotification>? updates;
30+
late final Stream<UpdateNotification>? updates;
2731
final ParentPortClient _isolateClient = ParentPortClient();
2832
late final Isolate _isolate;
2933
final String? debugName;
@@ -32,13 +36,16 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
3236
SqliteConnectionImpl(
3337
{required openFactory,
3438
required Mutex mutex,
35-
required SerializedPortClient upstreamPort,
36-
this.updates,
39+
SerializedPortClient? upstreamPort,
40+
Stream<UpdateNotification>? updates,
3741
this.debugName,
3842
this.readOnly = false,
3943
bool primary = false})
4044
: _writeMutex = mutex {
41-
_open(openFactory, primary: primary, upstreamPort: upstreamPort);
45+
this.upstreamPort = upstreamPort ?? listenForEvents();
46+
// Accept an incoming stream of updates, or expose one if not given.
47+
this.updates = updates ?? updatesController.stream;
48+
_open(openFactory, primary: primary, upstreamPort: this.upstreamPort);
4249
}
4350

4451
Future<void> get ready async {
@@ -81,13 +88,14 @@ class SqliteConnectionImpl with SqliteQueries implements SqliteConnection {
8188
paused: true);
8289
_isolateClient.tieToIsolate(_isolate);
8390
_isolate.resume(_isolate.pauseCapability!);
84-
91+
isInitialized = _isolateClient.ready;
8592
await _isolateClient.ready;
8693
});
8794
}
8895

8996
@override
9097
Future<void> close() async {
98+
eventsPort?.close();
9199
await _connectionMutex.lock(() async {
92100
if (readOnly) {
93101
await _isolateClient.post(const _SqliteIsolateConnectionClose());

0 commit comments

Comments
 (0)