Skip to content

Commit deae7ad

Browse files
committed
api: initial provisioner manager
1 parent 3864db9 commit deae7ad

File tree

5 files changed

+202
-7
lines changed

5 files changed

+202
-7
lines changed

Cargo.lock

Lines changed: 9 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ regex = "1.5.4"
1616
reqwest = {version = "0.11.6", features = ["json"]}
1717
time = "0.2.25"
1818

19+
tokio = {version = "1.15.0", features = ["full"]}
20+
1921
rocket = {version = "0.5.0-rc.1", features = ["json"]}
2022

2123
serde = {version = "1.0", features = ["derive"]}
@@ -26,6 +28,8 @@ base64 = "0.13.0"
2628
form_urlencoded = "1.0.1"
2729

2830
db_models = {package = "haas_db_models", path = "crates/db_models"}
31+
provisioner = {package = "haas_provisioner", path = "crates/provisioner"}
32+
2933
jsonwebtoken = "7.2.0"
3034

3135
[dependencies.rocket_sync_db_pools]

Rocket.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
[default]
22
address = "0.0.0.0"
33
port = 5000
4+
provisioner = {caddy_api_base = "http://caddy:2019/", caddy_container_name = "caddy"}
45

56
[global.databases]
67
db = {url = "postgres://postgres:postgres@localhost:5432/postgres"}
8+
9+
# Only applied in debug mode (i.e. local development)
10+
[debug]
11+
provisioner = {caddy_api_base = "http://localhost:2019", caddy_container_name = "caddy-server"}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use trust_dns_client::{client::AsyncClient, udp::UdpClientStream};
1212

1313
mod api;
1414
mod auth;
15+
mod provision;
1516
mod slack;
1617
mod utils;
1718

