Skip to content

[Fix] CRUD Upload on Reconnect #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion packages/powersync/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert' as convert;

import 'package:async/async.dart';
import 'package:http/http.dart' as http;
import 'package:powersync/src/abort_controller.dart';
import 'package:powersync/src/exceptions.dart';
Expand All @@ -27,6 +28,13 @@ class StreamingSyncImplementation {

final Future<void> Function() uploadCrud;

// An internal controller which is used to trigger CRUD uploads internally
// e.g. when reconnecting.
// This is only a broadcast controller since the `crudLoop` method is public
// and could potentially be called multiple times externally.
final StreamController<Null> _internalCrudTriggerController =
StreamController<Null>.broadcast();

final Stream crudUpdateTriggerStream;

final StreamController<SyncStatus> _statusStreamController =
Expand Down Expand Up @@ -92,6 +100,9 @@ class StreamingSyncImplementation {
if (_safeToClose) {
_client.close();
}

await _internalCrudTriggerController.close();

// wait for completeAbort() to be called
await future;

Expand Down Expand Up @@ -155,7 +166,13 @@ class StreamingSyncImplementation {
Future<void> crudLoop() async {
await uploadAllCrud();

await for (var _ in crudUpdateTriggerStream) {
// Trigger a CRUD upload whenever the upstream trigger fires
// as-well-as whenever the sync stream reconnects.
// This has the potential (in rare cases) to affect the crudThrottleTime,
// but it should not result in excessive uploads since the
// sync reconnects are also throttled.
await for (var _ in StreamGroup.merge(
[crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
if (_abort?.aborted == true) {
break;
}
Expand Down Expand Up @@ -298,6 +315,9 @@ class StreamingSyncImplementation {
Future<void>? credentialsInvalidation;
bool haveInvalidated = false;

// Trigger a CRUD upload on reconnect
_internalCrudTriggerController.add(null);

await for (var line in merged) {
if (aborted) {
break;
Expand Down
128 changes: 128 additions & 0 deletions packages/powersync/test/connected_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
@TestOn('!browser')
// This test uses a local server which is possible to control in Web via hybrid main,
// but this makes the test significantly more complex.
import 'dart:async';

import 'package:powersync/powersync.dart';
import 'package:test/test.dart';

import 'server/sync_server/mock_sync_server.dart';
import 'streaming_sync_test.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();

void main() {
late TestHttpServerHelper testServer;
late String path;

setUp(() async {
path = testUtils.dbPath();
testServer = TestHttpServerHelper();
await testServer.start();
});

tearDown(() async {
await testUtils.cleanDb(path: path);
await testServer.stop();
});

test('should connect to mock PowerSync instance', () async {
final connector = TestConnector(() async {
return PowerSyncCredentials(
endpoint: testServer.uri.toString(),
token: 'token not used here',
expiresAt: DateTime.now());
});

final db = PowerSyncDatabase.withFactory(
await testUtils.testFactory(path: path),
schema: defaultSchema,
maxReaders: 3);
await db.initialize();

final connectedCompleter = Completer();

db.statusStream.listen((status) {
if (status.connected) {
connectedCompleter.complete();
}
});

// Add a basic command for the test server to send
testServer.addEvent('{"token_expires_in": 3600}\n');

await db.connect(connector: connector);
await connectedCompleter.future;

expect(db.connected, isTrue);
});

test('should trigger uploads when connection is re-established', () async {
int uploadCounter = 0;
Completer uploadTriggeredCompleter = Completer();

final connector = TestConnector(() async {
return PowerSyncCredentials(
endpoint: testServer.uri.toString(),
token: 'token not used here',
expiresAt: DateTime.now());
}, uploadData: (database) async {
uploadCounter++;
uploadTriggeredCompleter.complete();
throw Exception('No uploads occur here');
});

final db = PowerSyncDatabase.withFactory(
await testUtils.testFactory(path: path),
schema: defaultSchema,
maxReaders: 3);
await db.initialize();

// Create an item which should trigger an upload.
await db.execute(
'INSERT INTO customers (id, name) VALUES (uuid(), ?)', ['steven']);

// Create a new completer to await the next upload
uploadTriggeredCompleter = Completer();

// Connect the PowerSync instance
final connectedCompleter = Completer();
// The first connection attempt will fail
final connectedErroredCompleter = Completer();

db.statusStream.listen((status) {
if (status.connected) {
connectedCompleter.complete();
}
if (status.downloadError != null &&
!connectedErroredCompleter.isCompleted) {
connectedErroredCompleter.complete();
}
});

// The first command will not be valid, this simulates a failed connection
testServer.addEvent('asdf\n');
await db.connect(connector: connector);

// The connect operation should have triggered an upload (even though it fails to connect)
await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(1));
// Create a new completer for the next iteration
uploadTriggeredCompleter = Completer();

// Connection attempt should initially fail
await connectedErroredCompleter.future;
expect(db.currentStatus.anyError, isNotNull);

// Now send a valid command. Which will result in successful connection
await testServer.clearEvents();
testServer.addEvent('{"token_expires_in": 3600}\n');
await connectedCompleter.future;
expect(db.connected, isTrue);

await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(2));
});
}
54 changes: 54 additions & 0 deletions packages/powersync/test/server/sync_server/mock_sync_server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_router/shelf_router.dart';

// A basic Mock PowerSync service server which queues commands
// which clients can receive via connecting to the `/sync/stream` route.
// This assumes only one client will ever be connected at a time.
class TestHttpServerHelper {
// Use a queued stream to make tests easier.
StreamController<String> _controller = StreamController<String>();
late HttpServer _server;
Uri get uri => Uri.parse('http://localhost:${_server.port}');

Future<void> start() async {
final router = Router()
..post('/sync/stream', (Request request) async {
// Respond immediately with a stream
return Response.ok(_controller.stream.transform(utf8.encoder),
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Transfer-Encoding': 'identity',
},
context: {
"shelf.io.buffer_output": false
});
});

_server = await io.serve(router, 'localhost', 0);
print('Test server running at ${_server.address}:${_server.port}');
}

// Queue events which will be sent to connected clients.
void addEvent(String data) {
_controller.add(data);
}

// Clear events. We rely on a buffered controller here. Create a new controller
// in order to clear the buffer.
Future<void> clearEvents() async {
await _controller.close();
_controller = StreamController<String>();
}

Future<void> stop() async {
await _controller.close();
await _server.close();
}
}
9 changes: 7 additions & 2 deletions packages/powersync/test/streaming_sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@ final testUtils = TestUtils();

class TestConnector extends PowerSyncBackendConnector {
final Function _fetchCredentials;
final Future<void> Function(PowerSyncDatabase)? _uploadData;

TestConnector(this._fetchCredentials);
TestConnector(this._fetchCredentials,
{Future<void> Function(PowerSyncDatabase)? uploadData})
: _uploadData = uploadData;

@override
Future<PowerSyncCredentials?> fetchCredentials() {
return _fetchCredentials();
}

@override
Future<void> uploadData(PowerSyncDatabase database) async {}
Future<void> uploadData(PowerSyncDatabase database) async {
await _uploadData?.call(database);
}
}

void main() {
Expand Down