Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Merge #305
Browse files Browse the repository at this point in the history
305: Add HTTP API v1 ("Hrana over HTTP") r=MarinPostma a=honzasp

We need to update the HTTP API to support batches and to fix some issues (such as encoding of types). Instead of coming up with a new API, let's just reuse the Hrana structures and semantics. In effect, the new version of the HTTP API is "Hrana over HTTP".

This is needed for [the new generation of the client libraries](tursodatabase/libsql-client-ts#10).

Co-authored-by: Jan Špaček <patek.mail@gmail.com>
  • Loading branch information
bors[bot] and honzasp authored Mar 28, 2023
2 parents a558b64 + c50b035 commit fb8b3c2
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions docs/HTTP_V1_SPEC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# The sqld HTTP API v1 specification ("Hrana over HTTP")

Version 1 of the HTTP API ("Hrana over HTTP") is designed to complement the
WebSocket-based Hrana protocol for use cases that don't require stateful
database connections and for which the additional network rountrip required by
WebSockets relative to HTTP is not necessary.

This API aims to be of production quality and it is primarily intended to be
consumed by client libraries. It does not deprecate or replace the "version 0"
of the HTTP API, which is designed to be quick and easy for users who send HTTP
requests manually (for example using `curl` or by directly using an HTTP
library).

## Overview

This HTTP API uses data structures and semantics from the Hrana protocol;
versions of the HTTP API are intended to correspond to versions of the Hrana
protocol, so HTTP API v1 corresponds to the `hrana1` version of Hrana.

Endpoints in the HTTP API correspond to requests in Hrana. Each request is
executed as if a fresh Hrana stream was opened for the request.

All request and response bodies are encoded in JSON, with content type
`application/json`.

## Execute a statement

```
POST /v1/execute
-> {
"stmt": Stmt,
}
<- {
"result": StmtResult,
}
```

The `execute` endpoint receives a statement and returns the result of executing
the statement. The `Stmt` and `StmtResult` structures are from the Hrana
protocol. The semantics of this endpoint is the same as the `execute` request in
Hrana.

## Execute a batch

```
POST /v1/batch
-> {
"batch": Batch,
}
<- {
"result": BatchResult,
}
```

The `batch` endpoint receives a batch and returns the result of executing the
statement. The `Batch` and `BatchResult` structures are from the Hrana protocol.
The semantics of this endpoint is the same as the `batch` request in Hrana.

## Errors

Successful responses are indicated by a HTTP status code in range [200, 300).
Errors are indicated with HTTP status codes in range [400, 600), and the error
responses should have the format of `Error` from the Hrana protocol. However,
the clients should be able to handle error responses that don't correspond to
this format; in particular, the server may produce some error responses with the
error message as plain text.
2 changes: 1 addition & 1 deletion sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev
"column_decltype"
] }
serde = { version = "1.0.149", features = ["derive", "rc"] }
serde_json = "1.0.91"
serde_json = { version = "1.0.91", features = ["preserve_order"] }
smallvec = "1.10.0"
sqld-libsql-bindings = { version = "0", path = "../sqld-libsql-bindings" }
sqlite3-parser = { version = "0.6.0", default-features = false, features = [ "YYNOERRORRECOVERY" ] }
Expand Down
157 changes: 157 additions & 0 deletions sqld/src/http/hrana_over_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use anyhow::{anyhow, Context, Result};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::future::Future;
use std::sync::Arc;

use crate::database::service::DbFactory;
use crate::database::Database;
use crate::{batch, hrana};

#[derive(thiserror::Error, Debug)]
enum ResponseError {
#[error("Could not parse request body: {source}")]
BadRequestBody { source: serde_json::Error },

#[error(transparent)]
Stmt(batch::StmtError),
#[error(transparent)]
Batch(batch::BatchError),
}

