Skip to content

Commit f9177e3

Browse files
authored
Merge pull request #74 from powersync-ja/fix-multiple-connections
Fix "Checksum mismatch" when calling connect() multiple times
2 parents 2a2b2ab + ccaf293 commit f9177e3

File tree

4 files changed

+110
-42
lines changed

4 files changed

+110
-42
lines changed

packages/powersync/lib/src/powersync_database.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:isolate';
33

44
import 'package:logging/logging.dart';
55
import 'package:powersync/src/log_internal.dart';
6+
import 'package:sqlite_async/mutex.dart';
67
import 'package:sqlite_async/sqlite3.dart' as sqlite;
78
import 'package:sqlite_async/sqlite_async.dart';
89

@@ -69,6 +70,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
6970
/// null when disconnected, present when connecting or connected
7071
AbortController? _disconnecter;
7172

73+
/// Use to prevent multiple connections from being opened concurrently
74+
final Mutex _connectMutex = Mutex();
75+
7276
/// The Logger used by this [PowerSyncDatabase].
7377
///
7478
/// The default is [autoLogger], which logs to the console in debug builds.
@@ -190,6 +194,13 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
190194
/// Throttle time between CRUD operations
191195
/// Defaults to 10 milliseconds.
192196
Duration crudThrottleTime = const Duration(milliseconds: 10)}) async {
197+
_connectMutex.lock(() =>
198+
_connect(connector: connector, crudThrottleTime: crudThrottleTime));
199+
}
200+
201+
Future<void> _connect(
202+
{required PowerSyncBackendConnector connector,
203+
required Duration crudThrottleTime}) async {
193204
await initialize();
194205

195206
// Disconnect if connected

packages/powersync/pubspec.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ dependencies:
1010
flutter:
1111
sdk: flutter
1212

13-
sqlite_async: ^0.6.0
13+
sqlite_async: ^0.6.1
1414
sqlite3_flutter_libs: ^0.5.15
1515
http: ^1.1.0
1616
uuid: ^4.2.0

packages/powersync/test/streaming_sync_test.dart

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import 'dart:async';
2-
import 'dart:io';
32
import 'dart:math';
43

54
import 'package:powersync/powersync.dart';
@@ -44,12 +43,8 @@ void main() {
4443
var server = await createServer();
4544

4645
credentialsCallback() async {
47-
final endpoint = 'http://${server.address.host}:${server.port}';
4846
return PowerSyncCredentials(
49-
endpoint: endpoint,
50-
token: 'token',
51-
userId: 'u1',
52-
expiresAt: DateTime.now());
47+
endpoint: server.endpoint, token: 'token');
5348
}
5449

5550
final pdb = await setupPowerSync(path: path);
@@ -59,12 +54,12 @@ void main() {
5954

6055
await Future.delayed(Duration(milliseconds: random.nextInt(100)));
6156
if (random.nextBool()) {
62-
server.close(force: true).ignore();
57+
server.close();
6358
}
6459

6560
await pdb.close();
6661

67-
server.close(force: true).ignore();
62+
server.close();
6863
}
6964
});
7065

@@ -81,18 +76,13 @@ void main() {
8176
// [PowerSync] WARNING: 2023-06-29 16:10:17.667537: Sync Isolate error
8277
// [Connection closed while receiving data, #0 IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)
8378

84-
HttpServer? server;
79+
TestServer? server;
8580

8681
credentialsCallback() async {
8782
if (server == null) {
8883
throw AssertionError('No active server');
8984
}
90-
final endpoint = 'http://${server.address.host}:${server.port}';
91-
return PowerSyncCredentials(
92-
endpoint: endpoint,
93-
token: 'token',
94-
userId: 'u1',
95-
expiresAt: DateTime.now());
85+
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
9686
}
9787

9888
final pdb = await setupPowerSync(path: path);
@@ -107,9 +97,47 @@ void main() {
10797
// 2ms: HttpException: HttpServer is not bound to a socket
10898
// 20ms: Connection closed while receiving data
10999
await Future.delayed(Duration(milliseconds: 20));
110-
server.close(force: true).ignore();
100+
server.close();
111101
}
112102
await pdb.close();
113103
});
104+
105+
test('multiple connect calls', () async {
106+
// Test calling connect() multiple times.
107+
// We check that this does not cause multiple connections to be opened concurrently.
108+
final random = Random();
109+
var server = await createServer();
110+
111+
credentialsCallback() async {
112+
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
113+
}
114+
115+
final pdb = await setupPowerSync(path: path);
116+
pdb.retryDelay = Duration(milliseconds: 5000);
117+
var connector = TestConnector(credentialsCallback);
118+
pdb.connect(connector: connector);
119+
pdb.connect(connector: connector);
120+
121+
final watch = Stopwatch()..start();
122+
123+
// Wait for at least one connection
124+
while (server.connectionCount < 1 && watch.elapsedMilliseconds < 500) {
125+
await Future.delayed(Duration(milliseconds: random.nextInt(10)));
126+
}
127+
// Give some time for a second connection if any
128+
await Future.delayed(Duration(milliseconds: random.nextInt(50)));
129+
130+
await pdb.close();
131+
132+
// Give some time for connections to close
133+
while (server.connectionCount != 0 && watch.elapsedMilliseconds < 1000) {
134+
await Future.delayed(Duration(milliseconds: random.nextInt(10)));
135+
}
136+
137+
expect(server.connectionCount, equals(0));
138+
expect(server.maxConnectionCount, equals(1));
139+
140+
server.close();
141+
});
114142
});
115143
}

