Skip to content

Commit fa9fb0e

Browse files
committed
Add shared sync worker
1 parent 38f9ff3 commit fa9fb0e

File tree

8 files changed

+682
-19
lines changed

8 files changed

+682
-19
lines changed

packages/powersync/lib/src/database/powersync_db_mixin.dart

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
7171
@protected
7272
Future<void> baseInit() async {
7373
statusStream = statusStreamController.stream;
74-
updates = database.updates
75-
.map((update) =>
76-
PowerSyncUpdateNotification.fromUpdateNotification(update))
77-
.where((update) => update.isNotEmpty)
78-
.cast<UpdateNotification>();
74+
updates = powerSyncUpdateNotifications(database.updates);
7975

8076
await database.initialize();
8177
await database.execute('SELECT powersync_init()');
@@ -413,3 +409,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
413409
return database.getAutoCommit();
414410
}
415411
}
412+
413+
Stream<UpdateNotification> powerSyncUpdateNotifications(
414+
Stream<UpdateNotification> inner) {
415+
return inner
416+
.map((update) =>
417+
PowerSyncUpdateNotification.fromUpdateNotification(update))
418+
.where((update) => update.isNotEmpty)
419+
.cast<UpdateNotification>();
420+
}

packages/powersync/lib/src/database/web/web_powersync_database.dart

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import 'package:powersync/src/streaming_sync.dart';
1515
import 'package:sqlite_async/sqlite_async.dart';
1616
import 'package:powersync/src/schema_logic.dart' as schema_logic;
1717

18+
import '../../web/sync_controller.dart';
19+
1820
/// A PowerSync managed database.
1921
///
2022
/// Web implementation for [PowerSyncDatabase]
@@ -93,7 +95,8 @@ class PowerSyncDatabaseImpl
9395
Logger? logger}) {
9496
final db = SqliteDatabase.withFactory(openFactory, maxReaders: 1);
9597
return PowerSyncDatabaseImpl.withDatabase(
96-
schema: schema, logger: logger, database: db);
98+
schema: schema, logger: logger, database: db)
99+
..openFactory = openFactory;
97100
}
98101

