Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit c2fed59

Browse files
committed
port hrana for libsqlx server
we can now allocate primaries, and query them
1 parent 52c566c commit c2fed59

36 files changed

+3015
-202
lines changed

Cargo.lock

Lines changed: 22 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,23 @@ edition = "2021"
77

88
[dependencies]
99
axum = "0.6.18"
10+
base64 = "0.21.2"
1011
bincode = "1.3.3"
12+
bytes = "1.4.0"
1113
clap = { version = "4.3.11", features = ["derive"] }
1214
color-eyre = "0.6.2"
1315
futures = "0.3.28"
16+
hmac = "0.12.1"
1417
hyper = { version = "0.14.27", features = ["h2", "server"] }
1518
libsqlx = { version = "0.1.0", path = "../libsqlx" }
1619
moka = { version = "0.11.2", features = ["future"] }
20+
parking_lot = "0.12.1"
21+
priority-queue = "1.3.2"
22+
rand = "0.8.5"
1723
regex = "1.9.1"
18-
serde = { version = "1.0.166", features = ["derive"] }
24+
serde = { version = "1.0.166", features = ["derive", "rc"] }
25+
serde_json = "1.0.100"
26+
sha2 = "0.10.7"
1927
sled = "0.34.7"
2028
thiserror = "1.0.43"
2129
tokio = { version = "1.29.1", features = ["full"] }

libsqlx-server/src/allocation/mod.rs

Lines changed: 64 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
use std::collections::HashMap;
21
use std::path::PathBuf;
2+
use std::sync::Arc;
33

44
use libsqlx::libsql::{LibsqlDatabase, LogCompactor, LogFile, PrimaryType};
55
use libsqlx::Database as _;
66
use tokio::sync::{mpsc, oneshot};
77
use tokio::task::{block_in_place, JoinSet};
88

9+
use crate::hrana;
10+
use crate::hrana::http::handle_pipeline;
11+
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
12+
913
use self::config::{AllocConfig, DbConfig};
1014

1115
pub mod config;
@@ -19,16 +23,11 @@ pub struct ConnectionId {
1923
}
2024

2125
pub enum AllocationMessage {
22-
/// Execute callback against connection
23-
Exec {
24-
connection_id: ConnectionId,
25-
exec: ExecFn,
26-
},
27-
/// Create a new connection, execute the callback and return the connection id.
28-
NewConnExec {
29-
exec: ExecFn,
30-
ret: oneshot::Sender<ConnectionId>,
31-
},
26+
NewConnection(oneshot::Sender<ConnectionHandle>),
27+
HranaPipelineReq {
28+
req: PipelineRequestBody,
29+
ret: oneshot::Sender<crate::Result<PipelineResponseBody>>,
30+
}
3231
}
3332