packages/powersync/test/test_server.dart

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,67 @@
11
import 'dart:async';
22
import 'dart:convert' as convert;
33
import 'dart:io';
4+
import 'dart:math';
45

56
import 'package:http/http.dart' show ByteStream;
67
import 'package:shelf/shelf.dart';
78
import 'package:shelf/shelf_io.dart' as shelf_io;
89
import 'package:shelf_router/shelf_router.dart';
910

10-
Future<HttpServer> createServer() async {
11-
var app = Router();
11+
class TestServer {
12+
late HttpServer server;
13+
Router app = Router();
14+
int connectionCount = 0;
15+
int maxConnectionCount = 0;
16+
int tokenExpiresIn;
1217

13-
app.post('/sync/stream', handleSyncStream);
14-
// Open on an arbitrary open port
15-
var server = await shelf_io.serve(app.call, 'localhost', 0);
18+
TestServer({this.tokenExpiresIn = 65});
19+
20+
Future<void> init() async {
21+
app.post('/sync/stream', handleSyncStream);
22+
// Open on an arbitrary open port
23+
server = await shelf_io.serve(app.call, 'localhost', 0);
24+
}
25+
26+
String get endpoint {
27+
return 'http://${server.address.host}:${server.port}';
28+
}
29+
30+
Future<Response> handleSyncStream(Request request) async {
31+
connectionCount += 1;
32+
maxConnectionCount = max(connectionCount, maxConnectionCount);
33+
34+
stream() async* {
35+
try {
36+
var blob = "*" * 5000;
37+
for (var i = 0; i < 50; i++) {
38+
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
39+
await Future.delayed(Duration(microseconds: 1));
40+
}
41+
} finally {
42+
connectionCount -= 1;
43+
}
44+
}
45+
46+
return Response.ok(
47+
encodeNdjson(stream()),
48+
headers: {
49+
'Content-Type': 'application/x-ndjson',
50+
},
51+
context: {
52+
'shelf.io.buffer_output': false,
53+
},
54+
);
55+
}
56+
57+
void close() {
58+
server.close(force: true).ignore();
59+
}
60+
}
61+
62+
Future<TestServer> createServer() async {
63+
var server = TestServer();
64+
await server.init();
1665
return server;
1766
}
1867

@@ -22,23 +71,3 @@ ByteStream encodeNdjson(Stream<Object> jsonInput) {
2271
final byteInput = stringInput.transform(convert.utf8.encoder);
2372
return ByteStream(byteInput);
2473
}
25-
26-
Future<Response> handleSyncStream(Request request) async {
27-
stream() async* {
28-
var blob = "*" * 5000;
29-
for (var i = 0; i < 50; i++) {
30-
yield {"token_expires_in": 5, "blob": blob};
31-
await Future.delayed(Duration(microseconds: 1));
32-
}
33-
}
34-
35-
return Response.ok(
36-
encodeNdjson(stream()),
37-
headers: {
38-
'Content-Type': 'application/x-ndjson',
39-
},
40-
context: {
41-
'shelf.io.buffer_output': false,
42-
},
43-
);
44-
}

0 commit comments

Comments
 (0)