Skip to content

Recover from some errors within sync iteration #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use alloc::{
boxed::Box,
string::{String, ToString},
};
use num_traits::FromPrimitive;
use sqlite_nostd::{context, sqlite3, Connection, Context, ResultCode};
use thiserror::Error;

Expand Down Expand Up @@ -129,6 +130,20 @@ impl PowerSyncError {
Internal { .. } => ResultCode::INTERNAL,
}
}

pub fn can_retry(&self) -> bool {
match self.inner.as_ref() {
RawPowerSyncError::Sqlite(cause) => {
let base_error = ResultCode::from_i32((cause.code as i32) & 0xFF);
if base_error == Some(ResultCode::BUSY) || base_error == Some(ResultCode::LOCKED) {
true
} else {
false
}
}
_ => false,
}
}
}

impl Display for PowerSyncError {
Expand Down
453 changes: 293 additions & 160 deletions crates/core/src/sync/streaming_sync.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions crates/core/src/sync/sync_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,12 @@ impl SyncDownloadProgress {
);
}

// Ignore errors here - SQLite seems to report errors from an earlier statement iteration
// sometimes.
let _ = adapter.progress_stmt.reset();

// Go through local bucket states to detect pending progress from previous sync iterations
// that may have been interrupted.
adapter.progress_stmt.reset()?;

