Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::StreamExt;
use http::StatusCode;
use serde::Deserialize;
use spacetimedb::database_logger::DatabaseLogger;
use spacetimedb::host::module_host::ClientConnectedError;
use spacetimedb::host::ReducerArgs;
use spacetimedb::host::ReducerCallError;
use spacetimedb::host::ReducerOutcome;
Expand Down Expand Up @@ -74,11 +75,34 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
// so generate one.
let connection_id = generate_random_connection_id();

if let Err(e) = module
.call_identity_connected_disconnected(caller_identity, connection_id, true)
.await
{
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into());
match module.call_identity_connected(caller_identity, connection_id).await {
// If `call_identity_connected` returns `Err(Rejected)`, then the `client_connected` reducer errored,
// meaning the connection was refused. Return 403 forbidden.
Err(ClientConnectedError::Rejected(msg)) => return Err((StatusCode::FORBIDDEN, msg).into()),
// If `call_identity_connected` returns `Err(OutOfEnergy)`,
// then, well, the database is out of energy.
// Return 503 service unavailable.
Err(err @ ClientConnectedError::OutOfEnergy) => {
return Err((StatusCode::SERVICE_UNAVAILABLE, err.to_string()).into())
}
// If `call_identity_connected` returns `Err(ReducerCall)`,
// something went wrong while invoking the `client_connected` reducer.
// I (pgoldman 2025-03-27) am not really sure how this would happen,
// but we returned 404 not found in this case prior to my editing this code,
// so I guess let's keep doing that.
Err(ClientConnectedError::ReducerCall(e)) => {
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into())
}
// If `call_identity_connected` returns `Err(DBError)`,
// then the module didn't define `client_connected`,
// but something went wrong when we tried to insert into `st_client`.
// That's weird and scary, so return 500 internal error.
Err(e @ ClientConnectedError::DBError(_)) => {
return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into())
}

// If `call_identity_connected` returns `Ok`, then we can actually call the reducer we want.
Ok(()) => (),
}
let result = match module
.call_reducer(caller_identity, Some(connection_id), None, None, None, &reducer, args)
Expand Down Expand Up @@ -107,11 +131,12 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
}
};

if let Err(e) = module
.call_identity_connected_disconnected(caller_identity, connection_id, false)
.await
{
return Err((StatusCode::NOT_FOUND, format!("{:#}", anyhow::anyhow!(e))).into());
if let Err(e) = module.call_identity_disconnected(caller_identity, connection_id).await {
// If `call_identity_disconnected` errors, something is very wrong:
// it means we tried to delete the `st_client` row but failed.
// Note that `call_identity_disconnected` swallows errors from the `client_disconnected` reducer.
// Slap a 500 on it and pray.
return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{:#}", anyhow::anyhow!(e))).into());
}

match result {
Expand Down
7 changes: 6 additions & 1 deletion crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use scopeguard::ScopeGuard;
use serde::Deserialize;
use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage};
use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, Protocol};
use spacetimedb::host::module_host::ClientConnectedError;
use spacetimedb::host::NoSuchModule;
use spacetimedb::util::also_poll;
use spacetimedb::worker_metrics::WORKER_METRICS;
Expand Down Expand Up @@ -148,7 +149,11 @@ where
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
{
Ok(s) => s,
Err(e) => {
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
log::info!("{e}");
return;
}
Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
log::warn!("ModuleHost died while we were connecting: {e:#}");
return;
}
Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Instant;
use super::messages::{OneOffQueryResponseMessage, SerializableMessage};
use super::{message_handlers, ClientActorId, MessageHandleError};
use crate::error::DBError;
use crate::host::module_host::ClientConnectedError;
use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult};
use crate::messages::websocket::Subscribe;
use crate::util::prometheus_handle::IntGaugeExt;
Expand Down Expand Up @@ -171,7 +172,7 @@ impl ClientConnection {
replica_id: u64,
mut module_rx: watch::Receiver<ModuleHost>,
actor: impl FnOnce(ClientConnection, mpsc::Receiver<SerializableMessage>) -> Fut,
) -> Result<ClientConnection, ReducerCallError>
) -> Result<ClientConnection, ClientConnectedError>
where
Fut: Future<Output = ()> + Send + 'static,
{
Expand All @@ -180,9 +181,7 @@ impl ClientConnection {
// logically subscribed to the database, not any particular replica. We should handle failover for
// them and stuff. Not right now though.
let module = module_rx.borrow_and_update().clone();
module
.call_identity_connected_disconnected(id.identity, id.connection_id, true)
.await?;
module.call_identity_connected(id.identity, id.connection_id).await?;

let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);

Expand Down
43 changes: 37 additions & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use super::{
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::system_tables::{
with_sys_table_buf, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields,
StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields,
StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID,
ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID,
with_sys_table_buf, StClientFields, StClientRow, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow,
StFields as _, StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields,
StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, ST_CLIENT_ID,
ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID,
ST_TABLE_ID,
};
use crate::db::datastore::traits::{RowTypeForTable, TxData};
use crate::db::datastore::{
Expand All @@ -34,9 +35,14 @@ use core::ops::RangeBounds;
use core::{iter, ops::Bound};
use smallvec::SmallVec;
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore};
use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP};
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics};
use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId};
use spacetimedb_lib::{
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
ConnectionId, Identity,
};
use spacetimedb_primitives::{
col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId,
};
use spacetimedb_sats::{
bsatn::{self, to_writer, DecodeError, Deserializer},
de::{DeserializeSeed, WithBound},
Expand Down Expand Up @@ -1230,6 +1236,31 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> {
}

impl MutTxId {
pub(crate) fn insert_st_client(&mut self, identity: Identity, connection_id: ConnectionId) -> Result<()> {
let row = &StClientRow {
identity: identity.into(),
connection_id: connection_id.into(),
};
self.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ())
}

pub(crate) fn delete_st_client(&mut self, identity: Identity, connection_id: ConnectionId) -> Result<()> {
let row = &StClientRow {
identity: identity.into(),
connection_id: connection_id.into(),
};
let ptr = self
.iter_by_col_eq(
ST_CLIENT_ID,
col_list![StClientFields::Identity, StClientFields::ConnectionId],
&AlgebraicValue::product(row),
)?
.next()
.expect("the client should be connected")
.pointer();
self.delete(ST_CLIENT_ID, ptr).map(drop)
}

pub(crate) fn insert_via_serialize_bsatn<'a, T: Serialize>(
&'a mut self,
table_id: TableId,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ impl Host {
// Disconnect dangling clients.
for (identity, connection_id) in connected_clients {
module_host
.call_identity_connected_disconnected(identity, connection_id, false)
.call_identity_disconnected(identity, connection_id)
.await
.with_context(|| {
format!(
Expand Down
Loading
Loading