Skip to content

Commit 310d8eb

Browse files
kimgefjon
andauthored
[teams 4/5] SQL authorization (#3525)
Permissions for evaluating SQL/DML are not generally "actions", but more a set of permissions that are checked during evaluation. To make this work with the teams feature, this patch extends `AuthCtx` to allow checking a set of permissions as mandated by the spec. This set is a bit more fine-grained than "is owner", so as to avoid baking in the concept of teams/collaborators, or assumptions about what a role might entail. Both are likely to evolve in the future, so evaluation of permissions / capabilities should be confined to the impl of the `Authorization` trait. Unlike "actions", the `AuthCtx` must be able to evaluate permission checks quickly and without side-effects, nor can it enter an `async` context. In that sense, it is precomputed (if you will), and stored as a closure in the `AuthCtx` for external authorization. A challenge posed is how to thread through the constructed `AuthCtx` for subscriptions. A tempting approach would have been to equip the `HostController` with the ability to summon an `AuthCtx`. That, however, would have created a gnarly circular dependency, because the `HostController` also controls the controldb, which itself demands an `AuthCtx`. Instead, the `AuthCtx` is obtained in the endpoint handler and passed to each method call that requires one. That's less pretty, but more effective. --------- Signed-off-by: Kim Altintop <kim@eagain.io> Co-authored-by: Phoebe Goldman <phoebe@clockworklabs.io>
1 parent 4143c15 commit 310d8eb

File tree

26 files changed

+415
-215
lines changed

26 files changed

+415
-215
lines changed

crates/client-api/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,20 @@ pub trait Authorization {
505505
database: Identity,
506506
action: Action,
507507
) -> impl Future<Output = Result<(), Unauthorized>> + Send;
508+
509+
/// Obtain an attenuated [AuthCtx] for `subject` to evaluate SQL against
510+
/// `database`.
511+
///
512+
/// "SQL" includes the sql endpoint, pg wire connections, as well as
513+
/// subscription queries.
514+
///
515+
/// If any SQL should be rejected outright, or the authorization database
516+
/// is not available, return `Err(Unauthorized)`.
517+
fn authorize_sql(
518+
&self,
519+
subject: Identity,
520+
database: Identity,
521+
) -> impl Future<Output = Result<AuthCtx, Unauthorized>> + Send;
508522
}
509523

510524
impl<T: Authorization> Authorization for Arc<T> {
@@ -516,6 +530,14 @@ impl<T: Authorization> Authorization for Arc<T> {
516530
) -> impl Future<Output = Result<(), Unauthorized>> + Send {
517531
(**self).authorize_action(subject, database, action)
518532
}
533+
534+
fn authorize_sql(
535+
&self,
536+
subject: Identity,
537+
database: Identity,
538+
) -> impl Future<Output = Result<AuthCtx, Unauthorized>> + Send {
539+
(**self).authorize_sql(subject, database)
540+
}
519541
}
520542

