Skip to content

Commit 0de8910

Browse files
Atomic view update (#3624)
# Description of Changes Updates views atomically on commit, but before downgrading to a read-only transaction for subscription evaluation. What this patch does: 1. Renames `ViewId` to `ViewFnPtr` 2. Renames `ViewDatabaseId` to `ViewId` 3. Removes the `module_rx` module watcher from the subscription manager 4. Refactors read sets to only track table scans (index key tracking will be added later) 5. Drops read sets and removes rows from `st_view_sub` when dropping a view in an auto-migrate 6. Re-evaluates and updates views (`call_views_with_tx`) from `call_reducer_with_tx` for any view whose read set overlaps with the reducer's write set 7. Does the same for sql dml # API and ABI breaking changes None # Expected complexity level and risk 3 It's a bit of a messy diff. # Testing - [x] Integrate with #3616 --------- Signed-off-by: joshua-spacetime <josh@clockworklabs.io> Co-authored-by: Shubham Mishra <shivam828787@gmail.com>
1 parent 6bf3efc commit 0de8910

File tree

32 files changed

+730
-662
lines changed

32 files changed

+730
-662
lines changed

crates/client-api-messages/src/energy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl fmt::Debug for EnergyBalance {
126126
/// In contrast to [`EnergyQuanta`], this is represented by a 64-bit integer. This makes energy handling
127127
/// for reducers easier, while still providing a unlikely-to-ever-be-reached maximum value (e.g. for wasmtime:
128128
/// `(u64::MAX eV / 1000 eV/instruction) * 3 ns/instruction = 640 days`)
129-
#[derive(Copy, Clone, From, Add, Sub)]
129+
#[derive(Copy, Clone, From, Add, Sub, AddAssign, SubAssign)]
130130
pub struct FunctionBudget(u64);
131131

132132
impl FunctionBudget {

crates/core/src/db/relational_db.rs

Lines changed: 102 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::db::MetricsRecorderQueue;
22
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
3-
use crate::host::ArgsTuple;
43
use crate::messages::control_db::HostType;
54
use crate::subscription::ExecutionCounters;
65
use crate::util::{asyncify, spawn_rayon};
@@ -9,7 +8,6 @@ use anyhow::{anyhow, Context};
98
use bytes::Bytes;
109
use enum_map::EnumMap;
1110
use fs2::FileExt;
12-
use log::trace;
1311
use spacetimedb_commitlog::repo::OnNewSegmentFn;
1412
use spacetimedb_commitlog::{self as commitlog, SizeOnDisk};
1513
use spacetimedb_data_structures::map::IntSet;
@@ -46,7 +44,9 @@ use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
4644
use spacetimedb_primitives::*;
4745
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
4846
use spacetimedb_sats::memory_usage::MemoryUsage;
49-
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace};
47+
use spacetimedb_sats::{
48+
AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace, WithTypespace,
49+
};
5050
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
5151
use spacetimedb_schema::schema::{
5252
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
@@ -1078,11 +1078,11 @@ impl RelationalDB {
10781078
tx: &mut MutTx,
10791079
module_def: &ModuleDef,
10801080
view_def: &ViewDef,
1081-
) -> Result<(ViewDatabaseId, TableId), DBError> {
1081+
) -> Result<(ViewId, TableId), DBError> {
10821082
Ok(tx.create_view(module_def, view_def)?)
10831083
}
10841084

1085-
pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewDatabaseId) -> Result<(), DBError> {
1085+
pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewId) -> Result<(), DBError> {
10861086
Ok(tx.drop_view(view_id)?)
10871087
}
10881088

@@ -1173,7 +1173,7 @@ impl RelationalDB {
11731173
Ok(self.inner.rename_table_mut_tx(tx, table_id, new_name)?)
11741174
}
11751175

1176-
pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result<Option<ViewDatabaseId>, DBError> {
1176+
pub fn view_id_from_name_mut(&self, tx: &MutTx, view_name: &str) -> Result<Option<ViewId>, DBError> {
11771177
Ok(self.inner.view_id_from_name_mut_tx(tx, view_name)?)
11781178
}
11791179

@@ -1509,93 +1509,122 @@ impl RelationalDB {
15091509
})
15101510
}
15111511

1512-
/// Materialize View backing table.
1512+
/// Write `bytes` into a (sender) view's backing table.
15131513
///
15141514
/// # Process
1515-
/// 1. Serializes view arguments into `ST_VIEW_ARG_ID`
1516-
/// 2. Deletes stale rows matching the view arguments
1517-
/// 3. Deserializes the new view execution results
1518-
/// 4. Inserts fresh rows with the corresponding arg_id
1515+
/// 1. Delete all rows for `sender` from the view's backing table
1516+
/// 2. Deserialize `bytes`
1517+
/// 3. Insert the new rows into the backing table
15191518
///
15201519
/// # Arguments
15211520
/// * `tx` - Mutable transaction context
1522-
/// * `view` - Name of the view to update
1523-
/// * `args` - Arguments passed to the view call
1524-
/// * `return_type` - Expected return type of the view
1525-
/// * `bytes` - Serialized (bsatn encoded) return value from view execution
1521+
/// * `table_id` - The id of the view's backing table
1522+
/// * `sender` - The calling identity of the view being updated
1523+
/// * `row_type` - Expected return type of the view
1524+
/// * `bytes` - An array of product values (bsatn encoded)
15261525
/// * `typespace` - Type information for deserialization
1527-
/// * `caller_identity` - Identity of the caller (for non-anonymous views)
15281526
#[allow(clippy::too_many_arguments)]
15291527
pub fn materialize_view(
15301528
&self,
15311529
tx: &mut MutTxId,
1532-
view: &str,
1533-
args: ArgsTuple,
1534-
return_type: AlgebraicTypeRef,
1530+
table_id: TableId,
1531+
sender: Identity,
1532+
row_type: AlgebraicTypeRef,
15351533
bytes: Bytes,
15361534
typespace: &Typespace,
1537-
caller_identity: Identity,
15381535
) -> Result<(), DBError> {
1539-
// Fetch view metadata
1540-
let st_view_row = tx.lookup_st_view_by_name(view)?;
1541-
let table_id = st_view_row.table_id.expect("View table must exist for materialization");
1542-
let view_id = st_view_row.view_id;
1543-
let is_anonymous = st_view_row.is_anonymous;
1544-
let arg_id = tx.get_or_insert_st_view_arg(args.get_bsatn())?;
1545-
1546-
// Build the filter key for identifying rows to update
1547-
let mut input_args = Vec::new();
1548-
if !is_anonymous {
1549-
input_args.push(AlgebraicValue::OptionSome(caller_identity.into()));
1550-
}
1551-
if !tx.is_view_parameterized(view_id)? {
1552-
input_args.push(AlgebraicValue::U64(arg_id));
1553-
}
1554-
let input_args = ProductValue {
1555-
elements: input_args.into_boxed_slice(),
1556-
};
1557-
let col_list: ColList = (0..input_args.elements.len()).collect();
1558-
1559-
// Remove stale View entries
1560-
let rows_to_delete: Vec<_> = self
1561-
.iter_by_col_eq_mut(tx, table_id, col_list, &input_args.clone().into())?
1536+
// Delete rows for `sender` from the backing table
1537+
let rows_to_delete = self
1538+
.iter_by_col_eq_mut(tx, table_id, ColId(0), &sender.into())?
15621539
.map(|res| res.pointer())
1563-
.collect();
1564-
1565-
let deleted_count = self.delete(tx, table_id, rows_to_delete);
1566-
trace!("Deleted {deleted_count} stale rows from view table {table_id} for arg_id {arg_id}");
1567-
1568-
// Deserialize the return value
1569-
let seed = spacetimedb_sats::WithTypespace::new(typespace, &return_type).resolve(return_type);
1570-
let return_val = seed
1540+
.collect::<Vec<_>>();
1541+
self.delete(tx, table_id, rows_to_delete);
1542+
1543+
// Deserialize the return rows.
1544+
// The return type is expected to be an array of products.
1545+
let row_type = typespace.resolve(row_type);
1546+
let ret_type = AlgebraicType::array(row_type.ty().clone());
1547+
let seed = WithTypespace::new(typespace, &ret_type);
1548+
let rows = seed
15711549
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
15721550
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;
15731551

1574-
// Extract products from return value (must be array)
1575-
let products: Vec<ProductValue> = return_val
1552+
// Insert new rows into the backing table
1553+
for product in rows
15761554
.into_array()
1577-
.expect("Expected return_val to be an array")
1555+
.map_err(|_| ViewError::SerializeRow)
1556+
.map_err(DatastoreError::from)?
15781557
.into_iter()
1579-
.map(|v| v.into_product().expect("Expected array elements to be ProductValue"))
1580-
.collect();
1581-
1582-
// Insert fresh results into the view table
1583-
let mut elements: Vec<AlgebraicValue> =
1584-
Vec::with_capacity(input_args.elements.len() + products.first().map_or(0, |p| p.elements.len()));
1585-
for product in products {
1586-
elements.clear();
1587-
elements.extend_from_slice(&input_args.elements);
1588-
elements.extend_from_slice(&product.elements);
1558+
{
1559+
let product = product
1560+
.into_product()
1561+
.map_err(|_| ViewError::SerializeRow)
1562+
.map_err(DatastoreError::from)?;
1563+
self.insert(
1564+
tx,
1565+
table_id,
1566+
&ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements))
1567+
.to_bsatn_vec()
1568+
.map_err(|_| ViewError::SerializeRow)
1569+
.map_err(DatastoreError::from)?,
1570+
)?;
1571+
}
15891572

1590-
let row = ProductValue {
1591-
elements: elements.as_slice().into(),
1592-
};
1573+
Ok(())
1574+
}
15931575

