Skip to content

Commit 940880a

Browse files
committed
Add tests to sqlite3_web
1 parent 15820b8 commit 940880a

File tree

7 files changed

+169
-224
lines changed

7 files changed

+169
-224
lines changed

sqlite3_web/lib/src/client.dart

Lines changed: 41 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import 'protocol.dart';
1414
import 'shared.dart';
1515
import 'worker.dart';
1616

17+
final class _CommitOrRollbackStream {
18+
StreamSubscription<Notification>? workerSubscription;
19+
final StreamController<void> controller = StreamController.broadcast();
20+
}
21+
1722
final class RemoteDatabase implements Database {
1823
final WorkerConnection connection;
1924
final int databaseId;
@@ -22,10 +27,9 @@ final class RemoteDatabase implements Database {
2227

2328
StreamSubscription<Notification>? _updateNotificationSubscription;
2429
final StreamController<SqliteUpdate> _updates = StreamController.broadcast();
25-
StreamSubscription<Notification>? _rollbackNotificationSubscription;
26-
final StreamController<void> _rollbacks = StreamController.broadcast();
27-
StreamSubscription<Notification>? _commitNotificationSubscription;
28-
final StreamController<void> _commits = StreamController.broadcast();
30+
31+
final _CommitOrRollbackStream _commits = _CommitOrRollbackStream();
32+
final _CommitOrRollbackStream _rollbacks = _CommitOrRollbackStream();
2933

3034
RemoteDatabase({required this.connection, required this.databaseId}) {
3135
_updates
@@ -38,76 +42,54 @@ final class RemoteDatabase implements Database {
3842
}
3943
}
4044
});
41-
_requestUpdates(true);
45+
_requestStreamUpdates(MessageType.updateRequest, true);
4246
})
4347
..onCancel = (() {
4448
_updateNotificationSubscription?.cancel();
4549
_updateNotificationSubscription = null;
46-
_requestUpdates(false);
50+
_requestStreamUpdates(MessageType.updateRequest, false);
4751
});
4852

49-
_rollbacks
50-
..onListen = (() {
51-
_rollbackNotificationSubscription ??=
52-
connection.notifications.stream.listen((notification) {
53-
if (notification case RollbackNotification()) {
54-
if (notification.databaseId == databaseId) {
55-
_rollbacks.add(null);
56-
}
57-
}
58-
});
59-
_requestRollbacks(true);
60-
})
61-
..onCancel = (() {
62-
_rollbackNotificationSubscription?.cancel();
63-
_rollbackNotificationSubscription = null;
64-
_requestRollbacks(false);
65-
});
53+
_setupCommitOrRollbackStream(
54+
_commits, MessageType.commitRequest, MessageType.notifyCommit);
55+
_setupCommitOrRollbackStream(
56+
_rollbacks, MessageType.rollbackRequest, MessageType.notifyRollback);
57+
}
6658