521543
pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {

crates/client-api/src/routes/database.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use spacetimedb::host::UpdateDatabaseResult;
2929
use spacetimedb::host::{FunctionArgs, MigratePlanResult};
3030
use spacetimedb::host::{ModuleHost, ReducerOutcome};
3131
use spacetimedb::host::{ProcedureCallError, ReducerCallError};
32-
use spacetimedb::identity::{AuthCtx, Identity};
32+
use spacetimedb::identity::Identity;
3333
use spacetimedb::messages::control_db::{Database, HostType};
3434
use spacetimedb_client_api_messages::http::SqlStmtResult;
3535
use spacetimedb_client_api_messages::name::{
@@ -530,15 +530,16 @@ pub async fn sql_direct<S>(
530530
sql: String,
531531
) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>>
532532
where
533-
S: NodeDelegate + ControlStateDelegate,
533+
S: NodeDelegate + ControlStateDelegate + Authorization,
534534
{
535535
// Anyone is authorized to execute SQL queries. The SQL engine will determine
536536
// which queries this identity is allowed to execute against the database.
537537

538538
let (host, database) = find_leader_and_database(&worker_ctx, name_or_identity).await?;
539539

540-
let auth = AuthCtx::new(database.owner_identity, caller_identity);
541-
log::debug!("auth: {auth:?}");
540+
let auth = worker_ctx
541+
.authorize_sql(caller_identity, database.database_identity)
542+
.await?;
542543

543544
host.exec_sql(auth, database, confirmed, sql).await
544545
}
@@ -551,7 +552,7 @@ pub async fn sql<S>(
551552
body: String,
552553
) -> axum::response::Result<impl IntoResponse>
553554
where
554-
S: NodeDelegate + ControlStateDelegate,
555+
S: NodeDelegate + ControlStateDelegate + Authorization,
555556
{
556557
let json = sql_direct(worker_ctx, name_or_identity, params, auth.claims.identity, body).await?;
557558

crates/client-api/src/routes/subscribe.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use crate::util::websocket::{
4949
CloseCode, CloseFrame, Message as WsMessage, WebSocketConfig, WebSocketStream, WebSocketUpgrade, WsError,
5050
};
5151
use crate::util::{NameOrIdentity, XForwardedFor};
52-
use crate::{log_and_500, ControlStateDelegate, NodeDelegate};
52+
use crate::{log_and_500, Authorization, ControlStateDelegate, NodeDelegate};
5353

5454
#[allow(clippy::declare_interior_mutable_const)]
5555
pub const TEXT_PROTOCOL: HeaderValue = HeaderValue::from_static(ws_api::TEXT_PROTOCOL);
@@ -106,7 +106,7 @@ pub async fn handle_websocket<S>(
106106
ws: WebSocketUpgrade,
107107
) -> axum::response::Result<impl IntoResponse>
108108
where
109-
S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions,
109+
S: NodeDelegate + ControlStateDelegate + HasWebSocketOptions + Authorization,
110110
{
111111
if connection_id.is_some() {
112112
// TODO: Bump this up to `log::warn!` after removing the client SDKs' uses of that parameter.
@@ -125,6 +125,7 @@ where
125125
}
126126

127127
let db_identity = name_or_identity.resolve(&ctx).await?;
128+
let sql_auth = ctx.authorize_sql(auth.claims.identity, db_identity).await?;
128129

129130
let (res, ws_upgrade, protocol) =
130131
ws.select_protocol([(BIN_PROTOCOL, Protocol::Binary), (TEXT_PROTOCOL, Protocol::Text)]);
@@ -218,6 +219,7 @@ where
218219
let client = ClientConnection::spawn(
219220
client_id,
220221
auth.into(),
222+
sql_auth,
221223
client_config,
222224
leader.replica_id,
223225
module_rx,

crates/core/src/client/client_connection.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use spacetimedb_client_api_messages::websocket::{
2929
UnsubscribeMulti,
3030
};
3131
use spacetimedb_durability::{DurableOffset, TxOffset};
32-
use spacetimedb_lib::identity::RequestId;
32+
use spacetimedb_lib::identity::{AuthCtx, RequestId};
3333
use spacetimedb_lib::metrics::ExecutionMetrics;
3434
use spacetimedb_lib::Identity;
3535
use tokio::sync::mpsc::error::{SendError, TrySendError};
@@ -424,6 +424,7 @@ pub struct ClientConnection {
424424
sender: Arc<ClientConnectionSender>,
425425
pub replica_id: u64,
426426
module_rx: watch::Receiver<ModuleHost>,
427+
auth: AuthCtx,
427428
}
428429

429430
impl Deref for ClientConnection {
@@ -675,9 +676,11 @@ impl ClientConnection {
675676
/// to verify that the database at `module_rx` approves of this connection,
676677
/// and should not invoke this method if that call returns an error,
677678
/// and pass the returned [`Connected`] as `_proof_of_client_connected_call`.
679+
#[allow(clippy::too_many_arguments)]
678680
pub async fn spawn<Fut>(
679681
id: ClientActorId,
680682
auth: ConnectionAuthCtx,
683+
sql_auth: AuthCtx,
681684
config: ClientConfig,
682685
replica_id: u64,
683686
mut module_rx: watch::Receiver<ModuleHost>,
@@ -735,6 +738,7 @@ impl ClientConnection {
735738
sender,
736739
replica_id,
737740
module_rx,
741+
auth: sql_auth,
738742
};
739743

740744
let actor_fut = actor(this.clone(), receiver);
@@ -750,10 +754,12 @@ impl ClientConnection {
750754
replica_id: u64,
751755
module_rx: watch::Receiver<ModuleHost>,
752756
) -> Self {
757+
let auth = AuthCtx::new(module_rx.borrow().database_info().database_identity, id.identity);
753758
Self {
754759
sender: Arc::new(ClientConnectionSender::dummy(id, config, module_rx.clone())),
755760
replica_id,
756761
module_rx,
762+
auth,
757763
}
758764
}
759765

@@ -868,7 +874,7 @@ impl ClientConnection {
868874
.on_module_thread_async("subscribe_single", async move || {
869875
let host = me.module();
870876
host.subscriptions()
871-
.add_single_subscription(Some(&host), me.sender, subscription, timer, None)
877+
.add_single_subscription(Some(&host), me.sender, me.auth.clone(), subscription, timer, None)
872878
.await
873879
})
874880
.await?
@@ -879,7 +885,7 @@ impl ClientConnection {
879885
asyncify(move || {
880886
me.module()
881887
.subscriptions()
882-
.remove_single_subscription(me.sender, request, timer)
888+
.remove_single_subscription(me.sender, me.auth.clone(), request, timer)
883889
})
884890
.await
885891
}
@@ -894,7 +900,7 @@ impl ClientConnection {
894900
.on_module_thread_async("subscribe_multi", async move || {
895901
let host = me.module();
896902
host.subscriptions()
897-
.add_multi_subscription(Some(&host), me.sender, request, timer, None)
903+
.add_multi_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None)
898904
.await
899905
})
900906
.await?
@@ -910,7 +916,7 @@ impl ClientConnection {
910916
.on_module_thread("unsubscribe_multi", move || {
911917
me.module()
912918
.subscriptions()
913-
.remove_multi_subscription(me.sender, request, timer)
919+
.remove_multi_subscription(me.sender, me.auth.clone(), request, timer)
914920
})
915921
.await?
916922
}
@@ -921,7 +927,7 @@ impl ClientConnection {
921927
.on_module_thread_async("subscribe", async move || {
922928
let host = me.module();
923929
host.subscriptions()
924-
.add_legacy_subscriber(Some(&host), me.sender, subscription, timer, None)
930+
.add_legacy_subscriber(Some(&host), me.sender, me.auth.clone(), subscription, timer, None)
925931
.await
926932
})
927933
.await?
@@ -935,7 +941,7 @@ impl ClientConnection {
935941
) -> Result<(), anyhow::Error> {
936942
self.module()
937943
.one_off_query::<JsonFormat>(
938-
self.id.identity,
944+
self.auth.clone(),
939945
query.to_owned(),
940946
self.sender.clone(),
941947
message_id.to_owned(),
@@ -953,7 +959,7 @@ impl ClientConnection {
953959
) -> Result<(), anyhow::Error> {
954960
self.module()
955961
.one_off_query::<BsatnFormat>(
956-
self.id.identity,
962+
self.auth.clone(),
957963
query.to_owned(),
958964
self.sender.clone(),
959965
message_id.to_owned(),

crates/core/src/db/relational_db.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ impl RelationalDB {
629629
self.database_identity
630630
}
631631

632+
pub fn owner_identity(&self) -> Identity {
633+
self.owner_identity
634+
}
635+
632636
/// The number of bytes on disk occupied by the durability layer.
633637
///
634638
/// If this is an in-memory instance, `Ok(0)` is returned.

crates/core/src/host/host_controller.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -553,12 +553,7 @@ async fn make_replica_ctx(
553553
send_worker_queue.clone(),
554554
)));
555555
let downgraded = Arc::downgrade(&subscriptions);
556-
let subscriptions = ModuleSubscriptions::new(
557-
relational_db.clone(),
558-
subscriptions,
559-
send_worker_queue,
560-
database.owner_identity,
561-
);
556+
let subscriptions = ModuleSubscriptions::new(relational_db.clone(), subscriptions, send_worker_queue);
562557

563558
// If an error occurs when evaluating a subscription,
564559
// we mark each client that was affected,

crates/core/src/host/module_host.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,7 +1715,7 @@ impl ModuleHost {
17151715
#[tracing::instrument(level = "trace", skip_all)]
17161716
pub async fn one_off_query<F: BuildableWebsocketFormat>(
17171717
&self,
1718-
caller_identity: Identity,
1718+
auth: AuthCtx,
17191719
query: String,
17201720
client: Arc<ClientConnectionSender>,
17211721
message_id: Vec<u8>,
@@ -1726,7 +1726,6 @@ impl ModuleHost {
17261726
let replica_ctx = self.replica_ctx();
17271727
let db = replica_ctx.relational_db.clone();
17281728
let subscriptions = replica_ctx.subscriptions.clone();
1729-
let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
17301729
log::debug!("One-off query: {query}");
17311730
let metrics = self
17321731
.on_module_thread("one_off_query", move || {

crates/core/src/sql/ast.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
66
use spacetimedb_datastore::system_tables::{StRowLevelSecurityFields, ST_ROW_LEVEL_SECURITY_ID};
77
use spacetimedb_expr::check::SchemaView;
88
use spacetimedb_expr::statement::compile_sql_stmt;
9-
use spacetimedb_lib::db::auth::StAccess;
109
use spacetimedb_lib::identity::AuthCtx;
1110
use spacetimedb_primitives::{ColId, TableId};
1211
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
@@ -492,22 +491,20 @@ impl<T> Deref for SchemaViewer<'_, T> {
492491

493492
impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
494493
fn table_id(&self, name: &str) -> Option<TableId> {
495-
let AuthCtx { owner, caller } = self.auth;
496494
// Get the schema from the in-memory state instead of fetching from the database for speed
497495
self.tx
498496
.table_id_from_name(name)
499497
.ok()
500498
.flatten()
501499
.and_then(|table_id| self.schema_for_table(table_id))
502-
.filter(|schema| schema.table_access == StAccess::Public || caller == owner)
500+
.filter(|schema| self.auth.has_read_access(schema.table_access))
503501
.map(|schema| schema.table_id)
504502
}
505503

506504
fn schema_for_table(&self, table_id: TableId) -> Option<Arc<TableOrViewSchema>> {
507-
let AuthCtx { owner, caller } = self.auth;
508505
self.tx
509506
.get_schema(table_id)
510-
.filter(|schema| schema.table_access == StAccess::Public || caller == owner)
507+
.filter(|schema| self.auth.has_read_access(schema.table_access))
511508
.map(Arc::clone)
512509
.map(TableOrViewSchema::from)
513510
.map(Arc::new)

0 commit comments

Comments
 (0)