Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
245d9af
Start adding support for sync streams
simolus3 Jun 19, 2025
65bdf1b
Add protocol changes
simolus3 Jul 1, 2025
73e15ce
Use serde_with
simolus3 Jul 1, 2025
90bcbab
Start with subscription logic
simolus3 Jul 2, 2025
5967d89
Start tracking subscriptions
simolus3 Jul 3, 2025
ae9ea85
Track subscriptions
simolus3 Jul 4, 2025
9ac34ae
Test handling default streams
simolus3 Jul 9, 2025
a0267c4
Update last_synced_at for subscriptions
simolus3 Jul 9, 2025
91eaa6d
Allow subscribing to streams
simolus3 Jul 10, 2025
6bf0bca
Expire subscriptions after TTL
simolus3 Jul 14, 2025
ff219d5
Support unsubscribing
simolus3 Jul 14, 2025
d39e201
Delete outdated subscriptions
simolus3 Jul 14, 2025
3129e4e
Include default ttl
simolus3 Jul 14, 2025
5ce30ca
New protocol format
simolus3 Jul 15, 2025
0f487d5
Fix tests
simolus3 Jul 22, 2025
9652865
More stream management tests
simolus3 Jul 22, 2025
950f448
Remove immediate parameter when unsubscribing
simolus3 Jul 22, 2025
9751005
Increase expires_at only when subscribing again
simolus3 Jul 22, 2025
db83d17
Implement new protocol format
simolus3 Aug 7, 2025
4644a82
Report errors
simolus3 Aug 7, 2025
97d447f
Update TTL behavior
simolus3 Aug 12, 2025
2c803e3
Refresh on keepalive
simolus3 Aug 12, 2025
b947528
Instruction to update expiry
simolus3 Aug 13, 2025
3016ddc
Add correct offline state
simolus3 Aug 13, 2025
e7453b5
Add offline sync state helper function
simolus3 Aug 13, 2025
a42845a
Simplify error reporting
simolus3 Aug 13, 2025
a136226
Improve comment
simolus3 Aug 13, 2025
23481d8
Use set for associated buckets
simolus3 Aug 18, 2025
c42169f
Compute progress in core extension
simolus3 Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Instruction to update expiry
  • Loading branch information
simolus3 committed Aug 19, 2025
commit b947528abbe0b77aa378c61a508f715d701e86d9
29 changes: 24 additions & 5 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct StartSyncStream {
///
/// We will increase the expiry date for those streams at the time we connect and disconnect.
#[serde(default)]
pub active_streams: Vec<StreamKey>,
pub active_streams: Rc<Vec<StreamKey>>,
}

impl StartSyncStream {
Expand Down Expand Up @@ -89,6 +89,12 @@ pub enum SyncEvent<'a> {
TextLine { data: &'a str },
/// Forward a binary line (BSON) received from the sync service.
BinaryLine { data: &'a [u8] },
/// The active stream subscriptions (as in, `SyncStreamSubscription` instances active right now)
/// have changed.
///
/// The client will compare the new active subscriptions with the current one and will issue a
/// request to restart the sync iteration if necessary.
DidUpdateSubscriptions { active_streams: Rc<Vec<StreamKey>> },
}

/// An instruction sent by the core extension to the SDK.
Expand All @@ -114,13 +120,20 @@ pub enum Instruction {
// These are defined like this because deserializers in Kotlin can't support either an
// object or a literal value
/// Close the websocket / HTTP stream to the sync service.
CloseSyncStream {},
CloseSyncStream(CloseSyncStream),
/// Flush the file-system if it's non-durable (only applicable to the Dart SDK).
FlushFileSystem {},
/// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
DidCompleteSync {},
}

#[derive(Serialize, Default)]
pub struct CloseSyncStream {
/// Whether clients should hide the brief disconnected status from the public sync status and
/// reconnect immediately.
pub hide_disconnect: bool,
}

#[derive(Serialize)]
pub enum LogSeverity {
DEBUG,
Expand All @@ -136,16 +149,16 @@ pub struct StreamingSyncRequest {
pub binary_data: bool,
pub client_id: String,
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
pub streams: StreamSubscriptionRequest,
pub streams: Rc<StreamSubscriptionRequest>,
}

#[derive(Serialize)]
#[derive(Debug, Serialize, PartialEq)]
pub struct StreamSubscriptionRequest {
pub include_defaults: bool,
pub subscriptions: Vec<RequestedStreamSubscription>,
}

#[derive(Serialize)]
#[derive(Debug, Serialize, PartialEq)]
pub struct RequestedStreamSubscription {
/// The name of the sync stream to subscribe to.
pub stream: String,
Expand Down Expand Up @@ -223,6 +236,12 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
}),
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
"update_subscriptions" => {
SyncControlRequest::SyncEvent(SyncEvent::DidUpdateSubscriptions {
active_streams: serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?,
})
}
"subscriptions" => {
let request = serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?;
Expand Down
16 changes: 8 additions & 8 deletions crates/core/src/sync/storage_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::{assert_matches::debug_assert_matches, fmt::Display};

use alloc::{string::ToString, vec::Vec};
use alloc::{rc::Rc, string::ToString, vec::Vec};
use serde::Serialize;
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};

Expand All @@ -13,7 +13,7 @@ use crate::{
sync::{
checkpoint::{ChecksumMismatch, validate_checkpoint},
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
streaming_sync::OwnedStreamDescription,
streaming_sync::{OwnedStreamDescription, RequestedStreamSubscriptions},
subscriptions::{LocallyTrackedSubscription, StreamKey},
sync_status::SyncPriorityStatus,
},
Expand Down Expand Up @@ -287,7 +287,7 @@ impl StorageAdapter {
pub fn collect_subscription_requests(
&self,
include_defaults: bool,
) -> Result<(StreamSubscriptionRequest, Vec<i64>), PowerSyncError> {
) -> Result<RequestedStreamSubscriptions, PowerSyncError> {
self.delete_outdated_subscriptions()?;

let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
Expand All @@ -310,13 +310,13 @@ impl StorageAdapter {
index_to_local_id.push(subscription.id);
}

Ok((
StreamSubscriptionRequest {
Ok(RequestedStreamSubscriptions {
request: Rc::new(StreamSubscriptionRequest {
include_defaults,
subscriptions,
},
index_to_local_id,
))
}),
subscription_ids: Rc::new(index_to_local_id),
})
}

pub fn now(&self) -> Result<Timestamp, ResultCode> {
Expand Down
Loading