Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ derive_more = "0.99.17"
uuid.workspace = true
jsonwebtoken.workspace = true
scopeguard.workspace = true
spacetimedb-paths.workspace = true

[dev-dependencies]
jsonwebtoken.workspace = true
130 changes: 120 additions & 10 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ use http::StatusCode;

use spacetimedb::client::ClientActorIndex;
use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
use spacetimedb::host::{HostController, UpdateDatabaseResult};
use spacetimedb::identity::Identity;
use spacetimedb::execution_context::Workload;
use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult};
use spacetimedb::identity::{AuthCtx, Identity};
use spacetimedb::json::client_api::StmtResultJson;
use spacetimedb::messages::control_db::{Database, HostType, Node, Replica};
use spacetimedb::sql;
use spacetimedb::sql::execute::translate_col;
use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_lib::ProductTypeElement;
use spacetimedb_paths::server::ModuleLogsDir;
use tokio::sync::watch;

pub mod auth;
pub mod routes;
Expand All @@ -20,13 +27,110 @@ pub mod util;
///
/// Types returned here should be considered internal state and **never** be
/// surfaced to the API.
#[async_trait]
pub trait NodeDelegate: Send + Sync {
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily>;
fn host_controller(&self) -> &HostController;
fn client_actor_index(&self) -> &ClientActorIndex;

type JwtAuthProviderT: auth::JwtAuthProvider;
fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT;
/// Return the leader [`Host`] of `database_id`.
///
/// Returns `None` if the current leader is not hosted by this node.
/// The [`Host`] is spawned implicitly if not already running.
async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>>;
fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir;
}

/// Client view of a running module.
pub struct Host {
pub replica_id: u64,
host_controller: HostController,
}

impl Host {
pub fn new(replica_id: u64, host_controller: HostController) -> Self {
Self {
replica_id,
host_controller,
}
}

pub async fn module(&self) -> Result<ModuleHost, NoSuchModule> {
self.host_controller.get_module_host(self.replica_id).await
}

pub async fn module_watcher(&self) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
self.host_controller.watch_module_host(self.replica_id).await
}

pub async fn exec_sql(
&self,
auth: AuthCtx,
database: Database,
body: String,
) -> axum::response::Result<Vec<StmtResultJson>> {
let module_host = self
.module()
.await
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;

let json = self
.host_controller
.using_database(
database,
self.replica_id,
move |db| -> axum::response::Result<_, (StatusCode, String)> {
tracing::info!(sql = body);
let results =
sql::execute::run(db, &body, auth, Some(&module_host.info().subscriptions)).map_err(|e| {
log::warn!("{}", e);
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
})?;

let json = db.with_read_only(Workload::Sql, |tx| {
results
.into_iter()
.map(|result| {
let rows = result.data;
let schema = result
.head
.fields
.iter()
.map(|x| {
let ty = x.algebraic_type.clone();
let name = translate_col(tx, x.field);
ProductTypeElement::new(ty, name)
})
.collect();
StmtResultJson { schema, rows }
})
.collect::<Vec<_>>()
});

Ok(json)
},
)
.await
.map_err(log_and_500)??;

Ok(json)
}

pub async fn update(
&self,
database: Database,
host_type: HostType,
program_bytes: Box<[u8]>,
) -> anyhow::Result<UpdateDatabaseResult> {
self.host_controller
.update_module_host(database, host_type, self.replica_id, program_bytes)
.await
}
}

/// Parameters for publishing a database.
Expand Down Expand Up @@ -155,9 +259,6 @@ impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
(**self).get_replicas()
}
fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
(**self).get_leader_replica_by_database(database_id)
}

// Energy
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
Expand All @@ -172,6 +273,10 @@ impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
(**self).reverse_lookup(database_identity)
}

fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
(**self).get_leader_replica_by_database(database_id)
}
}

#[async_trait]
Expand Down Expand Up @@ -209,23 +314,28 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for Arc<T> {
}
}

#[async_trait]
impl<T: NodeDelegate + ?Sized> NodeDelegate for Arc<T> {
type JwtAuthProviderT = T::JwtAuthProviderT;
fn gather_metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
(**self).gather_metrics()
}

fn host_controller(&self) -> &HostController {
(**self).host_controller()
}

fn client_actor_index(&self) -> &ClientActorIndex {
(**self).client_actor_index()
}

fn jwt_auth_provider(&self) -> &Self::JwtAuthProviderT {
(**self).jwt_auth_provider()
}

async fn leader(&self, database_id: u64) -> anyhow::Result<Option<Host>> {
(**self).leader(database_id).await
}

fn module_logs_dir(&self, replica_id: u64) -> ModuleLogsDir {
(**self).module_logs_dir(replica_id)
}
}

pub fn log_and_500(e: impl std::fmt::Display) -> ErrorResponse {
Expand Down
Loading