Skip to content

Small streaming sync improvements #242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 0 additions & 115 deletions packages/powersync_core/lib/src/bucket_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -344,121 +344,6 @@ class BucketState {
}
}

class SyncDataBatch {
List<SyncBucketData> buckets;

SyncDataBatch(this.buckets);
}

class SyncBucketData {
final String bucket;
final List<OplogEntry> data;
final bool hasMore;
final String? after;
final String? nextAfter;

const SyncBucketData(
{required this.bucket,
required this.data,
this.hasMore = false,
this.after,
this.nextAfter});

SyncBucketData.fromJson(Map<String, dynamic> json)
: bucket = json['bucket'] as String,
hasMore = json['has_more'] as bool? ?? false,
after = json['after'] as String?,
nextAfter = json['next_after'] as String?,
data = (json['data'] as List)
.map((e) => OplogEntry.fromJson(e as Map<String, dynamic>))
.toList();

Map<String, dynamic> toJson() {
return {
'bucket': bucket,
'has_more': hasMore,
'after': after,
'next_after': nextAfter,
'data': data
};
}
}

class OplogEntry {
final String opId;

final OpType? op;

/// rowType + rowId uniquely identifies an entry in the local database.
final String? rowType;
final String? rowId;

/// Together with rowType and rowId, this uniquely identifies a source entry
/// per bucket in the oplog. There may be multiple source entries for a single
/// "rowType + rowId" combination.
final String? subkey;

final String? data;
final int checksum;

const OplogEntry(
{required this.opId,
required this.op,
this.subkey,
this.rowType,
this.rowId,
this.data,
required this.checksum});

OplogEntry.fromJson(Map<String, dynamic> json)
: opId = json['op_id'] as String,
op = OpType.fromJson(json['op'] as String),
rowType = json['object_type'] as String?,
rowId = json['object_id'] as String?,
checksum = json['checksum'] as int,
data = switch (json['data']) {
String data => data,
var other => jsonEncode(other),
},
subkey = switch (json['subkey']) {
String subkey => subkey,
_ => null,
};

Map<String, dynamic>? get parsedData {
return switch (data) {
final data? => jsonDecode(data) as Map<String, dynamic>,
null => null,
};
}

/// Key to uniquely represent a source entry in a bucket.
/// This is used to supersede old entries.
/// Relevant for put and remove ops.
String get key {
return "$rowType/$rowId/$subkey";
}

Map<String, dynamic> toJson() {
return {
'op_id': opId,
'op': op?.toJson(),
'object_type': rowType,
'object_id': rowId,
'checksum': checksum,
'subkey': subkey,
'data': data
};
}
}

class SqliteOp {
String sql;
List<dynamic> args;

SqliteOp(this.sql, this.args);
}

