diff --git a/Cargo.lock b/Cargo.lock index 05e09c72..ebe3bd49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3640,6 +3640,7 @@ version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76" dependencies = [ + "indexmap", "itoa", "ryu", "serde", diff --git a/docs/HTTP_V1_SPEC.md b/docs/HTTP_V1_SPEC.md new file mode 100644 index 00000000..8fc69399 --- /dev/null +++ b/docs/HTTP_V1_SPEC.md @@ -0,0 +1,70 @@ +# The sqld HTTP API v1 specification ("Hrana over HTTP") + +Version 1 of the HTTP API ("Hrana over HTTP") is designed to complement the +WebSocket-based Hrana protocol for use cases that don't require stateful +database connections and for which the additional network rountrip required by +WebSockets relative to HTTP is not necessary. + +This API aims to be of production quality and it is primarily intended to be +consumed by client libraries. It does not deprecate or replace the "version 0" +of the HTTP API, which is designed to be quick and easy for users who send HTTP +requests manually (for example using `curl` or by directly using an HTTP +library). + +## Overview + +This HTTP API uses data structures and semantics from the Hrana protocol; +versions of the HTTP API are intended to correspond to versions of the Hrana +protocol, so HTTP API v1 corresponds to the `hrana1` version of Hrana. + +Endpoints in the HTTP API correspond to requests in Hrana. Each request is +executed as if a fresh Hrana stream was opened for the request. + +All request and response bodies are encoded in JSON, with content type +`application/json`. + +## Execute a statement + +``` +POST /v1/execute + +-> { + "stmt": Stmt, +} + +<- { + "result": StmtResult, +} +``` + +The `execute` endpoint receives a statement and returns the result of executing +the statement. The `Stmt` and `StmtResult` structures are from the Hrana +protocol. The semantics of this endpoint is the same as the `execute` request in +Hrana. + +## Execute a batch + +``` +POST /v1/batch + +-> { + "batch": Batch, +} + +<- { + "result": BatchResult, +} +``` + +The `batch` endpoint receives a batch and returns the result of executing the +statement. The `Batch` and `BatchResult` structures are from the Hrana protocol. +The semantics of this endpoint is the same as the `batch` request in Hrana. + +## Errors + +Successful responses are indicated by a HTTP status code in range [200, 300). +Errors are indicated with HTTP status codes in range [400, 600), and the error +responses should have the format of `Error` from the Hrana protocol. However, +the clients should be able to handle error responses that don't correspond to +this format; in particular, the server may produce some error responses with the +error message as plain text. diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index 2089983c..d6301cea 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -39,7 +39,7 @@ rusqlite = { version = "0.28.0", git = "https://github.com/psarna/rusqlite", rev "column_decltype" ] } serde = { version = "1.0.149", features = ["derive", "rc"] } -serde_json = "1.0.91" +serde_json = { version = "1.0.91", features = ["preserve_order"] } smallvec = "1.10.0" sqld-libsql-bindings = { version = "0", path = "../sqld-libsql-bindings" } sqlite3-parser = { version = "0.6.0", default-features = false, features = [ "YYNOERRORRECOVERY" ] } diff --git a/sqld/src/http/hrana_over_http.rs b/sqld/src/http/hrana_over_http.rs new file mode 100644 index 00000000..365e5b9a --- /dev/null +++ b/sqld/src/http/hrana_over_http.rs @@ -0,0 +1,108 @@ +use anyhow::{anyhow, Context, Result}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::sync::Arc; + +use crate::database::service::DbFactory; +use crate::{batch, hrana}; + +#[derive(thiserror::Error, Debug)] +enum ResponseError { + #[error("Could not parse request body: {source}")] + BadRequestBody { source: serde_json::Error }, + + #[error(transparent)] + Stmt(batch::StmtError), +} + +pub async fn handle_index( + _req: hyper::Request, +) -> Result> { + let body = "This is sqld HTTP API v1 (\"Hrana over HTTP\")"; + let body = hyper::Body::from(body); + Ok(hyper::Response::builder() + .header("content-type", "text/plain") + .body(body) + .unwrap()) +} + +pub async fn handle_execute( + db_factory: Arc, + req: hyper::Request, +) -> Result> { + #[derive(Debug, Deserialize)] + struct ReqBody { + stmt: batch::proto::Stmt, + } + + #[derive(Debug, Serialize)] + struct RespBody { + result: batch::proto::StmtResult, + } + + let res: Result<_> = async move { + let req_body = json_request_body::(req.into_body()).await?; + let db = db_factory + .create() + .await + .context("Could not create a database connection")?; + let result = batch::execute_stmt(&*db, &req_body.stmt) + .await + .map_err(|err| match err.downcast::() { + Ok(stmt_err) => anyhow!(ResponseError::Stmt(stmt_err)), + Err(err) => err, + }) + .context("Could not execute statement")?; + Ok(json_response(hyper::StatusCode::OK, &RespBody { result })) + } + .await; + + Ok(match res { + Ok(resp) => resp, + Err(err) => error_response(err.downcast::()?), + }) +} + +async fn json_request_body(body: hyper::Body) -> Result { + let body = hyper::body::to_bytes(body).await?; + let body = + serde_json::from_slice(&body).map_err(|e| ResponseError::BadRequestBody { source: e })?; + Ok(body) +} + +fn error_response(err: ResponseError) -> hyper::Response { + use batch::StmtError; + let status = match &err { + ResponseError::BadRequestBody { .. } => hyper::StatusCode::BAD_REQUEST, + ResponseError::Stmt(err) => match err { + StmtError::SqlParse { .. } + | StmtError::SqlNoStmt + | StmtError::SqlManyStmts + | StmtError::ArgsInvalid { .. } + | StmtError::SqlInputError { .. } => hyper::StatusCode::BAD_REQUEST, + StmtError::ArgsBothPositionalAndNamed => hyper::StatusCode::NOT_IMPLEMENTED, + StmtError::TransactionTimeout | StmtError::TransactionBusy => { + hyper::StatusCode::SERVICE_UNAVAILABLE + } + StmtError::SqliteError { .. } => hyper::StatusCode::INTERNAL_SERVER_ERROR, + }, + }; + + json_response( + status, + &hrana::proto::Error { + message: err.to_string(), + }, + ) +} + +fn json_response( + status: hyper::StatusCode, + body: &T, +) -> hyper::Response { + let body = serde_json::to_vec(body).unwrap(); + hyper::Response::builder() + .status(status) + .header("content-type", "application/json") + .body(hyper::Body::from(body)) + .unwrap() +} diff --git a/sqld/src/http/mod.rs b/sqld/src/http/mod.rs index f3bc6406..1e0598ce 100644 --- a/sqld/src/http/mod.rs +++ b/sqld/src/http/mod.rs @@ -1,3 +1,4 @@ +mod hrana_over_http; mod types; use std::future::poll_fn; @@ -22,6 +23,7 @@ use tower_http::{compression::CompressionLayer, cors}; use tracing::{Level, Span}; use crate::auth::Auth; +use crate::database::service::DbFactory; use crate::error::Error; use crate::hrana; use crate::http::types::HttpQuery; @@ -230,6 +232,7 @@ async fn handle_request( req: Request, sender: mpsc::Sender, upgrade_tx: mpsc::Sender, + db_factory: Arc, enable_console: bool, ) -> anyhow::Result> { if hyper_tungstenite::is_upgrade_request(&req) { @@ -249,6 +252,8 @@ async fn handle_request( (&Method::GET, "/version") => Ok(handle_version()), (&Method::GET, "/console") if enable_console => show_console().await, (&Method::GET, "/health") => Ok(handle_health()), + (&Method::GET, "/v1") => hrana_over_http::handle_index(req).await, + (&Method::POST, "/v1/execute") => hrana_over_http::handle_execute(db_factory, req).await, _ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()), } } @@ -261,7 +266,8 @@ fn handle_version() -> Response { pub async fn run_http( addr: SocketAddr, auth: Arc, - db_factory: F, + db_factory_service: F, + db_factory: Arc, upgrade_tx: mpsc::Sender, enable_console: bool, idle_shutdown_layer: Option, @@ -306,6 +312,7 @@ where req, sender.clone(), upgrade_tx.clone(), + db_factory.clone(), enable_console, ) }); @@ -313,7 +320,7 @@ where let server = hyper::server::Server::bind(&addr).serve(tower::make::Shared::new(service)); tokio::spawn(async move { - let mut pool = pool::Builder::new().build(db_factory, ()); + let mut pool = pool::Builder::new().build(db_factory_service, ()); while let Some(Message { queries, resp }) = receiver.recv().await { if let Err(e) = poll_fn(|c| pool.poll_ready(c)).await { tracing::error!("Connection pool error: {e}"); diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs index e4da6ea9..a1980e1f 100644 --- a/sqld/src/lib.rs +++ b/sqld/src/lib.rs @@ -119,6 +119,7 @@ async fn run_service( addr, auth.clone(), service.clone().map_response(|s| Constant::new(s, 1)), + service.factory.clone(), upgrade_tx, config.enable_http_console, idle_shutdown_layer.clone(),