Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
245d9af
Start adding support for sync streams
simolus3 Jun 19, 2025
65bdf1b
Add protocol changes
simolus3 Jul 1, 2025
73e15ce
Use serde_with
simolus3 Jul 1, 2025
90bcbab
Start with subscription logic
simolus3 Jul 2, 2025
5967d89
Start tracking subscriptions
simolus3 Jul 3, 2025
ae9ea85
Track subscriptions
simolus3 Jul 4, 2025
9ac34ae
Test handling default streams
simolus3 Jul 9, 2025
a0267c4
Update last_synced_at for subscriptions
simolus3 Jul 9, 2025
91eaa6d
Allow subscribing to streams
simolus3 Jul 10, 2025
6bf0bca
Expire subscriptions after TTL
simolus3 Jul 14, 2025
ff219d5
Support unsubscribing
simolus3 Jul 14, 2025
d39e201
Delete outdated subscriptions
simolus3 Jul 14, 2025
3129e4e
Include default ttl
simolus3 Jul 14, 2025
5ce30ca
New protocol format
simolus3 Jul 15, 2025
0f487d5
Fix tests
simolus3 Jul 22, 2025
9652865
More stream management tests
simolus3 Jul 22, 2025
950f448
Remove immediate parameter when unsubscribing
simolus3 Jul 22, 2025
9751005
Increase expires_at only when subscribing again
simolus3 Jul 22, 2025
db83d17
Implement new protocol format
simolus3 Aug 7, 2025
4644a82
Report errors
simolus3 Aug 7, 2025
97d447f
Update TTL behavior
simolus3 Aug 12, 2025
2c803e3
Refresh on keepalive
simolus3 Aug 12, 2025
b947528
Instruction to update expiry
simolus3 Aug 13, 2025
3016ddc
Add correct offline state
simolus3 Aug 13, 2025
e7453b5
Add offline sync state helper function
simolus3 Aug 13, 2025
a42845a
Simplify error reporting
simolus3 Aug 13, 2025
a136226
Improve comment
simolus3 Aug 13, 2025
23481d8
Use set for associated buckets
simolus3 Aug 18, 2025
c42169f
Compute progress in core extension
simolus3 Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Start tracking subscriptions
  • Loading branch information
simolus3 committed Aug 19, 2025
commit 5967d89281f413da2a19feb4962d3958aaf4899c
6 changes: 3 additions & 3 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,11 +394,11 @@ CREATE TABLE ps_stream_subscriptions (
local_params TEXT,
ttl INTEGER
) STRICT;
ALTER TABLE ps_buckets ADD COLUMN derived_from INTEGER REFERENCES ps_streams (id);
ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]';

INSERT INTO ps_migration(id, down_migrations) VALUES(9, json_array(
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
json_object('sql', 'todo down migration'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 10')
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 11')
));
";
local_db.exec_safe(stmt)?;
Expand Down
21 changes: 20 additions & 1 deletion crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,31 @@ use sqlite_nostd::{self as sqlite, ColumnType};
use sqlite_nostd::{Connection, Context};

/// Payload provided by SDKs when requesting a sync iteration.
#[derive(Default, Deserialize)]
#[derive(Deserialize)]
pub struct StartSyncStream {
/// Bucket parameters to include in the request when opening a sync stream.
#[serde(default)]
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
#[serde(default)]
pub schema: Schema,
#[serde(default = "StartSyncStream::include_defaults_by_default")]
pub include_defaults: bool,
}

impl StartSyncStream {
pub const fn include_defaults_by_default() -> bool {
true
}
}

impl Default for StartSyncStream {
fn default() -> Self {
Self {
parameters: Default::default(),
schema: Default::default(),
include_defaults: Self::include_defaults_by_default(),
}
}
}

/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation.
Expand Down Expand Up @@ -107,6 +125,7 @@ pub struct StreamingSyncRequest {
pub binary_data: bool,
pub client_id: String,
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
pub streams: StreamSubscriptionRequest,
}

#[derive(Serialize)]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/sync/line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ pub struct BucketChecksum<'a> {
pub priority: Option<BucketPriority>,
#[serde(default)]
pub count: Option<i64>,
#[serde_as(as = "Option<Vec<DisplayFromStr>>")]
#[serde_as(as = "Vec<Option<DisplayFromStr>>")]
#[serde(default)]
pub subscriptions: Option<Vec<i64>>,
pub subscriptions: Vec<Option<i64>>,
// #[serde(default)]
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
// pub last_op_id: Option<i64>,
Expand Down
13 changes: 13 additions & 0 deletions crates/core/src/sync/storage_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
state::DatabaseState,
sync::{
checkpoint::{ChecksumMismatch, validate_checkpoint},
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
sync_status::SyncPriorityStatus,
},
sync_local::{PartialSyncOperation, SyncOperation},
Expand Down Expand Up @@ -268,6 +269,18 @@ impl StorageAdapter {
}
}

pub fn collect_subscription_requests(
&self,
include_defaults: bool,
) -> Result<StreamSubscriptionRequest, PowerSyncError> {
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();

Ok(StreamSubscriptionRequest {
include_defaults,
subscriptions,
})
}

pub fn now(&self) -> Result<Timestamp, ResultCode> {
self.time_stmt.step()?;
let res = Timestamp(self.time_stmt.column_int64(0));
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,9 @@ impl StreamingSyncIteration {
binary_data: true,
client_id: client_id(self.db)?,
parameters: self.options.parameters.take(),
streams: self
.adapter
.collect_subscription_requests(self.options.include_defaults)?,
};

event
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/sync/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::time::Duration;

use alloc::string::String;
use alloc::{boxed::Box, string::String};
use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};

Expand All @@ -16,7 +16,7 @@ pub enum SubscriptionChangeRequest {
#[derive(Deserialize)]
pub struct SubscribeToStream {
pub stream: String,
pub params: Option<serde_json::value::RawValue>,
pub params: Option<Box<serde_json::value::RawValue>>,
#[serde_as(as = "Option<DurationSeconds>")]
pub ttl: Option<Duration>,
pub priority: Option<BucketPriority>,
Expand All @@ -25,6 +25,6 @@ pub struct SubscribeToStream {
#[derive(Deserialize)]
pub struct UnsubscribeFromStream {
pub stream: String,
pub params: Option<serde_json::value::RawValue>,
pub params: Option<Box<serde_json::value::RawValue>>,
pub immediate: bool,
}