99102
/// Open a PowerSyncDatabase on an existing [SqliteDatabase].
@@ -119,14 +122,15 @@ class PowerSyncDatabaseImpl
119122
/// The connection is automatically re-opened if it fails for any reason.
120123
///
121124
/// Status changes are reported on [statusStream].
122-
baseConnect(
123-
{required PowerSyncBackendConnector connector,
124-
125-
/// Throttle time between CRUD operations
126-
/// Defaults to 10 milliseconds.
127-
required Duration crudThrottleTime,
128-
required Future<void> Function() reconnect,
129-
Map<String, dynamic>? params}) async {
125+
baseConnect({
126+
required PowerSyncBackendConnector connector,
127+
128+
/// Throttle time between CRUD operations
129+
/// Defaults to 10 milliseconds.
130+
required Duration crudThrottleTime,
131+
required Future<void> Function() reconnect,
132+
Map<String, dynamic>? params,
133+
}) async {
130134
await initialize();
131135

132136
// Disconnect if connected
@@ -135,9 +139,23 @@ class PowerSyncDatabaseImpl
135139

136140
await isInitialized;
137141

138-
// TODO better multitab support
139142
final storage = BucketStorage(database);
140-
final sync = StreamingSyncImplementation(
143+
StreamingSync sync;
144+
// Try using a shared worker for the synchronization implementation to avoid
145+
// duplicate work across tabs.
146+
try {
147+
sync = await SyncWorkerHandle.start(
148+
this,
149+
connector,
150+
Uri.base.resolve('/powersync_sync.worker.js'),
151+
);
152+
} catch (e) {
153+
logger.warning(
154+
'Could not use shared worker for synchronization, falling back to locks.',
155+
e,
156+
);
157+
158+
sync = StreamingSyncImplementation(
141159
adapter: storage,
142160
credentialsCallback: connector.getCredentialsCached,
143161
invalidCredentialsCallback: connector.fetchCredentials,
@@ -148,7 +166,10 @@ class PowerSyncDatabaseImpl
148166
syncParameters: params,
149167
// Only allows 1 sync implementation to run at a time per database
150168
// This should be global (across tabs) when using Navigator locks.
151-
identifier: database.openFactory.path);
169+
identifier: database.openFactory.path,
170+
);
171+
}
172+
152173
sync.statusStream.listen((event) {
153174
setStatus(event);
154175
});

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@ import 'sync_types.dart';
1616
/// a different value to indicate "no error".
1717
const _noError = Object();
1818

19-
class StreamingSyncImplementation {
19+
abstract interface class StreamingSync {
20+
Stream<SyncStatus> get statusStream;
21+
22+
Future<void> streamingSync();
23+
24+
/// Close any active streams.
25+
Future<void> abort();
26+
}
27+
28+
class StreamingSyncImplementation implements StreamingSync {
2029
BucketStorage adapter;
2130

2231
final Future<PowerSyncCredentials?> Function() credentialsCallback;
@@ -28,6 +37,8 @@ class StreamingSyncImplementation {
2837

2938
final StreamController<SyncStatus> _statusStreamController =
3039
StreamController<SyncStatus>.broadcast();
40+
41+
@override
3142
late final Stream<SyncStatus> statusStream;
3243

3344
late final http.Client _client;
@@ -66,7 +77,7 @@ class StreamingSyncImplementation {
6677
statusStream = _statusStreamController.stream;
6778
}
6879

69-
/// Close any active streams.
80+
@override
7081
Future<void> abort() async {
7182
// If streamingSync() hasn't been called yet, _abort will be null.
7283
var future = _abort?.abort();
@@ -95,6 +106,7 @@ class StreamingSyncImplementation {
95106
return _abort?.aborted ?? false;
96107
}
97108

109+
@override
98110
Future<void> streamingSync() async {
99111
try {
100112
_abort = AbortController();
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import 'dart:async';
2+
import 'dart:js_interop';
3+
4+
import 'package:powersync/powersync.dart';
5+
import 'package:sqlite_async/web.dart';
6+
import 'package:web/web.dart';
7+
8+
import '../database/web/web_powersync_database.dart';
9+
import '../streaming_sync.dart';
10+
import 'sync_worker_protocol.dart';
11+
12+
class SyncWorkerHandle implements StreamingSync {
13+
final PowerSyncDatabaseImpl _database;
14+
final PowerSyncBackendConnector _connector;
15+
16+
late final WorkerCommunicationChannel _channel;
17+
18+
final StreamController<SyncStatus> _status = StreamController.broadcast();
19+
20+
SyncWorkerHandle._(this._database, this._connector, MessagePort sendToWorker,
21+
SharedWorker worker) {
22+
_channel = WorkerCommunicationChannel(
23+
port: sendToWorker,
24+
errors: EventStreamProviders.errorEvent.forTarget(worker),
25+
requestHandler: (type, payload) async {
26+
switch (type) {
27+
case SyncWorkerMessageType.requestEndpoint:
28+
final endpoint = await (_database.database as WebSqliteConnection)
29+
.exposeEndpoint();
30+
31+
return (
32+
WebEndpoint(
33+
databaseName: endpoint.connectName,
34+
databasePort: endpoint.connectPort,
35+
lockName: endpoint.lockName,
36+
),
37+
[endpoint.connectPort].toJS
38+
);
39+
case SyncWorkerMessageType.uploadCrud:
40+
await _connector.uploadData(_database);
41+
return (JSObject(), null);
42+
case SyncWorkerMessageType.invalidCredentialsCallback:
43+
final credentials = await _connector.fetchCredentials();
44+
return (
45+
credentials != null
46+
? SerializedCredentials.from(credentials)
47+
: null,
48+
null
49+
);
50+
case SyncWorkerMessageType.credentialsCallback:
51+
final credentials = await _connector.getCredentialsCached();
52+
return (
53+
credentials != null
54+
? SerializedCredentials.from(credentials)
55+
: null,
56+
null
57+
);
58+
default:
59+
throw StateError('Unexpected message type $type');
60+
}
61+
},
62+
);
63+
64+
_channel.events.listen((data) {
65+
final (type, payload) = data;
66+
if (type == SyncWorkerMessageType.notifySyncStatus) {
67+
_status.add((payload as SerializedSyncStatus).asSyncStatus());
68+
}
69+
});
70+
}
71+
72+
static Future<SyncWorkerHandle> start(PowerSyncDatabaseImpl database,
73+
PowerSyncBackendConnector connector, Uri workerUri) async {
74+
final worker = SharedWorker(workerUri.toString().toJS);
75+
final handle = SyncWorkerHandle._(database, connector, worker.port, worker);
76+
77+
// Make sure that the worker is working, or throw immediately.
78+
await handle._channel.ping();
79+
80+
return handle;
81+
}
82+
83+
Future<void> close() async {
84+
await abort();
85+
await _channel.close();
86+
}
87+
88+
@override
89+
Future<void> abort() async {
90+
await _channel.abortSynchronization();
91+
}
92+
93+
@override
94+
Stream<SyncStatus> get statusStream => _status.stream;
95+
96+
@override
97+
Future<void> streamingSync() async {
98+
await _channel.startSynchronization(_database.openFactory.path);
99+
}
100+
}

0 commit comments

Comments
 (0)