1594-
let row_bytes = row
1595-
.to_bsatn_vec()
1596-
.map_err(|_| DatastoreError::from(ViewError::SerializeRow))?;
1576+
/// Write `bytes` into an anonymous view's backing table.
1577+
///
1578+
/// # Process
1579+
/// 1. Clear the view's backing table
1580+
/// 2. Deserialize `bytes`
1581+
/// 3. Insert the new rows into the backing table
1582+
///
1583+
/// # Arguments
1584+
/// * `tx` - Mutable transaction context
1585+
/// * `table_id` - The id of the view's backing table
1586+
/// * `row_type` - Expected return type of the view
1587+
/// * `bytes` - An array of product values (bsatn encoded)
1588+
/// * `typespace` - Type information for deserialization
1589+
#[allow(clippy::too_many_arguments)]
1590+
pub fn materialize_anonymous_view(
1591+
&self,
1592+
tx: &mut MutTxId,
1593+
table_id: TableId,
1594+
row_type: AlgebraicTypeRef,
1595+
bytes: Bytes,
1596+
typespace: &Typespace,
1597+
) -> Result<(), DBError> {
1598+
// Clear entire backing table
1599+
self.clear_table(tx, table_id)?;
1600+
1601+
// Deserialize the return rows.
1602+
// The return type is expected to be an array of products.
1603+
let row_type = typespace.resolve(row_type);
1604+
let ret_type = AlgebraicType::array(row_type.ty().clone());
1605+
let seed = WithTypespace::new(typespace, &ret_type);
1606+
let rows = seed
1607+
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
1608+
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;
15971609

1598-
self.insert(tx, table_id, &row_bytes)?;
1610+
// Insert new rows into the backing table
1611+
for product in rows
1612+
.into_array()
1613+
.map_err(|_| ViewError::SerializeRow)
1614+
.map_err(DatastoreError::from)?
1615+
.into_iter()
1616+
{
1617+
self.insert(
1618+
tx,
1619+
table_id,
1620+
&product
1621+
.into_product()
1622+
.map_err(|_| ViewError::SerializeRow)
1623+
.map_err(DatastoreError::from)?
1624+
.to_bsatn_vec()
1625+
.map_err(|_| ViewError::SerializeRow)
1626+
.map_err(DatastoreError::from)?,
1627+
)?;
15991628
}
16001629

16011630
Ok(())
@@ -2195,7 +2224,7 @@ pub mod tests_utils {
21952224
name: &str,
21962225
schema: &[(&str, AlgebraicType)],
21972226
is_anonymous: bool,
2198-
) -> Result<(ViewDatabaseId, TableId), DBError> {
2227+
) -> Result<(ViewId, TableId), DBError> {
21992228
let mut builder = RawModuleDefV9Builder::new();
22002229

22012230
// Add the view's product type to the typespace
@@ -2314,18 +2343,16 @@ mod tests {
23142343
use super::tests_utils::begin_mut_tx;
23152344
use super::*;
23162345
use crate::db::relational_db::tests_utils::{
2317-
begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
2346+
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
23182347
};
23192348
use anyhow::bail;
2320-
use bytes::Bytes;
23212349
use commitlog::payload::txdata;
23222350
use commitlog::Commitlog;
23232351
use durability::EmptyHistory;
23242352
use pretty_assertions::{assert_eq, assert_matches};
23252353
use spacetimedb_data_structures::map::IntMap;
23262354
use spacetimedb_datastore::error::{DatastoreError, IndexError};
23272355
use spacetimedb_datastore::execution_context::ReducerContext;
2328-
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, ViewCall};
23292356
use spacetimedb_datastore::system_tables::{
23302357
system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID,
23312358
ST_SEQUENCE_ID, ST_TABLE_ID,
@@ -2958,57 +2985,6 @@ mod tests {
29582985
Ok(())
29592986
}
29602987

2961-
#[test]
2962-
fn test_is_materialized() -> anyhow::Result<()> {
2963-
let stdb = TestDB::in_memory()?;
2964-
let schema = [("col1", AlgebraicType::I64), ("col2", AlgebraicType::I64)];
2965-
let table_schema = table("MyTable", ProductType::from(schema), |b| b);
2966-
2967-
let view_schema = [("view_col", AlgebraicType::I64)];
2968-
let view_name = "MyView";
2969-
let args: Bytes = vec![].into();
2970-
let sender = Identity::ZERO;
2971-
let (view_id, _) = create_view_for_test(&stdb, view_name, &view_schema, true)?;
2972-
2973-
let mut tx = begin_mut_tx(&stdb);
2974-
let table_id = stdb.create_table(&mut tx, table_schema)?;
2975-
2976-
assert!(
2977-
!tx.is_materialized(view_name, args.clone(), sender)?.0,
2978-
"view should not be materialized as read set is not recorded yet"
2979-
);
2980-
2981-
let view_call = FuncCallType::View(ViewCall::anonymous(view_id, args));
2982-
tx.record_table_scan(&view_call, table_id);
2983-
assert!(
2984-
tx.is_materialized(view_name, vec![].into(), sender)?.0,
2985-
"view should be materialized as read set is recorded"
2986-
);
2987-
stdb.commit_tx(tx)?;
2988-
2989-
let tx = begin_mut_tx(&stdb);
2990-
assert!(
2991-
tx.is_materialized(view_name, vec![].into(), sender)?.0,
2992-
"view should be materialized after commit"
2993-
);
2994-
stdb.commit_tx(tx)?;
2995-
2996-
let mut tx = begin_mut_tx(&stdb);
2997-
stdb.insert(
2998-
&mut tx,
2999-
table_id,
3000-
&product![AlgebraicValue::I64(1), AlgebraicValue::I64(2)].to_bsatn_vec()?,
3001-
)?;
3002-
stdb.commit_tx(tx)?;
3003-
3004-
let tx = begin_mut_tx(&stdb);
3005-
assert!(
3006-
!tx.is_materialized(view_name, vec![].into(), sender)?.0,
3007-
"view should not be materialized after table modification"
3008-
);
3009-
Ok(())
3010-
}
3011-
30122988
#[test]
30132989
/// Test that iteration yields each row only once
30142990
/// in the edge case where a row is committed and has been deleted and re-inserted within the iterating TX.

crates/core/src/host/host_controller.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,22 +171,6 @@ impl From<&EventStatus> for ReducerOutcome {
171171
}
172172
}
173173

174-
pub enum ViewOutcome {
175-
Success,
176-
Failed(String),
177-
BudgetExceeded,
178-
}
179-
180-
impl From<EventStatus> for ViewOutcome {
181-
fn from(status: EventStatus) -> Self {
182-
match status {
183-
EventStatus::Committed(_) => ViewOutcome::Success,
184-
EventStatus::Failed(e) => ViewOutcome::Failed(e),
185-
EventStatus::OutOfEnergy => ViewOutcome::BudgetExceeded,
186-
}
187-
}
188-
}
189-
190174
#[derive(Clone, Debug)]
191175
pub struct ProcedureCallResult {
192176
pub return_val: AlgebraicValue,

0 commit comments

Comments
 (0)