src/provision.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use std::collections::HashMap;
2+
use std::sync::Arc;
3+
4+
use crate::DbConn;
5+
use chrono::NaiveDateTime;
6+
use diesel::prelude::*;
7+
use provisioner::{Provisioner, ProvisionerEvent};
8+
use tokio::sync::broadcast::{self, Sender};
9+
10+
pub use provisioner::hyper::Uri;
11+
12+
#[derive(serde::Serialize, Debug, Clone)]
13+
#[serde(rename_all = "snake_case")]
14+
pub struct ProvisionerEvent2 {
15+
ts: NaiveDateTime,
16+
#[serde(flatten)]
17+
event: Result<ProvisionerEvent, String>,
18+
}
19+
20+
impl ProvisionerEvent2 {
21+
pub fn make(event: Result<ProvisionerEvent, String>) -> Self {
22+
let ts = chrono::Utc::now().naive_utc();
23+
Self { ts, event }
24+
}
25+
}
26+
27+
struct PooledDbRunner {
28+
c: Arc<DbConn>,
29+
}
30+
31+
#[rocket::async_trait]
32+
impl provisioner::DbRunner for &PooledDbRunner {
33+
async fn run<U: Send + 'static>(
34+
&mut self,
35+
f: Box<
36+
dyn for<'a> FnOnce(&'a mut PgConnection) -> Result<U, diesel::result::Error>
37+
+ Send
38+
+ 'static,
39+
>,
40+
) -> Result<U, diesel::result::Error> {
41+
self.c.run(f).await
42+
}
43+
}
44+
45+
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
46+
pub struct ProvisionerConfig {
47+
#[serde(with = "url_serializer")]
48+
caddy_api_base: provisioner::caddy::Url,
49+
caddy_container_name: String,
50+
}
51+
52+
mod url_serializer {
53+
use provisioner::caddy::Url;
54+
use serde::de::{
55+
Deserializer, Error as DeError, Unexpected as DeUnexpected, Visitor as DeVisitor,
56+
};
57+
use serde::ser::Serializer;
58+
59+
pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<Url, D::Error> {
60+
struct UrlVisitor;
61+
62+
impl<'de> DeVisitor<'de> for UrlVisitor {
63+
type Value = Url;
64+
65+
fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E>
66+
where
67+
E: serde::de::Error,
68+
{
69+
v.parse()
70+
.map_err(|_| DeError::invalid_value(DeUnexpected::Str(v), &self))
71+
}
72+
73+
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
74+
write!(formatter, "a URI")
75+
}
76+
}
77+
78+
de.deserialize_str(UrlVisitor)
79+
}
80+
81+
pub fn serialize<S: Serializer>(url: &Url, ser: S) -> Result<S::Ok, S::Error> {
82+
ser.serialize_str(&url.to_string())
83+
}
84+
}
85+
86+
pub struct ProvisionerManager {
87+
provisioner: Provisioner,
88+
event_channels: HashMap<i32, Sender<ProvisionerEvent2>>,
89+
}
90+
91+
impl ProvisionerManager {
92+
pub fn from_figment(f: &rocket::figment::Figment) -> provisioner::Result<Self> {
93+
let c = f
94+
.extract_inner::<ProvisionerConfig>("provisioner")
95+
.expect("Failed to extract config from figment");
96+
Ok(Self {
97+
provisioner: Provisioner::connecting_with_local_defaults(
98+
c.caddy_api_base,
99+
c.caddy_container_name,
100+
)?,
101+
event_channels: Default::default(),
102+
})
103+
}
104+
105+
pub async fn create_build(
106+
self: Arc<Self>,
107+
conn: Arc<DbConn>,
108+
git_uri: Uri,
109+
app_id: i32,
110+
app_slug: &str,
111+
) -> diesel::QueryResult<i32> {
112+
use db_models::schema::builds::dsl::builds;
113+
use db_models::{Build, NewBuild};
114+
let app_slug = app_slug.to_owned();
115+
let build = conn
116+
.run(move |c| {
117+
diesel::insert_into(builds)
118+
.values(NewBuild { app_id })
119+
.get_result::<Build>(c)
120+
})
121+
.await?;
122+
let build_id = build.id;
123+
let (tx, mut rx) = broadcast::channel(10);
124+
let conn2 = Arc::clone(&conn);
125+
// Receive build events and append them to the db
126+
tokio::spawn(async move {
127+
loop {
128+
match rx.recv().await {
129+
Ok(ev) => {
130+
conn2
131+
.run(move |c| {
132+
// Diesel doesn't support array_append
133+
diesel::sql_query(
134+
"UPDATE builds SET events = array_append(events, ?) WHERE id = ?",
135+
)
136+
.bind::<diesel::sql_types::Text, _>(serde_json::to_string(&ev).unwrap())
137+
.bind::<diesel::sql_types::Integer, _>(build_id)
138+
.execute(c)
139+
.unwrap();
140+
})
141+
.await;
142+
}
143+
Err(broadcast::error::RecvError::Closed) => break,
144+
_ => {}
145+
}
146+
}
147+
});
148+
// Start the build / deploy
149+
tokio::spawn(async move {
150+
let (tx2, mut rx2) = broadcast::channel(10);
151+
let tx_clone = tx.clone();
152+
tokio::spawn(async move {
153+
loop {
154+
match rx2.recv().await {
155+
Ok(ev) => {
156+
tx_clone.send(ProvisionerEvent2::make(Ok(ev))).unwrap();
157+
}
158+
Err(broadcast::error::RecvError::Closed) => break,
159+
_ => {}
160+
}
161+
}
162+
});
163+
let runner = PooledDbRunner { c: conn.clone() };
164+
let br = self
165+
.provisioner
166+
.build_image_from_github(app_id, &app_slug, &git_uri, Some(tx2.clone()))
167+
.await;
168+
if let Err(e) = br {
169+
tx.send(ProvisionerEvent2::make(Err(e.to_string())))
170+
.unwrap();
171+
}
172+
let dr = self
173+
.provisioner
174+
.deploy_app(app_id, &mut &runner, Some(tx2.clone()))
175+
.await;
176+
if let Err(e) = dr {
177+
tx.send(ProvisionerEvent2::make(Err(e.to_string())))
178+
.unwrap();
179+
}
180+
});
181+
Ok(5)
182+
}
183+
}

0 commit comments

Comments
 (0)