Skip to content

Fix multi-tab sync issues #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ class PowerSyncDatabaseImpl
// duplicating work across tabs.
try {
sync = await SyncWorkerHandle.start(
this,
connector,
crudThrottleTime.inMilliseconds,
Uri.base.resolve('/powersync_sync.worker.js'),
);
database: this,
connector: connector,
crudThrottleTimeMs: crudThrottleTime.inMilliseconds,
workerUri: Uri.base.resolve('/powersync_sync.worker.js'),
syncParams: params);
} catch (e) {
logger.warning(
'Could not use shared worker for synchronization, falling back to locks.',
Expand Down
42 changes: 27 additions & 15 deletions packages/powersync/lib/src/web/sync_controller.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@ import '../streaming_sync.dart';
import 'sync_worker_protocol.dart';

class SyncWorkerHandle implements StreamingSync {
final PowerSyncDatabaseImpl _database;
final PowerSyncBackendConnector _connector;
final int _crudThrottleTimeMs;
final PowerSyncDatabaseImpl database;
final PowerSyncBackendConnector connector;
final int crudThrottleTimeMs;
final Map<String, dynamic>? syncParams;

late final WorkerCommunicationChannel _channel;

final StreamController<SyncStatus> _status = StreamController.broadcast();

SyncWorkerHandle._(this._database, this._connector, this._crudThrottleTimeMs,
MessagePort sendToWorker, SharedWorker worker) {
SyncWorkerHandle._(
{required this.database,
required this.connector,
required this.crudThrottleTimeMs,
required MessagePort sendToWorker,
required SharedWorker worker,
this.syncParams}) {
_channel = WorkerCommunicationChannel(
port: sendToWorker,
errors: EventStreamProviders.errorEvent.forTarget(worker),
requestHandler: (type, payload) async {
switch (type) {
case SyncWorkerMessageType.requestEndpoint:
final endpoint = await (_database.database as WebSqliteConnection)
final endpoint = await (database.database as WebSqliteConnection)
.exposeEndpoint();

return (
Expand All @@ -38,18 +44,18 @@ class SyncWorkerHandle implements StreamingSync {
[endpoint.connectPort].toJS
);
case SyncWorkerMessageType.uploadCrud:
await _connector.uploadData(_database);
await connector.uploadData(database);
return (JSObject(), null);
case SyncWorkerMessageType.invalidCredentialsCallback:
final credentials = await _connector.fetchCredentials();
final credentials = await connector.fetchCredentials();
return (
credentials != null
? SerializedCredentials.from(credentials)
: null,
null
);
case SyncWorkerMessageType.credentialsCallback:
final credentials = await _connector.getCredentialsCached();
final credentials = await connector.getCredentialsCached();
return (
credentials != null
? SerializedCredentials.from(credentials)
Expand All @@ -71,13 +77,19 @@ class SyncWorkerHandle implements StreamingSync {
}

static Future<SyncWorkerHandle> start(
PowerSyncDatabaseImpl database,
PowerSyncBackendConnector connector,
int crudThrottleTimeMs,
Uri workerUri) async {
{required PowerSyncDatabaseImpl database,
required PowerSyncBackendConnector connector,
required int crudThrottleTimeMs,
required Uri workerUri,
Map<String, dynamic>? syncParams}) async {
final worker = SharedWorker(workerUri.toString().toJS);
final handle = SyncWorkerHandle._(
database, connector, crudThrottleTimeMs, worker.port, worker);
database: database,
connector: connector,
crudThrottleTimeMs: crudThrottleTimeMs,
sendToWorker: worker.port,
worker: worker,
syncParams: syncParams);

// Make sure that the worker is working, or throw immediately.
await handle._channel.ping();
Expand All @@ -101,6 +113,6 @@ class SyncWorkerHandle implements StreamingSync {
@override
Future<void> streamingSync() async {
await _channel.startSynchronization(
_database.openFactory.path, _crudThrottleTimeMs);
database.openFactory.path, crudThrottleTimeMs, syncParams);
}
}
91 changes: 69 additions & 22 deletions packages/powersync/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
library;

import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';

import 'package:async/async.dart';
Expand Down Expand Up @@ -41,12 +42,15 @@ class _SyncWorker {
});
}

_SyncRunner referenceSyncTask(String databaseIdentifier,
int crudThrottleTimeMs, _ConnectedClient client) {
_SyncRunner referenceSyncTask(
String databaseIdentifier,
int crudThrottleTimeMs,
String? syncParamsEncoded,
_ConnectedClient client) {
return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () {
return _SyncRunner(databaseIdentifier, crudThrottleTimeMs);
return _SyncRunner(databaseIdentifier);
})
..registerClient(client);
..registerClient(client, crudThrottleTimeMs, syncParamsEncoded);
}
}

Expand All @@ -64,11 +68,11 @@ class _ConnectedClient {
switch (type) {
case SyncWorkerMessageType.startSynchronization:
final request = payload as StartSynchronization;
_runner = _worker.referenceSyncTask(
request.databaseName, request.crudThrottleTimeMs, this);
_runner = _worker.referenceSyncTask(request.databaseName,
request.crudThrottleTimeMs, request.syncParamsEncoded, this);
return (JSObject(), null);
case SyncWorkerMessageType.abortSynchronization:
_runner?.unregisterClient(this);
_runner?.disconnectClient(this);
_runner = null;
return (JSObject(), null);
default:
Expand Down Expand Up @@ -105,7 +109,8 @@ class _ConnectedClient {

class _SyncRunner {
final String identifier;
final int crudThrottleTimeMs;
int crudThrottleTimeMs = 1;
String? syncParamsEncoded;

final StreamGroup<_RunnerEvent> _group = StreamGroup();
final StreamController<_RunnerEvent> _mainEvents = StreamController();
Expand All @@ -114,24 +119,46 @@ class _SyncRunner {
_ConnectedClient? databaseHost;
final connections = <_ConnectedClient>[];

_SyncRunner(this.identifier, this.crudThrottleTimeMs) {
_SyncRunner(this.identifier) {
_group.add(_mainEvents.stream);

Future(() async {
await for (final event in _group.stream) {
try {
switch (event) {
case _AddConnection(:final client):
case _AddConnection(
:final client,
:final crudThrottleTimeMs,
:final syncParamsEncoded
):
connections.add(client);
var reconnect = false;
if (this.crudThrottleTimeMs != crudThrottleTimeMs) {
this.crudThrottleTimeMs = crudThrottleTimeMs;
reconnect = true;
}
if (this.syncParamsEncoded != syncParamsEncoded) {
this.syncParamsEncoded = syncParamsEncoded;
reconnect = true;
}
if (sync == null) {
await _requestDatabase(client);
} else if (reconnect) {
// Parameters changed - reconnect.
sync?.abort();
sync = null;
await _requestDatabase(client);
}
case _RemoveConnection(:final client):
connections.remove(client);
if (connections.isEmpty) {
await sync?.abort();
sync = null;
}
case _DisconnectClient(:final client):
connections.remove(client);
await sync?.abort();
sync = null;
case _ActiveDatabaseClosed():
_logger.info('Remote database closed, finding a new client');
sync?.abort();
Expand Down Expand Up @@ -226,16 +253,20 @@ class _SyncRunner {
);
}

final syncParams = syncParamsEncoded == null
? null
: jsonDecode(syncParamsEncoded!) as Map<String, dynamic>;

sync = StreamingSyncImplementation(
adapter: BucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
client: FetchClient(mode: RequestMode.cors),
identifier: identifier,
);
adapter: BucketStorage(database),
credentialsCallback: client.channel.credentialsCallback,
invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
uploadCrud: client.channel.uploadCrud,
crudUpdateTriggerStream: crudStream,
retryDelay: Duration(seconds: 3),
client: FetchClient(mode: RequestMode.cors),
identifier: identifier,
syncParameters: syncParams);
sync!.statusStream.listen((event) {
_logger.fine('Broadcasting sync event: $event');
for (final client in connections) {
Expand All @@ -246,21 +277,31 @@ class _SyncRunner {
sync!.streamingSync();
}

void registerClient(_ConnectedClient client) {
_mainEvents.add(_AddConnection(client));
void registerClient(_ConnectedClient client, int currentCrudThrottleTimeMs,
String? currentSyncParamsEncoded) {
_mainEvents.add(_AddConnection(
client, currentCrudThrottleTimeMs, currentSyncParamsEncoded));
}

/// Remove a client, disconnecting if no clients remain..
void unregisterClient(_ConnectedClient client) {
_mainEvents.add(_RemoveConnection(client));
}

/// Remove a client, and immediately disconnect.
void disconnectClient(_ConnectedClient client) {
_mainEvents.add(_DisconnectClient(client));
}
}

sealed class _RunnerEvent {}

final class _AddConnection implements _RunnerEvent {
final _ConnectedClient client;
final int crudThrottleTimeMs;
final String? syncParamsEncoded;

_AddConnection(this.client);
_AddConnection(this.client, this.crudThrottleTimeMs, this.syncParamsEncoded);
}

final class _RemoveConnection implements _RunnerEvent {
Expand All @@ -269,6 +310,12 @@ final class _RemoveConnection implements _RunnerEvent {
_RemoveConnection(this.client);
}

final class _DisconnectClient implements _RunnerEvent {
final _ConnectedClient client;

_DisconnectClient(this.client);
}

final class _ActiveDatabaseClosed implements _RunnerEvent {
const _ActiveDatabaseClosed();
}
24 changes: 15 additions & 9 deletions packages/powersync/lib/src/web/sync_worker_protocol.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';

import 'package:web/web.dart';
Expand All @@ -13,10 +14,12 @@ enum SyncWorkerMessageType {

/// Sent from client to the sync worker to request the synchronization
/// starting.
/// If parameters change, the sync worker reconnects.
startSynchronization,

/// Te [SyncWorkerMessage.payload] for the request is a numeric id, the
/// The [SyncWorkerMessage.payload] for the request is a numeric id, the
/// response can be anything (void).
/// This disconnects immediately, even if other clients are still open.
abortSynchronization,

/// Sent from the sync worker to the client when it needs an endpoint to
Expand Down Expand Up @@ -60,15 +63,16 @@ extension type SyncWorkerMessage._(JSObject _) implements JSObject {

@anonymous
extension type StartSynchronization._(JSObject _) implements JSObject {
external factory StartSynchronization({
required String databaseName,
required int crudThrottleTimeMs,
required int requestId,
});
external factory StartSynchronization(
{required String databaseName,
required int crudThrottleTimeMs,
required int requestId,
String? syncParamsEncoded});

external String get databaseName;
external int get requestId;
external int get crudThrottleTimeMs;
external String? get syncParamsEncoded;
}

@anonymous
Expand Down Expand Up @@ -315,15 +319,17 @@ final class WorkerCommunicationChannel {
await _numericRequest(SyncWorkerMessageType.ping);
}

Future<void> startSynchronization(
String databaseName, int crudThrottleTimeMs) async {
Future<void> startSynchronization(String databaseName, int crudThrottleTimeMs,
Map<String, dynamic>? syncParams) async {
final (id, completion) = _newRequest();
port.postMessage(SyncWorkerMessage(
type: SyncWorkerMessageType.startSynchronization.name,
payload: StartSynchronization(
databaseName: databaseName,
crudThrottleTimeMs: crudThrottleTimeMs,
requestId: id),
requestId: id,
syncParamsEncoded:
syncParams == null ? null : jsonEncode(syncParams)),
));
await completion;
}
Expand Down
Loading