while let Some(row) = adapter.step_progress()? {
let Some(progress) = buckets.get_mut(row.bucket) else {
continue;
Expand Down
56 changes: 32 additions & 24 deletions dart/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ packages:
dependency: transitive
description:
name: _fe_analyzer_shared
sha256: e55636ed79578b9abca5fecf9437947798f5ef7456308b5cb85720b793eac92f
sha256: da0d9209ca76bde579f2da330aeb9df62b6319c834fa7baae052021b0462401f
url: "https://pub.dev"
source: hosted
version: "82.0.0"
version: "85.0.0"
analyzer:
dependency: transitive
description:
name: analyzer
sha256: "13c1e6c6fd460522ea840abec3f677cc226f5fec7872c04ad7b425517ccf54f7"
sha256: f6154230675c44a191f2e20d16eeceb4aa18b30ca732db4efaf94c6a7d43cfa6
url: "https://pub.dev"
source: hosted
version: "7.4.4"
version: "7.5.2"
args:
dependency: transitive
description:
Expand Down Expand Up @@ -45,10 +45,10 @@ packages:
dependency: "direct main"
description:
name: bson
sha256: "9b761248a3494fea594aecf5d6f369b5f04d7b082aa2b8c06579ade77f1a7e47"
sha256: f8c80be7a62a88f4add7c48cc83567c36a77532de107224df8328ef71f125045
url: "https://pub.dev"
source: hosted
version: "5.0.6"
version: "5.0.7"
cli_config:
dependency: transitive
description:
Expand Down Expand Up @@ -85,10 +85,10 @@ packages:
dependency: transitive
description:
name: coverage
sha256: "9086475ef2da7102a0c0a4e37e1e30707e7fb7b6d28c209f559a9c5f8ce42016"
sha256: aa07dbe5f2294c827b7edb9a87bba44a9c15a3cc81bc8da2ca19b37322d30080
url: "https://pub.dev"
source: hosted
version: "1.12.0"
version: "1.14.1"
crypto:
dependency: transitive
description:
Expand All @@ -101,10 +101,10 @@ packages:
dependency: transitive
description:
name: decimal
sha256: "28239b8b929c1bd8618702e6dbc96e2618cf99770bbe9cb040d6cf56a11e4ec3"
sha256: fc706a5618b81e5b367b01dd62621def37abc096f2b46a9bd9068b64c1fa36d0
url: "https://pub.dev"
source: hosted
version: "3.2.1"
version: "3.2.4"
fake_async:
dependency: "direct dev"
description:
Expand Down Expand Up @@ -213,10 +213,10 @@ packages:
dependency: "direct dev"
description:
name: meta
sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c
sha256: "23f08335362185a5ea2ad3a4e597f1375e78bce8a040df5c600c8d3552ef2394"
url: "https://pub.dev"
source: hosted
version: "1.16.0"
version: "1.17.0"
mime:
dependency: transitive
description:
Expand Down Expand Up @@ -405,26 +405,34 @@ packages:
dependency: "direct dev"
description:
name: test
sha256: "301b213cd241ca982e9ba50266bd3f5bd1ea33f1455554c5abb85d1be0e2d87e"
sha256: "65e29d831719be0591f7b3b1a32a3cda258ec98c58c7b25f7b84241bc31215bb"
url: "https://pub.dev"
source: hosted
version: "1.25.15"
version: "1.26.2"
test_api:
dependency: transitive
description:
name: test_api
sha256: fb31f383e2ee25fbbfe06b40fe21e1e458d14080e3c67e7ba0acfde4df4e0bbd
sha256: "522f00f556e73044315fa4585ec3270f1808a4b186c936e612cab0b565ff1e00"
url: "https://pub.dev"
source: hosted
version: "0.7.4"
version: "0.7.6"
test_core:
dependency: transitive
description:
name: test_core
sha256: "84d17c3486c8dfdbe5e12a50c8ae176d15e2a771b96909a9442b40173649ccaa"
sha256: "80bf5a02b60af04b09e14f6fe68b921aad119493e26e490deaca5993fef1b05a"
url: "https://pub.dev"
source: hosted
version: "0.6.8"
version: "0.6.11"
test_descriptor:
dependency: "direct dev"
description:
name: test_descriptor
sha256: "9ce468c97ae396e8440d26bb43763f84e2a2a5331813ee5a397cb4da481aaf9a"
url: "https://pub.dev"
source: hosted
version: "2.0.2"
typed_data:
dependency: transitive
description:
Expand All @@ -445,18 +453,18 @@ packages:
dependency: transitive
description:
name: vm_service
sha256: ddfa8d30d89985b96407efce8acbdd124701f96741f2d981ca860662f1c0dc02
sha256: "45caa6c5917fa127b5dbcfbd1fa60b14e583afdc08bfc96dda38886ca252eb60"
url: "https://pub.dev"
source: hosted
version: "15.0.0"
version: "15.0.2"
watcher:
dependency: transitive
description:
name: watcher
sha256: "69da27e49efa56a15f8afe8f4438c4ec02eff0a117df1b22ea4aad194fe1c104"
sha256: "0b7fd4a0bbc4b92641dbf20adfd7e3fd1398fe17102d94b674234563e110088a"
url: "https://pub.dev"
source: hosted
version: "1.1.1"
version: "1.1.2"
web:
dependency: transitive
description:
Expand All @@ -469,10 +477,10 @@ packages:
dependency: transitive
description:
name: web_socket
sha256: bfe6f435f6ec49cb6c01da1e275ae4228719e59a6b067048c51e72d9d63bcc4b
sha256: "34d64019aa8e36bf9842ac014bb5d2f5586ca73df5e4d9bf5c936975cae6982c"
url: "https://pub.dev"
source: hosted
version: "1.0.0"
version: "1.0.1"
web_socket_channel:
dependency: transitive
description:
Expand Down
1 change: 1 addition & 0 deletions dart/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies:

dev_dependencies:
test: ^1.25.0
test_descriptor: ^2.0.2
file: ^7.0.1
sqlite3_test: ^0.1.1
fake_async: ^1.3.3
Expand Down
7 changes: 1 addition & 6 deletions dart/test/error_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:convert';
import 'package:sqlite3/common.dart';
import 'package:test/test.dart';

import 'utils/matchers.dart';
import 'utils/native_test_utils.dart';

void main() {
Expand Down Expand Up @@ -67,9 +68,3 @@ void main() {
});
});
}

Matcher isSqliteException(int code, dynamic message) {
return isA<SqliteException>()
.having((e) => e.extendedResultCode, 'extendedResultCode', code)
.having((e) => e.message, 'message', message);
}
137 changes: 129 additions & 8 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import 'package:sqlite3/common.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:sqlite3_test/sqlite3_test.dart';
import 'package:test/test.dart';
import 'package:test_descriptor/test_descriptor.dart' as d;
import 'package:path/path.dart';

