Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit dd455cd

Browse files
authored
Merge pull request #539 from libsql/deallocate
deallocate
2 parents 9fea306 + 5d16027 commit dd455cd

File tree

7 files changed

+81
-38
lines changed

7 files changed

+81
-38
lines changed

libsqlx-server/src/allocation/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -590,14 +590,16 @@ impl ReplicaConnection {
590590
.unwrap(),
591591
BuilderStep::BeginRows => req.builder.begin_rows().unwrap(),
592592
BuilderStep::BeginRow => req.builder.begin_row().unwrap(),
593-
BuilderStep::AddRowValue(v) => req.builder.add_row_value((&v).into()).unwrap(),
593+
BuilderStep::AddRowValue(v) => {
594+
req.builder.add_row_value((&v).into()).unwrap()
595+
}
594596
BuilderStep::FinishRow => req.builder.finish_row().unwrap(),
595597
BuilderStep::FinishRows => req.builder.finish_rows().unwrap(),
596598
BuilderStep::Finnalize { is_txn, frame_no } => {
597599
let _ = req.builder.finnalize(is_txn, frame_no).unwrap();
598600
finnalized = true;
599-
},
600-
BuilderStep::FinnalizeError(e) => {
601+
}
602+
BuilderStep::FinnalizeError(e) => {
601603
req.builder.finnalize_error(e);
602604
finnalized = true;
603605
}
@@ -625,15 +627,13 @@ impl ConnectionHandler for ReplicaConnection {
625627
// self.conn.writer().current_req.timeout.poll()
626628
let mut req = self.conn.writer().current_req.lock();
627629
let should_abort_query = match &mut *req {
628-
Some(ref mut req) => {
629-
match req.timeout.as_mut().poll(cx) {
630-
Poll::Ready(_) => {
631-
req.builder.finnalize_error("request timed out".to_string());
632-
true
633-
}
634-
Poll::Pending => return Poll::Pending,
630+
Some(ref mut req) => match req.timeout.as_mut().poll(cx) {
631+
Poll::Ready(_) => {
632+
req.builder.finnalize_error("request timed out".to_string());
633+
true
635634
}
636-
}
635+
Poll::Pending => return Poll::Pending,
636+
},
637637
None => return Poll::Ready(()),
638638
};
639639

libsqlx-server/src/http/admin.rs

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,39 @@
1-
use std::sync::Arc;
21
use std::str::FromStr;
2+
use std::sync::Arc;
33
use std::time::Duration;
44

5+
use axum::extract::{Path, State};
6+
use axum::routing::{delete, post};
57
use axum::{Json, Router};
6-
use axum::routing::post;
7-
use axum::extract::State;
88
use color_eyre::eyre::Result;
99
use hyper::server::accept::Accept;
1010
use serde::{Deserialize, Deserializer, Serialize};
1111
use tokio::io::{AsyncRead, AsyncWrite};
1212

13-
use crate::meta::Store;
1413
use crate::allocation::config::{AllocConfig, DbConfig};
14+
use crate::linc::bus::Bus;
1515
use crate::linc::NodeId;
16+
use crate::manager::Manager;
17+
use crate::meta::DatabaseId;
1618

1719
pub struct Config {
18-
pub meta_store: Arc<Store>,
20+
pub bus: Arc<Bus<Arc<Manager>>>,
1921
}
2022

2123
struct AdminServerState {
22-
meta_store: Arc<Store>,
24+
bus: Arc<Bus<Arc<Manager>>>,
2325
}
2426

2527
pub async fn run_admin_api<I>(config: Config, listener: I) -> Result<()>
2628
where
2729
I: Accept<Error = std::io::Error>,
2830
I::Conn: AsyncRead + AsyncWrite + Send + Unpin + 'static,
2931
{
30-
let state = AdminServerState {
31-
meta_store: config.meta_store,
32-
};
32+
let state = AdminServerState { bus: config.bus };
3333

3434
let app = Router::new()
3535
.route("/manage/allocation", post(allocate).get(list_allocs))
36+
.route("/manage/allocation/:db_name", delete(deallocate))
3637
.with_state(Arc::new(state));
3738
axum::Server::builder(listener)
3839
.serve(app.into_make_service())
@@ -49,7 +50,7 @@ struct AllocateResp {}
4950

5051
#[derive(Deserialize, Debug)]
5152
struct AllocateReq {
52-
alloc_id: String,
53+
database_name: String,
5354
max_conccurent_connection: Option<u32>,
5455
config: DbConfigReq,
5556
}
@@ -60,7 +61,10 @@ pub enum DbConfigReq {
6061
Primary {},
6162
Replica {
6263
primary_node_id: NodeId,
63-
#[serde(deserialize_with = "deserialize_duration", default = "default_proxy_timeout")]
64+
#[serde(
65+
deserialize_with = "deserialize_duration",
66+
default = "default_proxy_timeout"
67+
)]
6468
proxy_request_timeout_duration: Duration,
6569
},
6670
}
@@ -78,8 +82,8 @@ where
7882
type Value = Duration;
7983

8084
fn visit_str<E>(self, v: &str) -> std::result::Result<Self::Value, E>
81-
where
82-
E: serde::de::Error,
85+
where
86+
E: serde::de::Error,
8387
{
8488
match humantime::Duration::from_str(v) {
8589
Ok(d) => Ok(*d),
@@ -90,7 +94,6 @@ where
9094
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
9195
f.write_str("a duration, in a string format")
9296
}
93-
9497
}
9598

9699
deserializer.deserialize_str(Visitor)
@@ -102,7 +105,7 @@ async fn allocate(
102105
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
103106
let config = AllocConfig {
104107
max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16),
105-
db_name: req.alloc_id.clone(),
108+
db_name: req.database_name.clone(),
106109
db_config: match req.config {
107110
DbConfigReq::Primary {} => DbConfig::Primary {},
108111
DbConfigReq::Replica {
@@ -114,7 +117,20 @@ async fn allocate(
114117
},
115118
},
116119
};
117-
state.meta_store.allocate(&req.alloc_id, &config).await;
120+
121+
let dispatcher = state.bus.clone();
122+
let id = DatabaseId::from_name(&req.database_name);
123+
state.bus.handler().allocate(id, &config, dispatcher).await;
124+
125+
Ok(Json(AllocateResp {}))
126+
}
127+
128+
async fn deallocate(
129+
State(state): State<Arc<AdminServerState>>,
130+
Path(database_name): Path<String>,
131+
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
132+
let id = DatabaseId::from_name(&database_name);
133+
state.bus.handler().deallocate(id).await;
118134

119135
Ok(Json(AllocateResp {}))
120136
}
@@ -133,7 +149,9 @@ async fn list_allocs(
133149
State(state): State<Arc<AdminServerState>>,
134150
) -> Result<Json<ListAllocResp>, Json<ErrorResponse>> {
135151
let allocs = state
136-
.meta_store
152+
.bus
153+
.handler()
154+
.store()
137155
.list_allocs()
138156
.await
139157
.into_iter()

libsqlx-server/src/http/user/extractors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ impl FromRequestParts<Arc<UserApiState>> for Database {
2020
let Ok(host_str) = std::str::from_utf8(host.as_bytes()) else {return Err(UserApiError::MissingHost)};
2121
let db_name = parse_host(host_str)?;
2222
let db_id = DatabaseId::from_name(db_name);
23-
let Some(sender) = state.manager.alloc(db_id, state.bus.clone()).await else { return Err(UserApiError::UnknownDatabase(db_name.to_owned())) };
23+
let Some(sender) = state.manager.schedule(db_id, state.bus.clone()).await else { return Err(UserApiError::UnknownDatabase(db_name.to_owned())) };
2424

2525
Ok(Database { sender })
2626
}

libsqlx-server/src/linc/bus.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ impl<H: Handler> Bus<H> {
3131
self.node_id
3232
}
3333

34+
pub fn handler(&self) -> &H {
35+
&self.handler
36+
}
37+
3438
pub async fn incomming(self: &Arc<Self>, incomming: Inbound) {
3539
self.handler.handle(self.clone(), incomming).await;
3640
}

libsqlx-server/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ struct Args {
3535
async fn spawn_admin_api(
3636
set: &mut JoinSet<Result<()>>,
3737
config: &AdminApiConfig,
38-
meta_store: Arc<Store>,
38+
bus: Arc<Bus<Arc<Manager>>>,
3939
) -> Result<()> {
4040
let admin_api_listener = TcpListener::bind(config.addr).await?;
4141
let fut = run_admin_api(
42-
http::admin::Config { meta_store },
42+
http::admin::Config { bus },
4343
AddrIncoming::from_listener(admin_api_listener)?,
4444
);
4545
set.spawn(fut);
@@ -98,7 +98,7 @@ async fn main() -> Result<()> {
9898
let bus = Arc::new(Bus::new(config.cluster.id, manager.clone()));
9999

100100
spawn_cluster_networking(&mut join_set, &config.cluster, bus.clone()).await?;
101-
spawn_admin_api(&mut join_set, &config.admin_api, store.clone()).await?;
101+
spawn_admin_api(&mut join_set, &config.admin_api, bus.clone()).await?;
102102
spawn_user_api(&mut join_set, &config.user_api, manager, bus).await?;
103103

104104
join_set.join_next().await;

libsqlx-server/src/manager.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use moka::future::Cache;
66
use tokio::sync::mpsc;
77
use tokio::task::JoinSet;
88

9+
use crate::allocation::config::AllocConfig;
910
use crate::allocation::{Allocation, AllocationMessage, Database};
1011
use crate::hrana;
1112
use crate::linc::bus::Dispatch;
@@ -31,7 +32,7 @@ impl Manager {
3132
}
3233

3334
/// Returns a handle to an allocation, lazily initializing if it isn't already loaded.
34-
pub async fn alloc(
35+
pub async fn schedule(
3536
self: &Arc<Self>,
3637
database_id: DatabaseId,
3738
dispatcher: Arc<dyn Dispatch>,
@@ -65,14 +66,35 @@ impl Manager {
6566

6667
None
6768
}
69+
70+
pub async fn allocate(
71+
self: &Arc<Self>,
72+
database_id: DatabaseId,
73+
meta: &AllocConfig,
74+
dispatcher: Arc<dyn Dispatch>,
75+
) {
76+
self.store().allocate(database_id, meta).await;
77+
self.schedule(database_id, dispatcher).await;
78+
}
79+
80+
pub async fn deallocate(&self, database_id: DatabaseId) {
81+
self.meta_store.deallocate(database_id).await;
82+
self.cache.remove(&database_id).await;
83+
let db_path = self.db_path.join("dbs").join(database_id.to_string());
84+
tokio::fs::remove_dir_all(db_path).await.unwrap();
85+
}
86+
87+
pub fn store(&self) -> &Store {
88+
&self.meta_store
89+
}
6890
}
6991

7092
#[async_trait::async_trait]
7193
impl Handler for Arc<Manager> {
7294
async fn handle(&self, bus: Arc<dyn Dispatch>, msg: Inbound) {
7395
if let Some(sender) = self
7496
.clone()
75-
.alloc(msg.enveloppe.database_id.unwrap(), bus.clone())
97+
.schedule(msg.enveloppe.database_id.unwrap(), bus.clone())
7698
.await
7799
{
78100
let _ = sender.send(AllocationMessage::Inbound(msg)).await;

libsqlx-server/src/meta.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,19 @@ impl Store {
5656
Self { meta_store }
5757
}
5858

59-
pub async fn allocate(&self, database_name: &str, meta: &AllocConfig) {
59+
pub async fn allocate(&self, id: DatabaseId, meta: &AllocConfig) {
6060
//TODO: Handle conflict
6161
block_in_place(|| {
6262
let meta_bytes = bincode::serialize(meta).unwrap();
63-
let id = DatabaseId::from_name(database_name);
6463
self.meta_store
6564
.compare_and_swap(id, None as Option<&[u8]>, Some(meta_bytes))
6665
.unwrap()
6766
.unwrap();
6867
});
6968
}
7069

71-
pub async fn deallocate(&self, _database_name: &str) {
72-
todo!()
70+
pub async fn deallocate(&self, id: DatabaseId) {
71+
block_in_place(|| self.meta_store.remove(id).unwrap());
7372
}
7473

7574
pub async fn meta(&self, database_id: &DatabaseId) -> Option<AllocConfig> {

0 commit comments

Comments
 (0)