class SyncLocalDatabaseResult {
final bool ready;
final bool checkpointValid;
Expand Down
7 changes: 0 additions & 7 deletions packages/powersync_core/lib/src/stream_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ Stream<Object?> ndjson(ByteStream input) {
return jsonInput;
}

/// Given a raw ByteStream, parse each line as JSON.
Stream<String> newlines(ByteStream input) {
final textInput = input.transform(convert.utf8.decoder);
final lineInput = textInput.transform(const convert.LineSplitter());
return lineInput;
}

void pauseAll(List<StreamSubscription<void>> subscriptions) {
for (var sub in subscriptions) {
sub.pause();
Expand Down
199 changes: 101 additions & 98 deletions packages/powersync_core/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class StreamingSyncImplementation implements StreamingSync {

late final http.Client _client;

final StreamController<String?> _localPingController =
final StreamController<Null> _localPingController =
StreamController.broadcast();

final Duration retryDelay;
Expand Down Expand Up @@ -340,96 +340,19 @@ class StreamingSyncImplementation implements StreamingSync {
}

_updateStatus(connected: true, connecting: false);
if (line is Checkpoint) {
targetCheckpoint = line;
final Set<String> bucketsToDelete = {...bucketSet};
final Set<String> newBuckets = {};
for (final checksum in line.checksums) {
newBuckets.add(checksum.bucket);
bucketsToDelete.remove(checksum.bucket);
}
bucketSet = newBuckets;
await adapter.removeBuckets([...bucketsToDelete]);
_updateStatus(downloading: true);
} else if (line is StreamingSyncCheckpointComplete) {
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return false;
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
appliedCheckpoint = targetCheckpoint;

_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
}

validatedCheckpoint = targetCheckpoint;
} else if (line is StreamingSyncCheckpointDiff) {
// TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint
if (targetCheckpoint == null) {
throw PowerSyncProtocolException(
'Checkpoint diff without previous checkpoint');
}
_updateStatus(downloading: true);
final diff = line;
final Map<String, BucketChecksum> newBuckets = {};
for (var checksum in targetCheckpoint.checksums) {
newBuckets[checksum.bucket] = checksum;
}
for (var checksum in diff.updatedBuckets) {
newBuckets[checksum.bucket] = checksum;
}
for (var bucket in diff.removedBuckets) {
newBuckets.remove(bucket);
}

final newCheckpoint = Checkpoint(
lastOpId: diff.lastOpId,
checksums: [...newBuckets.values],
writeCheckpoint: diff.writeCheckpoint);
targetCheckpoint = newCheckpoint;

bucketSet = Set.from(newBuckets.keys);
await adapter.removeBuckets(diff.removedBuckets);
adapter.setTargetCheckpoint(targetCheckpoint);
} else if (line is SyncBucketData) {
_updateStatus(downloading: true);
await adapter.saveSyncData(SyncDataBatch([line]));
} else if (line is StreamingSyncKeepalive) {
if (line.tokenExpiresIn == 0) {
// Token expired already - stop the connection immediately
invalidCredentialsCallback?.call().ignore();
break;
} else if (line.tokenExpiresIn <= 30) {
// Token expires soon - refresh it in the background
if (credentialsInvalidation == null &&
invalidCredentialsCallback != null) {
credentialsInvalidation = invalidCredentialsCallback!().then((_) {
// Token has been refreshed - we should restart the connection.
haveInvalidated = true;
// trigger next loop iteration ASAP, don't wait for another
// message from the server.
_localPingController.add(null);
}, onError: (_) {
// Token refresh failed - retry on next keepalive.
credentialsInvalidation = null;
});
switch (line) {
case Checkpoint():
targetCheckpoint = line;
final Set<String> bucketsToDelete = {...bucketSet};
final Set<String> newBuckets = {};
for (final checksum in line.checksums) {
newBuckets.add(checksum.bucket);
bucketsToDelete.remove(checksum.bucket);
}
}
} else {
if (targetCheckpoint == appliedCheckpoint) {
_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
} else if (validatedCheckpoint == targetCheckpoint) {
bucketSet = newBuckets;
await adapter.removeBuckets([...bucketsToDelete]);
_updateStatus(downloading: true);
case StreamingSyncCheckpointComplete():
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
Expand All @@ -447,7 +370,88 @@ class StreamingSyncImplementation implements StreamingSync {
downloadError: _noError,
lastSyncedAt: DateTime.now());
}
}

validatedCheckpoint = targetCheckpoint;
case StreamingSyncCheckpointDiff():
// TODO: It may be faster to just keep track of the diff, instead of
// the entire checkpoint
if (targetCheckpoint == null) {
throw PowerSyncProtocolException(
'Checkpoint diff without previous checkpoint');
}
_updateStatus(downloading: true);
final diff = line;
final Map<String, BucketChecksum> newBuckets = {};
for (var checksum in targetCheckpoint.checksums) {
newBuckets[checksum.bucket] = checksum;
}
for (var checksum in diff.updatedBuckets) {
newBuckets[checksum.bucket] = checksum;
}
for (var bucket in diff.removedBuckets) {
newBuckets.remove(bucket);
}

final newCheckpoint = Checkpoint(
lastOpId: diff.lastOpId,
checksums: [...newBuckets.values],
writeCheckpoint: diff.writeCheckpoint);
targetCheckpoint = newCheckpoint;

bucketSet = Set.from(newBuckets.keys);
await adapter.removeBuckets(diff.removedBuckets);
adapter.setTargetCheckpoint(targetCheckpoint);
case SyncDataBatch():
_updateStatus(downloading: true);
await adapter.saveSyncData(line);
case StreamingSyncKeepalive(:final tokenExpiresIn):
if (tokenExpiresIn == 0) {
// Token expired already - stop the connection immediately
invalidCredentialsCallback?.call().ignore();
break;
} else if (tokenExpiresIn <= 30) {
// Token expires soon - refresh it in the background
if (credentialsInvalidation == null &&
invalidCredentialsCallback != null) {
credentialsInvalidation = invalidCredentialsCallback!().then((_) {
// Token has been refreshed - we should restart the connection.
haveInvalidated = true;
// trigger next loop iteration ASAP, don't wait for another
// message from the server.
_localPingController.add(null);
}, onError: (_) {
// Token refresh failed - retry on next keepalive.
credentialsInvalidation = null;
});
}
}
case UnknownSyncLine(:final rawData):
isolateLogger.fine('Unknown sync line: $rawData');
case null: // Local ping
if (targetCheckpoint == appliedCheckpoint) {
_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
} else if (validatedCheckpoint == targetCheckpoint) {
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
// await new Promise((resolve) => setTimeout(resolve, 50));
return false;
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
} else {
appliedCheckpoint = targetCheckpoint;

_updateStatus(
downloading: false,
downloadError: _noError,
lastSyncedAt: DateTime.now());
}
}
}

if (haveInvalidated) {
Expand All @@ -458,7 +462,8 @@ class StreamingSyncImplementation implements StreamingSync {
return true;
}

Stream<Object?> streamingSyncRequest(StreamingSyncRequest data) async* {
Stream<StreamingSyncLine> streamingSyncRequest(
StreamingSyncRequest data) async* {
final credentials = await credentialsCallback();
if (credentials == null) {
throw CredentialsException('Not logged in');
Expand Down Expand Up @@ -494,12 +499,10 @@ class StreamingSyncImplementation implements StreamingSync {
}

// Note: The response stream is automatically closed when this loop errors
await for (var line in ndjson(res.stream)) {
if (aborted) {
break;
}
yield parseStreamingSyncLine(line as Map<String, dynamic>);
}
yield* ndjson(res.stream)
.cast<Map<String, dynamic>>()
.transform(StreamingSyncLine.reader)
.takeWhile((_) => !aborted);
}

/// Delays the standard `retryDelay` Duration, but exits early if
Expand Down
Loading