import 'utils/matchers.dart';
import 'utils/native_test_utils.dart';

@isTest
Expand Down Expand Up @@ -50,17 +52,24 @@ void _syncTests<T>({

List<Object?> invokeControlRaw(String operation, Object? data) {
db.execute('begin');
final [row] =
db.select('SELECT powersync_control(?, ?)', [operation, data]);
ResultSet result;

// Make sure that powersync_control doesn't leave any busy statements
// behind.
// TODO: Re-enable after we can guarantee sqlite_stmt being available
// const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;';
// final busy = db.select(statement, [statement]);
// expect(busy, isEmpty);
try {
result = db.select('SELECT powersync_control(?, ?)', [operation, data]);

// Make sure that powersync_control doesn't leave any busy statements
// behind.
// TODO: Re-enable after we can guarantee sqlite_stmt being available
// const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;';
// final busy = db.select(statement, [statement]);
// expect(busy, isEmpty);
} catch (e) {
db.execute('rollback');
rethrow;
}

db.execute('commit');
final [row] = result;
return jsonDecode(row.columnAt(0));
}

Expand Down Expand Up @@ -683,6 +692,118 @@ void _syncTests<T>({
// Should delete bucket with checksum mismatch
expect(db.select('SELECT * FROM ps_buckets'), isEmpty);
});

group('recoverable', () {
late CommonDatabase secondary;
final checkpoint = {
'checkpoint': {
'last_op_id': '1',
'write_checkpoint': null,
'buckets': [
{
'bucket': 'a',
'checksum': 0,
'priority': 3,
'count': 1,
}
],
},
};

setUp(() {
final fileName = d.path('test.db');

db = openTestDatabase(fileName: fileName)
..select('select powersync_init();')
..select('select powersync_replace_schema(?)', [json.encode(_schema)])
..execute('update ps_kv set value = ?2 where key = ?1',
['client_id', 'test-test-test-test']);

secondary = openTestDatabase(fileName: fileName);
});

test('starting checkpoints', () {
db.execute('INSERT INTO ps_buckets (name) VALUES (?)', ['unrelated']);
invokeControl('start', null);

// Lock the db so that the checkpoint line can't delete the unrelated
// bucket.
secondary.execute('begin exclusive');
expect(
() => syncLine(checkpoint),
throwsA(
isSqliteException(
5, 'powersync_control: internal SQLite call returned BUSY'),
),
);
secondary.execute('commit');
expect(db.select('SELECT name FROM ps_buckets'), [
{'name': 'unrelated'}
]);

syncLine(checkpoint);
expect(db.select('SELECT name FROM ps_buckets'), isEmpty);
});

test('saving oplog data', () {
invokeControl('start', null);
syncLine(checkpoint);

// Lock the database before the data line
secondary.execute('begin exclusive');

// This should make powersync_control unable to save oplog data.
expect(
() => pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}),
throwsA(isSqliteException(
5, 'powersync_control: internal SQLite call returned BUSY')),
);

// But we should be able to retry
secondary.execute('commit');

expect(pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}), [
containsPair(
'UpdateSyncStatus',
containsPair(
'status',
containsPair(
'downloading',
{
'buckets': {
'a': {
'priority': 3,
'at_last': 0,
'since_last': 1,
'target_count': 1
},
}
},
),
),
)
]);
});

test('applying local changes', () {
invokeControl('start', null);
syncLine(checkpoint);
pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'});

secondary.execute('begin exclusive');
expect(
() => pushCheckpointComplete(),
throwsA(
isSqliteException(
5, 'powersync_control: internal SQLite call returned BUSY'),
),
);
secondary.execute('commit');

pushCheckpointComplete();
expect(db.select('SELECT * FROM items'), hasLength(1));
});
});
});

syncTest('sets powersync_in_sync_operation', (_) {
Expand Down
8 changes: 8 additions & 0 deletions dart/test/utils/matchers.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import 'package:sqlite3/common.dart';
import 'package:test/test.dart';

Matcher isSqliteException(int code, dynamic message) {
return isA<SqliteException>()
.having((e) => e.extendedResultCode, 'extendedResultCode', code)
.having((e) => e.message, 'message', message);
}