pub async fn handle_index(
_req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>> {
let body = "This is sqld HTTP API v1 (\"Hrana over HTTP\")";
let body = hyper::Body::from(body);
Ok(hyper::Response::builder()
.header("content-type", "text/plain")
.body(body)
.unwrap())
}

pub async fn handle_execute(
db_factory: Arc<dyn DbFactory>,
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>> {
#[derive(Debug, Deserialize)]
struct ReqBody {
stmt: batch::proto::Stmt,
}

#[derive(Debug, Serialize)]
struct RespBody {
result: batch::proto::StmtResult,
}

handle_request(db_factory, req, |db, req_body: ReqBody| async move {
batch::execute_stmt(&*db, &req_body.stmt)
.await
.map(|result| RespBody { result })
.map_err(|err| match err.downcast::<batch::StmtError>() {
Ok(stmt_err) => anyhow!(ResponseError::Stmt(stmt_err)),
Err(err) => err,
})
.context("Could not execute statement")
})
.await
}

pub async fn handle_batch(
db_factory: Arc<dyn DbFactory>,
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>> {
#[derive(Debug, Deserialize)]
struct ReqBody {
batch: batch::proto::Batch,
}

#[derive(Debug, Serialize)]
struct RespBody {
result: batch::proto::BatchResult,
}

handle_request(db_factory, req, |db, req_body: ReqBody| async move {
batch::execute_batch(&*db, &req_body.batch)
.await
.map(|result| RespBody { result })
.map_err(|err| match err.downcast::<batch::BatchError>() {
Ok(batch_err) => anyhow!(ResponseError::Batch(batch_err)),
Err(err) => err,
})
.context("Could not execute batch")
})
.await
}

async fn handle_request<ReqBody, RespBody, F, Fut>(
db_factory: Arc<dyn DbFactory>,
req: hyper::Request<hyper::Body>,
f: F,
) -> Result<hyper::Response<hyper::Body>>
where
ReqBody: DeserializeOwned,
RespBody: Serialize,
F: FnOnce(Arc<dyn Database>, ReqBody) -> Fut,
Fut: Future<Output = Result<RespBody>>,
{
let res: Result<_> = async move {
let req_body = hyper::body::to_bytes(req.into_body()).await?;
let req_body = serde_json::from_slice(&req_body)
.map_err(|e| ResponseError::BadRequestBody { source: e })?;

let db = db_factory
.create()
.await
.context("Could not create a database connection")?;
let resp_body = f(db, req_body).await?;

Ok(json_response(hyper::StatusCode::OK, &resp_body))
}
.await;

Ok(match res {
Ok(resp) => resp,
Err(err) => error_response(err.downcast::<ResponseError>()?),
})
}

fn error_response(err: ResponseError) -> hyper::Response<hyper::Body> {
use batch::{BatchError, StmtError};
let status = match &err {
ResponseError::BadRequestBody { .. } => hyper::StatusCode::BAD_REQUEST,
ResponseError::Stmt(err) => match err {
StmtError::SqlParse { .. }
| StmtError::SqlNoStmt
| StmtError::SqlManyStmts
| StmtError::ArgsInvalid { .. }
| StmtError::SqlInputError { .. } => hyper::StatusCode::BAD_REQUEST,
StmtError::ArgsBothPositionalAndNamed => hyper::StatusCode::NOT_IMPLEMENTED,
StmtError::TransactionTimeout | StmtError::TransactionBusy => {
hyper::StatusCode::SERVICE_UNAVAILABLE
}
StmtError::SqliteError { .. } => hyper::StatusCode::INTERNAL_SERVER_ERROR,
},
ResponseError::Batch(err) => match err {
BatchError::CondBadStep => hyper::StatusCode::BAD_REQUEST,
},
};

json_response(
status,
&hrana::proto::Error {
message: err.to_string(),
},
)
}

fn json_response<T: Serialize>(
status: hyper::StatusCode,
body: &T,
) -> hyper::Response<hyper::Body> {
let body = serde_json::to_vec(body).unwrap();
hyper::Response::builder()
.status(status)
.header("content-type", "application/json")
.body(hyper::Body::from(body))
.unwrap()
}
12 changes: 10 additions & 2 deletions sqld/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod hrana_over_http;
mod types;

use std::future::poll_fn;
Expand All @@ -22,6 +23,7 @@ use tower_http::{compression::CompressionLayer, cors};
use tracing::{Level, Span};

use crate::auth::Auth;
use crate::database::service::DbFactory;
use crate::error::Error;
use crate::hrana;
use crate::http::types::HttpQuery;
Expand Down Expand Up @@ -230,6 +232,7 @@ async fn handle_request(
req: Request<Body>,
sender: mpsc::Sender<Message>,
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
db_factory: Arc<dyn DbFactory>,
enable_console: bool,
) -> anyhow::Result<Response<Body>> {
if hyper_tungstenite::is_upgrade_request(&req) {
Expand All @@ -249,6 +252,9 @@ async fn handle_request(
(&Method::GET, "/version") => Ok(handle_version()),
(&Method::GET, "/console") if enable_console => show_console().await,
(&Method::GET, "/health") => Ok(handle_health()),
(&Method::GET, "/v1") => hrana_over_http::handle_index(req).await,
(&Method::POST, "/v1/execute") => hrana_over_http::handle_execute(db_factory, req).await,
(&Method::POST, "/v1/batch") => hrana_over_http::handle_batch(db_factory, req).await,
_ => Ok(Response::builder().status(404).body(Body::empty()).unwrap()),
}
}
Expand All @@ -261,7 +267,8 @@ fn handle_version() -> Response<Body> {
pub async fn run_http<F>(
addr: SocketAddr,
auth: Arc<Auth>,
db_factory: F,
db_factory_service: F,
db_factory: Arc<dyn DbFactory>,
upgrade_tx: mpsc::Sender<hrana::Upgrade>,
enable_console: bool,
idle_shutdown_layer: Option<IdleShutdownLayer>,
Expand Down Expand Up @@ -306,14 +313,15 @@ where
req,
sender.clone(),
upgrade_tx.clone(),
db_factory.clone(),
enable_console,
)
});

let server = hyper::server::Server::bind(&addr).serve(tower::make::Shared::new(service));

tokio::spawn(async move {
let mut pool = pool::Builder::new().build(db_factory, ());
let mut pool = pool::Builder::new().build(db_factory_service, ());
while let Some(Message { queries, resp }) = receiver.recv().await {
if let Err(e) = poll_fn(|c| pool.poll_ready(c)).await {
tracing::error!("Connection pool error: {e}");
Expand Down
1 change: 1 addition & 0 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async fn run_service(
addr,
auth.clone(),
service.clone().map_response(|s| Constant::new(s, 1)),
service.factory.clone(),
upgrade_tx,
config.enable_http_console,
idle_shutdown_layer.clone(),
Expand Down

0 comments on commit fb8b3c2

Please sign in to comment.