67-
_commits
59+
void _setupCommitOrRollbackStream(
60+
_CommitOrRollbackStream stream,
61+
MessageType requestSubscription,
62+
MessageType notificationType,
63+
) {
64+
stream.controller
6865
..onListen = (() {
69-
_commitNotificationSubscription ??=
66+
stream.workerSubscription ??=
7067
connection.notifications.stream.listen((notification) {
71-
if (notification case CommitNotification()) {
72-
if (notification.databaseId == databaseId) {
73-
_commits.add(null);
68+
if (notification case EmptyNotification(type: final type)) {
69+
if (notification.databaseId == databaseId &&
70+
type == notificationType) {
71+
stream.controller.add(null);
7472
}
7573
}
7674
});
77-
_requestCommits(true);
75+
_requestStreamUpdates(requestSubscription, true);
7876
})
7977
..onCancel = (() {
80-
_commitNotificationSubscription?.cancel();
81-
_commitNotificationSubscription = null;
82-
_requestCommits(false);
78+
stream.workerSubscription?.cancel();
79+
stream.workerSubscription = null;
80+
_requestStreamUpdates(requestSubscription, false);
8381
});
8482
}
8583

86-
void _requestUpdates(bool sendUpdates) {
84+
void _requestStreamUpdates(MessageType streamType, bool subscribe) {
8785
if (!_isClosed) {
8886
connection.sendRequest(
89-
UpdateStreamRequest(
90-
action: sendUpdates, requestId: 0, databaseId: databaseId),
91-
MessageType.simpleSuccessResponse,
92-
);
93-
}
94-
}
95-
96-
void _requestRollbacks(bool sendRollbacks) {
97-
if (!_isClosed) {
98-
connection.sendRequest(
99-
RollbackStreamRequest(
100-
action: sendRollbacks, requestId: 1, databaseId: databaseId),
101-
MessageType.simpleSuccessResponse,
102-
);
103-
}
104-
}
105-
106-
void _requestCommits(bool sendCommits) {
107-
if (!_isClosed) {
108-
connection.sendRequest(
109-
CommitStreamRequest(
110-
action: sendCommits, requestId: 2, databaseId: databaseId),
87+
StreamRequest(
88+
type: streamType,
89+
action: subscribe,
90+
requestId: 0, // filled out in sendRequest
91+
databaseId: databaseId,
92+
),
11193
MessageType.simpleSuccessResponse,
11294
);
11395
}
@@ -123,8 +105,8 @@ final class RemoteDatabase implements Database {
123105
_isClosed = true;
124106
await (
125107
_updates.close(),
126-
_rollbacks.close(),
127-
_commits.close(),
108+
_rollbacks.controller.close(),
109+
_commits.controller.close(),
128110
connection.sendRequest(
129111
CloseDatabase(requestId: 0, databaseId: databaseId),
130112
MessageType.simpleSuccessResponse)
@@ -190,10 +172,10 @@ final class RemoteDatabase implements Database {
190172
Stream<SqliteUpdate> get updates => _updates.stream;
191173

192174
@override
193-
Stream<void> get rollbacks => _rollbacks.stream;
175+
Stream<void> get rollbacks => _rollbacks.controller.stream;
194176

195177
@override
196-
Stream<void> get commits => _commits.stream;
178+
Stream<void> get commits => _commits.controller.stream;
197179

198180
@override
199181
Future<int> get userVersion async {

sqlite3_web/lib/src/protocol.dart

Lines changed: 33 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ enum MessageType<T extends Message> {
2525
fileSystemFlush<FileSystemFlushRequest>(),
2626
connect<ConnectRequest>(),
2727
startFileSystemServer<StartFileSystemServer>(),
28-
updateRequest<UpdateStreamRequest>(),
29-
rollbackRequest<RollbackStreamRequest>(),
30-
commitRequest<CommitStreamRequest>(),
28+
updateRequest<StreamRequest>(),
29+
rollbackRequest<StreamRequest>(),
30+
commitRequest<StreamRequest>(),
3131
simpleSuccessResponse<SimpleSuccessResponse>(),
3232
rowsResponse<RowsResponse>(),
3333
errorResponse<ErrorResponse>(),
3434
endpointResponse<EndpointResponse>(),
3535
closeDatabase<CloseDatabase>(),
3636
openAdditionalConnection<OpenAdditonalConnection>(),
3737
notifyUpdate<UpdateNotification>(),
38-
notifyRollback<RollbackNotification>(),
39-
notifyCommit<CommitNotification>(),
38+
notifyRollback<EmptyNotification>(),
39+
notifyCommit<EmptyNotification>(),
4040
;
4141

4242
static final Map<String, MessageType> byName = values.asNameMap();
@@ -97,17 +97,19 @@ sealed class Message {
9797
MessageType.closeDatabase => CloseDatabase.deserialize(object),
9898
MessageType.openAdditionalConnection =>
9999
OpenAdditonalConnection.deserialize(object),
100-
MessageType.updateRequest => UpdateStreamRequest.deserialize(object),
101-
MessageType.rollbackRequest => RollbackStreamRequest.deserialize(object),
102-
MessageType.commitRequest => CommitStreamRequest.deserialize(object),
100+
MessageType.updateRequest ||
101+
MessageType.rollbackRequest ||
102+
MessageType.commitRequest =>
103+
StreamRequest.deserialize(type, object),
103104
MessageType.simpleSuccessResponse =>
104105
SimpleSuccessResponse.deserialize(object),
105106
MessageType.endpointResponse => EndpointResponse.deserialize(object),
106107
MessageType.rowsResponse => RowsResponse.deserialize(object),
107108
MessageType.errorResponse => ErrorResponse.deserialize(object),
108109
MessageType.notifyUpdate => UpdateNotification.deserialize(object),
109-
MessageType.notifyRollback => RollbackNotification.deserialize(object),
110-
MessageType.notifyCommit => CommitNotification.deserialize(object),
110+
MessageType.notifyRollback ||
111+
MessageType.notifyCommit =>
112+
EmptyNotification.deserialize(type, object),
111113
};
112114
}
113115

@@ -637,92 +639,33 @@ final class ErrorResponse extends Response {
637639
}
638640
}
639641

640-
final class UpdateStreamRequest extends Request {
642+
final class StreamRequest extends Request {
641643
/// When true, the client is requesting to be informed about updates happening
642644
/// on the database identified by this request.
643645
///
644646
/// When false, the client is requesting to no longer be informed about these
645647
/// updates.
646648
final bool action;
647649

648-
UpdateStreamRequest(
649-
{required this.action,
650-
required super.requestId,
651-
required super.databaseId});
650+
final MessageType<Message> type;
652651

653-
factory UpdateStreamRequest.deserialize(JSObject object) {
654-
return UpdateStreamRequest(
655-
action: (object[_UniqueFieldNames.action] as JSBoolean).toDart,
656-
requestId: object.requestId,
657-
databaseId: object.databaseId,
658-
);
659-
}
660-
661-
@override
662-
MessageType<Message> get type => MessageType.updateRequest;
663-
664-
@override
665-
void serialize(JSObject object, List<JSObject> transferred) {
666-
super.serialize(object, transferred);
667-
object[_UniqueFieldNames.action] = action.toJS;
668-
}
669-
}
670-
671-
final class RollbackStreamRequest extends Request {
672-
/// When true, the client is requesting to be informed about rollbacks
673-
/// happening on the database identified by this request.
674-
///
675-
/// When false, the client is requesting to no longer be informed about these
676-
/// updates.
677-
final bool action;
678-
679-
RollbackStreamRequest(
680-
{required this.action,
681-
required super.requestId,
682-
required super.databaseId});
683-
684-
factory RollbackStreamRequest.deserialize(JSObject object) {
685-
return RollbackStreamRequest(
686-
action: (object[_UniqueFieldNames.action] as JSBoolean).toDart,
687-
requestId: object.requestId,
688-
databaseId: object.databaseId,
689-
);
690-
}
691-
692-
@override
693-
MessageType<Message> get type => MessageType.rollbackRequest;
694-
695-
@override
696-
void serialize(JSObject object, List<JSObject> transferred) {
697-
super.serialize(object, transferred);
698-
object[_UniqueFieldNames.action] = action.toJS;
699-
}
700-
}
701-
702-
final class CommitStreamRequest extends Request {
703-
/// When true, the client is requesting to be informed about rollbacks
704-
/// happening on the database identified by this request.
705-
///
706-
/// When false, the client is requesting to no longer be informed about these
707-
/// updates.
708-
final bool action;
709-
710-
CommitStreamRequest(
711-
{required this.action,
712-
required super.requestId,
713-
required super.databaseId});
652+
StreamRequest({
653+
required this.type,
654+
required this.action,
655+
required super.requestId,
656+
required super.databaseId,
657+
});
714658

715-
factory CommitStreamRequest.deserialize(JSObject object) {
716-
return CommitStreamRequest(
659+
factory StreamRequest.deserialize(
660+
MessageType<Message> type, JSObject object) {
661+
return StreamRequest(
662+
type: type,
717663
action: (object[_UniqueFieldNames.action] as JSBoolean).toDart,
718664
requestId: object.requestId,
719665
databaseId: object.databaseId,
720666
);
721667
}
722668

723-
@override
724-
MessageType<Message> get type => MessageType.rollbackRequest;
725-
726669
@override
727670
void serialize(JSObject object, List<JSObject> transferred) {
728671
super.serialize(object, transferred);
@@ -886,41 +829,23 @@ final class UpdateNotification extends Notification {
886829
}
887830
}
888831

889-
final class RollbackNotification extends Notification {
832+
/// Used as a notification without a payload, e.g. for commit or rollback
833+
/// events.
834+
final class EmptyNotification extends Notification {
890835
final int databaseId;
891-
892-
RollbackNotification({required this.databaseId});
893-
894-
factory RollbackNotification.deserialize(JSObject object) {
895-
return RollbackNotification(
896-
databaseId: object.databaseId,
897-
);
898-
}
899-
900-
@override
901-
MessageType<Message> get type => MessageType.notifyUpdate;
902-
903836
@override
904-
void serialize(JSObject object, List<JSObject> transferred) {
905-
super.serialize(object, transferred);
906-
object[_UniqueFieldNames.databaseId] = databaseId.toJS;
907-
}
908-
}
909-
910-
final class CommitNotification extends Notification {
911-
final int databaseId;
837+
final MessageType<Message> type;
912838

913-
CommitNotification({required this.databaseId});
839+
EmptyNotification({required this.type, required this.databaseId});
914840

915-
factory CommitNotification.deserialize(JSObject object) {
916-
return CommitNotification(
841+
factory EmptyNotification.deserialize(
842+
MessageType<Message> type, JSObject object) {
843+
return EmptyNotification(
844+
type: type,
917845
databaseId: object.databaseId,
918846
);
919847
}
920848

921-
@override
922-
MessageType<Message> get type => MessageType.notifyCommit;
923-
924849
@override
925850
void serialize(JSObject object, List<JSObject> transferred) {
926851
super.serialize(object, transferred);

0 commit comments

Comments
 (0)