Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions crates/core/src/schema/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use core::slice;

use alloc::{string::String, vec::Vec};
use serde::Deserialize;

use crate::schema::{
Column, CommonTableOptions, RawTable, Table, raw_table::InferredTableStructure,
};

/// Utility to wrap both PowerSync-managed JSON tables and raw tables (with their schema snapshot
/// inferred from reading `pragma_table_info`) into a common implementation.
pub enum SchemaTable<'a> {
    /// A PowerSync-managed JSON table, fully described by its schema [`Table`] definition.
    Json(&'a Table),
    /// A user-managed raw table, combining the user-provided [`RawTable`] definition with the
    /// column structure inferred from the local database.
    Raw {
        // The raw table as declared in the PowerSync schema.
        definition: &'a RawTable,
        // The actual columns found locally (see `InferredTableStructure`).
        schema: &'a InferredTableStructure,
    },
}

impl<'a> SchemaTable<'a> {
    /// Returns the [`CommonTableOptions`] applying to this table, regardless of its kind.
    pub fn common_options(&self) -> &CommonTableOptions {
        match self {
            Self::Json(table) => &table.options,
            Self::Raw { definition, .. } => &definition.schema.options,
        }
    }

    /// Iterates over defined column names in this table (not including the `id` column).
    pub fn column_names(&self) -> impl Iterator<Item = &'a str> {
        match self {
            Self::Json(table) => SchemaTableColumnIterator::Json(table.columns.iter()),
            Self::Raw { schema, .. } => SchemaTableColumnIterator::Raw(schema.columns.iter()),
        }
    }
}

impl<'a> From<&'a Table> for SchemaTable<'a> {
    // Convenience conversion: a schema `Table` is always a JSON-managed table.
    fn from(value: &'a Table) -> Self {
        Self::Json(value)
    }
}

/// Backing iterator for [`SchemaTable::column_names`], bridging the two column representations
/// (`Column` structs for JSON tables, plain `String` names for raw tables) into `&str` items.
enum SchemaTableColumnIterator<'a> {
    Json(slice::Iter<'a, Column>),
    Raw(slice::Iter<'a, String>),
}

impl<'a> Iterator for SchemaTableColumnIterator<'a> {
    type Item = &'a str;

    /// Yields the next column name, borrowing it from the underlying slice.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Json(columns) => {
                let column = columns.next()?;
                Some(&column.name)
            }
            Self::Raw(names) => {
                let name = names.next()?;
                Some(name)
            }
        }
    }
}

/// A set of column names supporting efficient membership tests.
#[derive(Default)]
pub struct ColumnFilter {
    // Invariant: kept sorted at all times so `matches` can use binary search.
    sorted_names: Vec<String>,
}

impl From<Vec<String>> for ColumnFilter {
    /// Builds a filter from arbitrary column names, sorting them to establish the
    /// invariant required by the binary search in [`ColumnFilter::matches`].
    fn from(mut value: Vec<String>) -> Self {
        // Unstable sort avoids the temporary allocation of a stable sort; equal strings
        // are indistinguishable, so stability is irrelevant for membership testing.
        value.sort_unstable();
        Self {
            sorted_names: value,
        }
    }
}

impl ColumnFilter {
    /// Whether this filter matches the given column name.
    pub fn matches(&self, column: &str) -> bool {
        // `sorted_names` is kept sorted, making a binary search valid here.
        let search = self
            .sorted_names
            .binary_search_by(|candidate| candidate.as_str().cmp(column));
        search.is_ok()
    }
}

impl AsRef<Vec<String>> for ColumnFilter {
    // Exposes the backing (sorted) list of names for read-only access.
    fn as_ref(&self) -> &Vec<String> {
        &self.sorted_names
    }
}

impl<'de> Deserialize<'de> for ColumnFilter {
    /// Deserializes a plain sequence of strings and sorts it into a filter via [`From`].
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        Vec::<String>::deserialize(deserializer).map(Self::from)
    }
}
58 changes: 54 additions & 4 deletions crates/core/src/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
mod common;
pub mod inspection;
mod management;
mod raw_table;
mod table_info;

use alloc::{rc::Rc, vec::Vec};
use powersync_sqlite_nostd as sqlite;
pub use common::{ColumnFilter, SchemaTable};
use powersync_sqlite_nostd::{self as sqlite, Connection, Context, Value, args};
use serde::Deserialize;
use sqlite::ResultCode;
pub use table_info::{
Column, DiffIncludeOld, PendingStatement, PendingStatementValue, RawTable, Table,
Column, CommonTableOptions, PendingStatement, PendingStatementValue, RawTable, Table,
TableInfoFlags,
};