3433
pub enum Database {
@@ -73,12 +72,34 @@ impl Database {
7372
pub struct Allocation {
7473
pub inbox: mpsc::Receiver<AllocationMessage>,
7574
pub database: Database,
76-
/// senders to the spawned connections
77-
pub connections: HashMap<u32, mpsc::Sender<ExecFn>>,
7875
/// spawned connection futures, returning their connection id on completion.
7976
pub connections_futs: JoinSet<u32>,
8077
pub next_conn_id: u32,
8178
pub max_concurrent_connections: u32,
79+
80+
pub hrana_server: Arc<hrana::http::Server>,
81+
}
82+
83+
pub struct ConnectionHandle {
84+
exec: mpsc::Sender<ExecFn>,
85+
exit: oneshot::Sender<()>,
86+
}
87+
88+
impl ConnectionHandle {
89+
pub async fn exec<F, R>(&self, f: F) -> crate::Result<R>
90+
where F: for<'a> FnOnce(&'a mut (dyn libsqlx::Connection + 'a)) -> R + Send + 'static,
91+
R: Send + 'static,
92+
{
93+
let (sender, ret) = oneshot::channel();
94+
let cb = move |conn: &mut dyn libsqlx::Connection| {
95+
let res = f(conn);
96+
let _ = sender.send(res);
97+
};
98+
99+
self.exec.send(Box::new(cb)).await.unwrap();
100+
101+
Ok(ret.await?)
102+
}
82103
}
83104

84105
impl Allocation {
@@ -87,34 +108,36 @@ impl Allocation {
87108
tokio::select! {
88109
Some(msg) = self.inbox.recv() => {
89110
match msg {
90-
AllocationMessage::Exec { connection_id, exec } => {
91-
if let Some(sender) = self.connections.get(&connection_id.id) {
92-
if let Err(_) = sender.send(exec).await {
93-
tracing::debug!("connection {} closed.", connection_id.id);
94-
self.connections.remove_entry(&connection_id.id);
95-
}
96-
}
97-
},
98-
AllocationMessage::NewConnExec { exec, ret } => {
99-
let id = self.new_conn_exec(exec).await;
100-
let _ = ret.send(id);
111+
AllocationMessage::NewConnection(ret) => {
112+
let _ =ret.send(self.new_conn().await);
101113
},
114+
AllocationMessage::HranaPipelineReq { req, ret} => {
115+
let res = handle_pipeline(&self.hrana_server.clone(), req, || async {
116+
let conn= self.new_conn().await;
117+
dbg!();
118+
Ok(conn)
119+
}).await;
120+
let _ = ret.send(res);
121+
}
102122
}
103123
},
104124
maybe_id = self.connections_futs.join_next() => {
105-
if let Some(Ok(id)) = maybe_id {
106-
self.connections.remove_entry(&id);
125+
if let Some(Ok(_id)) = maybe_id {
126+
// self.connections.remove_entry(&id);
107127
}
108128
},
109129
else => break,
110130
}
111131
}
112132
}
113133

114-
async fn new_conn_exec(&mut self, exec: ExecFn) -> ConnectionId {
134+
async fn new_conn(&mut self) -> ConnectionHandle {
135+
dbg!();
115136
let id = self.next_conn_id();
137+
dbg!();
116138
let conn = block_in_place(|| self.database.connect());
117-
let (close_sender, exit) = mpsc::channel(1);
139+
dbg!();
140+
let (close_sender, exit) = oneshot::channel();
118141
let (exec_sender, exec_receiver) = mpsc::channel(1);
119142
let conn = Connection {
120143
id,
@@ -123,36 +146,40 @@ impl Allocation {
123146
exec: exec_receiver,
124147
};
125148

149+
dbg!();
126150
self.connections_futs.spawn(conn.run());
127-
// This should never block!
128-
assert!(exec_sender.try_send(exec).is_ok());
129-
assert!(self.connections.insert(id, exec_sender).is_none());
151+
dbg!();
152+
153+
ConnectionHandle {
154+
exec: exec_sender,
155+
exit: close_sender,
156+
}
130157

131-
ConnectionId { id, close_sender }
132158
}
133159

134160
fn next_conn_id(&mut self) -> u32 {
135161
loop {
136162
self.next_conn_id = self.next_conn_id.wrapping_add(1);
137-
if !self.connections.contains_key(&self.next_conn_id) {
138-
return self.next_conn_id;
139-
}
163+
return self.next_conn_id;
164+
// if !self.connections.contains_key(&self.next_conn_id) {
165+
// return self.next_conn_id;
166+
// }
140167
}
141168
}
142169
}
143170

144171
struct Connection {
145172
id: u32,
146173
conn: Box<dyn libsqlx::Connection + Send>,
147-
exit: mpsc::Receiver<()>,
174+
exit: oneshot::Receiver<()>,
148175
exec: mpsc::Receiver<ExecFn>,
149176
}
150177

151178
impl Connection {
152179
async fn run(mut self) -> u32 {
153180
loop {
154181
tokio::select! {
155-
_ = self.exit.recv() => break,
182+
_ = &mut self.exit => break,
156183
Some(exec) = self.exec.recv() => {
157184
tokio::task::block_in_place(|| exec(&mut *self.conn));
158185
}

libsqlx-server/src/database.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use tokio::sync::{mpsc, oneshot};
2+
3+
use crate::hrana::http::proto::{PipelineRequestBody, PipelineResponseBody};
4+
use crate::allocation::{AllocationMessage, ConnectionHandle};
5+
6+
pub struct Database {
7+
pub sender: mpsc::Sender<AllocationMessage>,
8+
}
9+
10+
impl Database {
11+
pub async fn hrana_pipeline(&self, req: PipelineRequestBody) -> crate::Result<PipelineResponseBody> {
12+
dbg!();
13+
let (sender, ret) = oneshot::channel();
14+
dbg!();
15+
self.sender.send(AllocationMessage::HranaPipelineReq { req, ret: sender }).await.unwrap();
16+
dbg!();
17+
ret.await.unwrap()
18+
}
19+
}

libsqlx-server/src/databases/mod.rs

Lines changed: 0 additions & 5 deletions
This file was deleted.

libsqlx-server/src/databases/store.rs

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)