Skip to content

Commit bac8e28

Browse files
committed
Use in-memory sync tests
1 parent ee1848f commit bac8e28

File tree

4 files changed

+104
-68
lines changed

4 files changed

+104
-68
lines changed

packages/powersync_core/lib/src/sync/instruction.dart

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,19 @@ final class CoreSyncStatus {
9999
}
100100

101101
final class DownloadProgress {
102-
final Map<String, BucketProgress> progress;
102+
final Map<String, BucketProgress> buckets;
103103

104-
DownloadProgress(this.progress);
104+
DownloadProgress(this.buckets);
105105

106106
factory DownloadProgress.fromJson(Map<String, Object?> line) {
107-
return DownloadProgress(line.map((k, v) =>
108-
MapEntry(k, _bucketProgressFromJson(v as Map<String, Object?>))));
107+
final rawBuckets = line['buckets'] as Map<String, Object?>;
108+
109+
return DownloadProgress(rawBuckets.map((k, v) {
110+
return MapEntry(
111+
k,
112+
_bucketProgressFromJson(v as Map<String, Object?>),
113+
);
114+
}));
109115
}
110116

111117
static BucketProgress _bucketProgressFromJson(Map<String, Object?> json) {

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ final class MutableSyncStatus {
8787
priorityStatusEntries = status.priorityStatus;
8888
downloadProgress = switch (status.downloading) {
8989
null => null,
90-
final downloading => InternalSyncDownloadProgress(downloading.progress),
90+
final downloading => InternalSyncDownloadProgress(downloading.buckets),
9191
};
9292
lastSyncedAt = status.priorityStatus
9393
.firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority)

packages/powersync_core/lib/src/sync/streaming_sync.dart

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ class StreamingSyncImplementation implements StreamingSync {
142142
// Protect sync iterations with exclusivity (if a valid Mutex is provided)
143143
await syncMutex.lock(() {
144144
switch (options.source.syncImplementation) {
145+
// ignore: deprecated_member_use_from_same_package
145146
case SyncClientImplementation.dart:
146147
return _dartStreamingSyncIteration();
147148
case SyncClientImplementation.rust:
@@ -574,7 +575,7 @@ String _syncErrorMessage(Object? error) {
574575
} else if (error is PowerSyncProtocolException) {
575576
return 'Protocol error';
576577
} else {
577-
return '${error.runtimeType}';
578+
return '${error.runtimeType}: $error';
578579
}
579580
}
580581

@@ -598,7 +599,7 @@ final class _ActiveRustStreamingIteration {
598599
assert(_completedStream.isCompleted, 'Should have started streaming');
599600
await _completedStream.future;
600601
} finally {
601-
_isActive = true;
602+
_isActive = false;
602603
_completedUploads?.cancel();
603604
await _stop();
604605
}
@@ -614,6 +615,10 @@ final class _ActiveRustStreamingIteration {
614615

615616
loop:
616617
await for (final event in events) {
618+
if (!_isActive || sync.aborted) {
619+
break;
620+
}
621+
617622
switch (event) {
618623
case ReceivedLine(line: final Uint8List line):
619624
await _control('line_binary', line);
@@ -629,7 +634,9 @@ final class _ActiveRustStreamingIteration {
629634
}
630635
}
631636

632-
Future<void> _stop() => _control('stop');
637+
Future<void> _stop() {
638+
return _control('stop');
639+
}
633640

634641
Future<void> _control(String operation, [Object? payload]) async {
635642
final rawResponse = await sync.adapter.control(operation, payload);
@@ -668,7 +675,10 @@ final class _ActiveRustStreamingIteration {
668675
});
669676
}
670677
case CloseSyncStream():
671-
sync._nonLineSyncEvents.add(const AbortCurrentIteration());
678+
if (!sync.aborted) {
679+
_isActive = false;
680+
sync._nonLineSyncEvents.add(const AbortCurrentIteration());
681+
}
672682
case FlushFileSystem():
673683
await sync.adapter.flushFileSystem();
674684
case DidCompleteSync():

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 79 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,26 @@ import 'utils/in_memory_http.dart';
1515
import 'utils/test_utils_impl.dart';
1616

1717
void main() {
18+
_declareTests(
19+
'dart sync client',
20+
SyncOptions(
21+
// ignore: deprecated_member_use_from_same_package
22+
syncImplementation: SyncClientImplementation.dart,
23+
retryDelay: Duration(milliseconds: 200)),
24+
);
25+
26+
_declareTests(
27+
'rust sync client',
28+
SyncOptions(
29+
syncImplementation: SyncClientImplementation.rust,
30+
retryDelay: Duration(milliseconds: 200)),
31+
);
32+
}
33+
34+
void _declareTests(String name, SyncOptions options) {
1835
final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF;
1936

20-
group('in-memory sync tests', () {
37+
group(name, () {
2138
late final testUtils = TestUtils();
2239

2340
late TestPowerSyncFactory factory;
@@ -44,8 +61,7 @@ void main() {
4461
expiresAt: DateTime.now(),
4562
);
4663
}, uploadData: (db) => uploadData(db)),
47-
options: const SyncOptions(retryDelay: Duration(milliseconds: 200)),
48-
logger: logger,
64+
options: options,
4965
);
5066

5167
addTearDown(() async {
@@ -113,6 +129,7 @@ void main() {
113129
});
114130
await expectLater(
115131
status, emits(isSyncStatus(downloading: false, hasSynced: true)));
132+
await syncClient.abort();
116133

117134
final independentDb = factory.wrapRaw(raw, logger: ignoredLogger);
118135
addTearDown(independentDb.close);
@@ -128,65 +145,68 @@ void main() {
128145
isTrue);
129146
});
130147

131-
test('can save independent buckets in same transaction', () async {
132-
final status = await waitForConnection();
133-
134-
syncService.addLine({
135-
'checkpoint': Checkpoint(
136-
lastOpId: '0',
137-
writeCheckpoint: null,
138-
checksums: [
139-
BucketChecksum(bucket: 'a', checksum: 0, priority: 3),
140-
BucketChecksum(bucket: 'b', checksum: 0, priority: 3),
141-
],
142-
)
143-
});
144-
await expectLater(status, emits(isSyncStatus(downloading: true)));
145-
146-
var commits = 0;
147-
raw.commits.listen((_) => commits++);
148+
// ignore: deprecated_member_use_from_same_package
149+
if (options.syncImplementation == SyncClientImplementation.dart) {
150+
test('can save independent buckets in same transaction', () async {
151+
final status = await waitForConnection();
148152

149-
syncService
150-
..addLine({
151-
'data': {
152-
'bucket': 'a',
153-
'data': <Map<String, Object?>>[
154-
{
155-
'op_id': '1',
156-
'op': 'PUT',
157-
'object_type': 'a',
158-
'object_id': '1',
159-
'checksum': 0,
160-
'data': {},
161-
}
162-
],
163-
}
164-
})
165-
..addLine({
166-
'data': {
167-
'bucket': 'b',
168-
'data': <Map<String, Object?>>[
169-
{
170-
'op_id': '2',
171-
'op': 'PUT',
172-
'object_type': 'b',
173-
'object_id': '1',
174-
'checksum': 0,
175-
'data': {},
176-
}
153+
syncService.addLine({
154+
'checkpoint': Checkpoint(
155+
lastOpId: '0',
156+
writeCheckpoint: null,
157+
checksums: [
158+
BucketChecksum(bucket: 'a', checksum: 0, priority: 3),
159+
BucketChecksum(bucket: 'b', checksum: 0, priority: 3),
177160
],
178-
}
161+
)
179162
});
163+
await expectLater(status, emits(isSyncStatus(downloading: true)));
180164

181-
// Wait for the operations to be inserted.
182-
while (raw.select('SELECT * FROM ps_oplog;').length < 2) {
183-
await pumpEventQueue();
184-
}
165+
var commits = 0;
166+
raw.commits.listen((_) => commits++);
185167

186-
// The two buckets should have been inserted in a single transaction
187-
// because the messages were received in quick succession.
188-
expect(commits, 1);
189-
});
168+
syncService
169+
..addLine({
170+
'data': {
171+
'bucket': 'a',
172+
'data': <Map<String, Object?>>[
173+
{
174+
'op_id': '1',
175+
'op': 'PUT',
176+
'object_type': 'a',
177+
'object_id': '1',
178+
'checksum': 0,
179+
'data': {},
180+
}
181+
],
182+
}
183+
})
184+
..addLine({
185+
'data': {
186+
'bucket': 'b',
187+
'data': <Map<String, Object?>>[
188+
{
189+
'op_id': '2',
190+
'op': 'PUT',
191+
'object_type': 'b',
192+
'object_id': '1',
193+
'checksum': 0,
194+
'data': {},
195+
}
196+
],
197+
}
198+
});
199+
200+
// Wait for the operations to be inserted.
201+
while (raw.select('SELECT * FROM ps_oplog;').length < 2) {
202+
await pumpEventQueue();
203+
}
204+
205+
// The two buckets should have been inserted in a single transaction
206+
// because the messages were received in quick succession.
207+
expect(commits, 1);
208+
});
209+
}
190210

191211
group('partial sync', () {
192212
test('updates sync state incrementally', () async {
@@ -287,6 +307,7 @@ void main() {
287307
});
288308
await database.waitForFirstSync(priority: BucketPriority(1));
289309
expect(database.currentStatus.hasSynced, isFalse);
310+
await syncClient.abort();
290311

291312
final independentDb = factory.wrapRaw(raw, logger: ignoredLogger);
292313
addTearDown(independentDb.close);
@@ -491,7 +512,7 @@ void main() {
491512
}) async {
492513
await expectLater(
493514
status,
494-
emits(isSyncStatus(
515+
emitsThrough(isSyncStatus(
495516
downloading: true,
496517
downloadProgress: isSyncDownloadProgress(
497518
progress: total,
@@ -679,7 +700,6 @@ void main() {
679700
await checkProgress(progress(8, 8), progress(10, 14));
680701

681702
addCheckpointComplete(0);
682-
await checkProgress(progress(8, 8), progress(10, 14));
683703

684704
addDataLine('b', 4);
685705
await checkProgress(progress(8, 8), progress(14, 14));

0 commit comments

Comments
 (0)