use crate::state::DatabaseState;
use crate::{
error::{PSResult, PowerSyncError},
schema::raw_table::generate_raw_table_trigger,
state::DatabaseState,
utils::WriteType,
};

#[derive(Deserialize, Default)]
pub struct Schema {
Expand All @@ -21,5 +29,47 @@ pub struct Schema {
}

/// Registers schema-related SQL functions on the database connection.
///
/// This registers the management functions and the internal
/// `powersync_create_raw_table_crud_trigger(table_json, trigger_name, write_type)` function,
/// which creates a CRUD-capturing trigger for a raw table.
pub fn register(db: *mut sqlite::sqlite3, state: Rc<DatabaseState>) -> Result<(), ResultCode> {
    // Note: the pasted diff had left the pre-`?;` call line in place here; only the
    // error-propagating call is kept.
    management::register(db, state)?;

    {
        /// Safe implementation backing the SQLite function.
        ///
        /// Args: Table (JSON), trigger_name, write_type
        fn create_trigger(
            context: *mut sqlite::context,
            args: &[*mut sqlite::value],
        ) -> Result<(), PowerSyncError> {
            let table: RawTable =
                serde_json::from_str(args[0].text()).map_err(PowerSyncError::as_argument_error)?;
            let trigger_name = args[1].text();
            let write_type: WriteType = args[2].text().parse()?;

            let db = context.db_handle();
            let create_trigger_stmt =
                generate_raw_table_trigger(db, &table, trigger_name, write_type)?;
            db.exec_safe(&create_trigger_stmt).into_db_result(db)?;
            Ok(())
        }

        // C-compatible wrapper: forwards to `create_trigger` and reports any error through the
        // SQLite function context instead of unwinding across the FFI boundary.
        extern "C" fn create_raw_trigger_sqlite(
            context: *mut sqlite::context,
            argc: i32,
            args: *mut *mut sqlite::value,
        ) {
            let args = args!(argc, args);
            if let Err(e) = create_trigger(context, args) {
                e.apply_to_ctx("powersync_create_raw_table_crud_trigger", context);
            }
        }

        db.create_function_v2(
            "powersync_create_raw_table_crud_trigger",
            3,
            sqlite::UTF8,
            None,
            Some(create_raw_trigger_sqlite),
            None,
            None,
            None,
        )?;
    }
    Ok(())
}
165 changes: 165 additions & 0 deletions crates/core/src/schema/raw_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use core::fmt::{self, Formatter, Write, from_fn};

use alloc::{
format,
string::{String, ToString},
vec,
vec::Vec,
};
use powersync_sqlite_nostd::{Connection, Destructor, ResultCode};

use crate::{
error::PowerSyncError,
schema::{ColumnFilter, RawTable, SchemaTable},
utils::{InsertIntoCrud, SqlBuffer, WriteType},
views::table_columns_to_json_object,
};

/// Column structure of a raw table as read from the local database via `pragma_table_info`.
pub struct InferredTableStructure {
    // Column names found locally, excluding the `id` column and any ignored local-only columns
    // (see `read_from_database`).
    pub columns: Vec<String>,
}

impl InferredTableStructure {
    /// Reads the column layout of `table_name` from the local database.
    ///
    /// Returns `Ok(None)` when no usable columns are found (the table is treated as missing),
    /// an error when the table has columns but no `id` column, and the collected structure
    /// otherwise. The `id` column and columns matched by `ignored_local_columns` are excluded
    /// from the result.
    pub fn read_from_database(
        table_name: &str,
        db: impl Connection,
        ignored_local_columns: &ColumnFilter,
    ) -> Result<Option<Self>, PowerSyncError> {
        let stmt = db.prepare_v2("select name from pragma_table_info(?)")?;
        stmt.bind_text(1, table_name, Destructor::STATIC)?;

        let mut found_id = false;
        let mut collected = Vec::new();

        while let ResultCode::ROW = stmt.step()? {
            let column_name = stmt.column_text(0)?;
            if column_name == "id" {
                found_id = true;
            } else if !ignored_local_columns.matches(column_name) {
                collected.push(column_name.to_string());
            }
        }

        if found_id {
            Ok(Some(Self { columns: collected }))
        } else if collected.is_empty() {
            // Nothing relevant came back from pragma_table_info: treat the table as missing.
            Ok(None)
        } else {
            Err(PowerSyncError::argument_error(format!(
                "Table {table_name} has no id column."
            )))
        }
    }
}

/// Generates a `CREATE TRIGGER` statement to capture writes on raw tables and to forward them to
/// ps-crud.
///
/// One trigger is generated per [`WriteType`]; the returned SQL is executed by the caller.
/// Fails if the raw table has no local table name or the local table cannot be found.
pub fn generate_raw_table_trigger(
    db: impl Connection,
    table: &RawTable,
    trigger_name: &str,
    write: WriteType,
) -> Result<String, PowerSyncError> {
    // A raw table must name the local SQLite table the trigger attaches to.
    let Some(local_table_name) = table.schema.table_name.as_ref() else {
        return Err(PowerSyncError::argument_error("Table has no local name"));
    };

    let local_only_columns = &table.schema.local_only_columns;
    // Snapshot the actual column layout; `None` means the local table is missing, which makes
    // generating a trigger impossible.
    let Some(resolved_table) =
        InferredTableStructure::read_from_database(local_table_name, db, local_only_columns)?
    else {
        return Err(PowerSyncError::argument_error(format!(
            "Could not find {} in local schema",
            local_table_name
        )));
    };

    // Wrap definition + inferred structure so helpers shared with JSON tables can be reused.
    let as_schema_table = SchemaTable::Raw {
        definition: table,
        schema: &resolved_table,
    };

    let mut buffer = SqlBuffer::new();
    buffer.create_trigger("", trigger_name);
    buffer.trigger_after(write, local_table_name);
    // Skip the trigger for writes during sync_local, these aren't crud writes.
    buffer.push_str("WHEN NOT powersync_in_sync_operation()");

    if write == WriteType::Update && !local_only_columns.as_ref().is_empty() {
        buffer.push_str(" AND\n(");
        // If we have local-only columns, we want to add additional WHEN clauses to ensure the
        // trigger runs for updates on synced columns.
        // NOTE(review): if the table had no synced columns at all, this would emit empty
        // parentheses `()` — confirm that case cannot occur here.
        for (i, name) in as_schema_table.column_names().enumerate() {
            if i != 0 {
                buffer.push_str(" OR ");
            }

            // Generate OLD."column" IS NOT NEW."column"
            buffer.push_str("OLD.");
            let _ = buffer.identifier().write_str(name);
            buffer.push_str(" IS NOT NEW.");
            let _ = buffer.identifier().write_str(name);
        }
        buffer.push_str(")");
    }

    buffer.push_str(" BEGIN\n");

    if table.schema.options.flags.insert_only() {
        if write != WriteType::Insert {
            // Prevent illegal writes to a table marked as insert-only by raising errors here.
            buffer.push_str("SELECT RAISE(FAIL, 'Unexpected update on insert-only table');\n");
        } else {
            // Write directly to powersync_crud_ to skip writing the $local bucket for insert-only
            // tables.
            let fragment = table_columns_to_json_object("NEW", &as_schema_table)?;
            buffer.powersync_crud_manual_put(&table.name, &fragment);
        }
    } else {
        if write == WriteType::Update {
            // Updates must not change the id.
            buffer.check_id_not_changed();
        }

        // JSON object snapshots of the row's column values, used to compute the uploaded diff.
        let json_fragment_new = table_columns_to_json_object("NEW", &as_schema_table)?;
        let json_fragment_old = if write == WriteType::Update {
            Some(table_columns_to_json_object("OLD", &as_schema_table)?)
        } else {
            None
        };

        // Lazily-formatted SQL expression: `json(powersync_diff(<old>, <new>))`.
        let write_data = from_fn(|f: &mut Formatter| -> fmt::Result {
            write!(f, "json(powersync_diff(")?;

            if let Some(ref old) = json_fragment_old {
                f.write_str(old)?;
            } else {
                // We don't have OLD values for inserts, we diff from an empty JSON object
                // instead.
                f.write_str("'{}'")?;
            };

            write!(f, ", {json_fragment_new}))")
        });

        buffer.insert_into_powersync_crud(InsertIntoCrud {
            op: write,
            // Deleted rows are only available through OLD in the trigger body.
            id_expr: if write == WriteType::Delete {
                "OLD.id"
            } else {
                "NEW.id"
            },
            table: &as_schema_table,
            type_name: &table.name,
            data: match write {
                // There is no data for deleted rows.
                WriteType::Delete => None,
                _ => Some(&write_data),
            },
            metadata: None::<&'static str>,
        })?;
    }

    buffer.trigger_end();
    Ok(buffer.sql)
}
Loading