Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/core/src/subscription/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use spacetimedb_subscription::SubscriptionPlan;
use crate::host::module_host::UpdatesRelValue;

/// Evaluate a subscription over a delta update.
/// Returns `None` for empty updates.
///
/// IMPORTANT: This does and must implement bag semantics.
/// That is, we must not remove duplicate rows.
Expand All @@ -18,7 +19,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
tx: &'a Tx,
metrics: &mut ExecutionMetrics,
plan: &SubscriptionPlan,
) -> Result<UpdatesRelValue<'a>> {
) -> Result<Option<UpdatesRelValue<'a>>> {
let mut inserts = vec![];
let mut deletes = vec![];

Expand All @@ -32,5 +33,10 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
Ok(())
})?;

Ok(UpdatesRelValue { inserts, deletes })
// Return `None` for empty updates
if inserts.is_empty() && deletes.is_empty() {
return Ok(None);
}

Ok(Some(UpdatesRelValue { inserts, deletes }))
}
48 changes: 48 additions & 0 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,9 @@ mod tests {

let table_update = tables.pop().unwrap();

// We should not be sending empty updates to clients
assert_ne!(table_update.num_rows, 0);

// It should be the table we expect
assert_eq!(table_update.table_id, table_id);

Expand Down Expand Up @@ -1266,6 +1269,51 @@ mod tests {
Ok(())
}

/// Test that we do not send empty updates to clients
#[tokio::test]
async fn test_no_empty_updates() -> anyhow::Result<()> {
// Establish a client connection
let (tx, mut rx) = client_connection(client_id_from_u8(1));

let db = relational_db()?;
let subs = module_subscriptions(db.clone());

let schema = [("x", AlgebraicType::U8)];

let t_id = db.create_table_for_test("t", &schema, &[])?;

// Subscribe to rows of `t` where `x` is 0
subscribe(&subs, "select * from t where x = 0", Arc::new(tx), &mut 0)?;

// Wait to receive the initial subscription message
assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));

// Insert a row that does not match the query
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
db.insert(&mut tx, t_id, &bsatn::to_vec(&product![1_u8])?)?;

assert!(matches!(
subs.commit_and_broadcast_event(None, module_event(), tx),
Ok(Ok(_))
));

// Insert a row that does match the query
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
db.insert(&mut tx, t_id, &bsatn::to_vec(&product![0_u8])?)?;

assert!(matches!(
subs.commit_and_broadcast_event(None, module_event(), tx),
Ok(Ok(_))
));

let schema = ProductType::from([AlgebraicType::U8]);

// If the server sends empty updates, this assertion will fail,
// because we will receive one for the first transaction.
assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8]], []).await;
Ok(())
}

/// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
#[test]
fn test_tx_subscription_ordering() -> ResultTest<()> {
Expand Down
65 changes: 34 additions & 31 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,43 +540,46 @@ impl SubscriptionManager {
(update, num_rows)
}

let updates = eval_delta(tx, &mut metrics, plan)
.map_err(|err| {
let updates = match eval_delta(tx, &mut metrics, plan) {
Err(err) => {
tracing::error!(
message = "Query errored during tx update",
sql = sql,
reason = ?err,
);
self.clients_for_query(hash)
Err(self
.clients_for_query(hash)
.map(|id| (id, err.to_string().into_boxed_str()))
.collect::<Vec<_>>()
})
.map(|delta_updates| {
self.clients_for_query(hash)
.map(|id| {
let client = &self.clients[id].outbound_ref;
let update = match client.config.protocol {
Protocol::Binary => Bsatn(memo_encode::<BsatnFormat>(
&delta_updates,
client,
match client.config.compression {
Compression::Brotli => &mut ops_bin_brotli,
Compression::Gzip => &mut ops_bin_gzip,
Compression::None => &mut ops_bin_none,
},
&mut metrics,
)),
Protocol::Text => Json(memo_encode::<JsonFormat>(
&delta_updates,
client,
&mut ops_json,
&mut metrics,
)),
};
(id, table_id, table_name.clone(), update)
})
.collect::<Vec<_>>()
});
.collect::<Vec<_>>())
}
// The query didn't return any rows to update
Ok(None) => Ok(vec![]),
Ok(Some(delta_updates)) => Ok(self
.clients_for_query(hash)
.map(|id| {
let client = &self.clients[id].outbound_ref;
let update = match client.config.protocol {
Protocol::Binary => Bsatn(memo_encode::<BsatnFormat>(
&delta_updates,
client,
match client.config.compression {
Compression::Brotli => &mut ops_bin_brotli,
Compression::Gzip => &mut ops_bin_gzip,
Compression::None => &mut ops_bin_none,
},
&mut metrics,
)),
Protocol::Text => Json(memo_encode::<JsonFormat>(
&delta_updates,
client,
&mut ops_json,
&mut metrics,
)),
};
(id, table_id, table_name.clone(), update)
})
.collect::<Vec<_>>()),
};

(updates, metrics)
})
Expand Down
Loading