Skip to content

Commit 509d7a2

Browse files
committed
Auto merge of #3572 - pietroalbini:broken-start, r=jtgeibel
Allow the application to start without a database connection. This PR implements the last bit of #3541, making sure the server binary can start without a database connection. To avoid serving the initial requests without a connection, the server will now wait for 5 seconds at startup to initialize the database connections, and if it can't do that in that timeframe it will treat the database as unhealthy and continue starting up the server. This is critical to our continued availability, so to make sure this doesn't regress I also implemented a test that fails if a database connection becomes a requirement to start the server again. The test is implemented by starting the server binary with the right environment variables and doing proper HTTP requests to it. Fixes #3541. r? `@jtgeibel`
2 parents 99ef346 + 1523948 commit 509d7a2

File tree

8 files changed

+260
-26
lines changed

8 files changed

+260
-26
lines changed

src/app.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl App {
121121
.connection_customizer(Box::new(primary_db_connection_config))
122122
.thread_pool(thread_pool.clone());
123123

124-
DieselPool::new(&config.db_primary_config.url, primary_db_config)
124+
DieselPool::new(&config.db_primary_config.url, primary_db_config).unwrap()
125125
};
126126

127127
let replica_database = if let Some(url) = config.db_replica_config.as_ref().map(|c| &c.url)
@@ -141,7 +141,7 @@ impl App {
141141
.connection_customizer(Box::new(replica_db_connection_config))
142142
.thread_pool(thread_pool);
143143

144-
Some(DieselPool::new(&url, replica_db_config))
144+
Some(DieselPool::new(&url, replica_db_config).unwrap())
145145
}
146146
} else {
147147
None

src/bin/server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
9797
let mut sig_int = rt.block_on(async { signal(SignalKind::interrupt()) })?;
9898
let mut sig_term = rt.block_on(async { signal(SignalKind::terminate()) })?;
9999

100+
// When the user configures PORT=0 the operating system will allocate a random unused port.
101+
// This fetches that random port and uses it to display the "listening on port" message later.
102+
let actual_port = server.local_addr().port();
103+
100104
let server = server.with_graceful_shutdown(async move {
101105
// Wait for either signal
102106
futures_util::select! {
@@ -109,7 +113,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
109113

110114
let server = rt.spawn(async { server.await.unwrap() });
111115

112-
println!("listening on port {}", port);
116+
// Do not change this line! Removing the line or changing its contents in any way will break
117+
// the test suite :)
118+
println!("listening on port {}", actual_port);
113119

114120
// Creating this file tells heroku to tell nginx that the application is ready
115121
// to receive traffic.

src/db.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,28 @@ impl DieselPool {
1919
pub(crate) fn new(
2020
url: &str,
2121
config: r2d2::Builder<ConnectionManager<PgConnection>>,
22-
) -> DieselPool {
22+
) -> Result<DieselPool, PoolError> {
2323
let manager = ConnectionManager::new(connection_url(url));
24-
DieselPool::Pool(config.build(manager).unwrap())
24+
25+
// For crates.io we want the behavior of creating a database pool to be slightly different
26+
// than the defaults of R2D2: the library's build() method assumes its consumers always
27+
// need a database connection to operate, so it blocks creating a pool until a minimum
28+
// number of connections is available.
29+
//
30+
// crates.io can actually operate in a limited capacity without a database connection,
31+
// especially by serving download requests to our users. Because of that we don't want to
32+
// block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid
33+
// serving errors for the first connections until the pool is initialized) and if we can't
34+
// establish any connection, continue booting up the application. The database pool will
35+
// automatically be marked as unhealthy and the rest of the application will adapt.
36+
let pool = DieselPool::Pool(config.build_unchecked(manager));
37+
match pool.wait_until_healthy(Duration::from_secs(5)) {
38+
Ok(()) => {}
39+
Err(PoolError::UnhealthyPool) => {}
40+
Err(err) => return Err(err),
41+
}
42+
43+
Ok(pool)
2544
}
2645

2746
pub(crate) fn new_test(url: &str) -> DieselPool {

src/tests/all.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ mod read_only_mode;
4545
mod record;
4646
mod schema_details;
4747
mod server;
48+
mod server_binary;
4849
mod team;
4950
mod token;
5051
mod unhealthy_database;

src/tests/server_binary.rs

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
use crate::builders::CrateBuilder;
2+
use crate::util::{ChaosProxy, FreshSchema};
3+
use anyhow::Error;
4+
use cargo_registry::models::{NewUser, User};
5+
use diesel::prelude::*;
6+
use reqwest::blocking::{Client, Response};
7+
use std::collections::HashMap;
8+
use std::io::{BufRead, BufReader, Read};
9+
use std::process::{Child, Command, Stdio};
10+
use std::sync::{mpsc::Sender, Arc};
11+
use std::time::Duration;
12+
13+
const SERVER_BOOT_TIMEOUT_SECONDS: u64 = 30;
14+
15+
#[test]
16+
fn normal_startup() -> Result<(), Error> {
17+
let server_bin = ServerBin::prepare()?;
18+
initialize_dummy_crate(&server_bin.db()?);
19+
20+
let running_server = server_bin.start()?;
21+
22+
// Ensure the application correctly responds to download requests
23+
let resp = running_server.get("api/v1/crates/FOO/1.0.0/download")?;
24+
assert!(resp.status().is_redirection());
25+
assert!(resp
26+
.headers()
27+
.get("location")
28+
.unwrap()
29+
.to_str()?
30+
.ends_with("/crates/foo/foo-1.0.0.crate"));
31+
32+
Ok(())
33+
}
34+
35+
#[test]
36+
fn startup_without_database() -> Result<(), Error> {
37+
let server_bin = ServerBin::prepare()?;
38+
initialize_dummy_crate(&server_bin.db()?);
39+
40+
// Break the networking *before* starting the binary, to ensure the binary can fully start up
41+
// without a database connection. Most of crates.io should not work when started without a
42+
// database, but unconditional redirects will work.
43+
server_bin.chaosproxy.break_networking();
44+
45+
let running_server = server_bin.start()?;
46+
47+
// Ensure unconditional redirects work.
48+
let resp = running_server.get("api/v1/crates/FOO/1.0.0/download")?;
49+
assert!(resp.status().is_redirection());
50+
assert!(resp
51+
.headers()
52+
.get("location")
53+
.unwrap()
54+
.to_str()?
55+
.ends_with("/crates/FOO/FOO-1.0.0.crate"));
56+
57+
Ok(())
58+
}
59+
60+
fn initialize_dummy_crate(conn: &PgConnection) {
61+
use cargo_registry::schema::users;
62+
63+
let user: User = diesel::insert_into(users::table)
64+
.values(NewUser {
65+
gh_id: 0,
66+
gh_login: "user",
67+
..NewUser::default()
68+
})
69+
.get_result(conn)
70+
.expect("failed to create dummy user");
71+
72+
CrateBuilder::new("foo", user.id)
73+
.version("1.0.0")
74+
.build(conn)
75+
.expect("failed to create dummy crate");
76+
}
77+
78+
struct ServerBin {
79+
chaosproxy: Arc<ChaosProxy>,
80+
db_url: String,
81+
env: HashMap<String, String>,
82+
fresh_schema: FreshSchema,
83+
}
84+
85+
impl ServerBin {
86+
fn prepare() -> Result<Self, Error> {
87+
let mut env = dotenv::vars().collect::<HashMap<_, _>>();
88+
// Bind a random port every time the server is started.
89+
env.insert("PORT".into(), "0".into());
90+
// Avoid creating too many database connections.
91+
env.insert("DB_POOL_SIZE".into(), "2".into());
92+
env.remove("DB_MIN_SIZE");
93+
// Other configuration variables needed for the application to boot.
94+
env.insert("WEB_ALLOWED_ORIGINS".into(), "http://localhost:8888".into());
95+
env.insert(
96+
"SESSION_KEY".into(),
97+
std::iter::repeat('a').take(32).collect(),
98+
);
99+
env.insert("GH_CLIENT_ID".into(), String::new());
100+
env.insert("GH_CLIENT_SECRET".into(), String::new());
101+
102+
// Use a proxied fresh schema as the database url.
103+
let fresh_schema = FreshSchema::new(env.get("TEST_DATABASE_URL").unwrap());
104+
let (chaosproxy, db_url) = ChaosProxy::proxy_database_url(fresh_schema.database_url())?;
105+
env.remove("TEST_DATABASE_URL");
106+
env.insert("DATABASE_URL".into(), db_url.clone());
107+
env.insert("READ_ONLY_REPLICA_URL".into(), db_url.clone());
108+
109+
Ok(ServerBin {
110+
chaosproxy,
111+
db_url,
112+
env,
113+
fresh_schema,
114+
})
115+
}
116+
117+
fn db(&self) -> Result<PgConnection, Error> {
118+
Ok(PgConnection::establish(&self.db_url)?)
119+
}
120+
121+
fn start(self) -> Result<RunningServer, Error> {
122+
let mut process = Command::new(env!("CARGO_BIN_EXE_server"))
123+
.env_clear()
124+
.envs(self.env.into_iter())
125+
.stdout(Stdio::piped())
126+
.stderr(Stdio::piped())
127+
.spawn()?;
128+
129+
let (port_send, port_recv) = std::sync::mpsc::channel();
130+
stream_processor(process.stdout.take().unwrap(), "stdout", Some(port_send));
131+
stream_processor(process.stderr.take().unwrap(), "stderr", None);
132+
133+
// Possible causes for this to fail:
134+
// - the server binary failed to start
135+
// - the server binary requires a database connection now
136+
// - the server binary doesn't print "listening on port {port}" anymore
137+
let port: u16 = port_recv
138+
.recv_timeout(Duration::from_secs(SERVER_BOOT_TIMEOUT_SECONDS))
139+
.map_err(|_| anyhow::anyhow!("the server took too much time to initialize"))?
140+
.parse()?;
141+
142+
let http = Client::builder()
143+
.redirect(reqwest::redirect::Policy::none())
144+
.build()?;
145+
146+
Ok(RunningServer {
147+
process,
148+
port,
149+
http,
150+
_chaosproxy: self.chaosproxy,
151+
_fresh_schema: self.fresh_schema,
152+
})
153+
}
154+
}
155+
156+
struct RunningServer {
157+
process: Child,
158+
port: u16,
159+
http: Client,
160+
161+
// Keep these two items at the bottom in this order to drop everything in the correct order.
162+
_chaosproxy: Arc<ChaosProxy>,
163+
_fresh_schema: FreshSchema,
164+
}
165+
166+
impl RunningServer {
167+
fn get(&self, url: &str) -> Result<Response, Error> {
168+
Ok(self
169+
.http
170+
.get(format!("http://127.0.0.1:{}/{}", self.port, url))
171+
.header("User-Agent", "crates.io test suite")
172+
.send()?)
173+
}
174+
}
175+
176+
impl Drop for RunningServer {
177+
fn drop(&mut self) {
178+
self.process
179+
.kill()
180+
.expect("failed to kill the server binary");
181+
}
182+
}
183+
184+
fn stream_processor<R>(stream: R, kind: &'static str, port_send: Option<Sender<String>>)
185+
where
186+
R: Read + Send + 'static,
187+
{
188+
std::thread::spawn(move || {
189+
let stream = BufReader::new(stream);
190+
for line in stream.lines() {
191+
let line = match line {
192+
Ok(line) => line,
193+
// We receive an EOF when the process terminates
194+
Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => break,
195+
Err(err) => panic!("unexpected error while reading process {}: {}", kind, err),
196+
};
197+
198+
// If we expect the port number to be logged into this stream, look for it and send it
199+
// over the channel as soon as it's found.
200+
if let Some(port) = &port_send {
201+
if let Some(port_str) = line.strip_prefix("listening on port ") {
202+
port.send(port_str.into())
203+
.expect("failed to send the port to the test thread")
204+
}
205+
}
206+
207+
println!("[server {}] {}", kind, line);
208+
}
209+
});
210+
}

src/tests/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ mod fresh_schema;
3838
mod response;
3939
mod test_app;
4040

41+
pub(crate) use chaosproxy::ChaosProxy;
4142
pub(crate) use fresh_schema::FreshSchema;
4243
pub use response::Response;
4344
pub use test_app::TestApp;

src/tests/util/chaosproxy.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::Error;
1+
use anyhow::{Context, Error};
22
use std::net::SocketAddr;
33
use std::sync::Arc;
44
use tokio::{
@@ -10,6 +10,7 @@ use tokio::{
1010
runtime::Runtime,
1111
sync::broadcast::Sender,
1212
};
13+
use url::Url;
1314

1415
pub(crate) struct ChaosProxy {
1516
address: SocketAddr,
@@ -51,8 +52,19 @@ impl ChaosProxy {
5152
Ok(instance)
5253
}
5354

54-
pub(crate) fn address(&self) -> SocketAddr {
55-
self.address
55+
pub(crate) fn proxy_database_url(url: &str) -> Result<(Arc<Self>, String), Error> {
56+
let mut db_url = Url::parse(url).context("failed to parse database url")?;
57+
let backend_addr = db_url
58+
.socket_addrs(|| Some(5432))
59+
.context("could not resolve database url")?
60+
.get(0)
61+
.copied()
62+
.ok_or_else(|| anyhow::anyhow!("the database url does not point to any IP"))?;
63+
64+
let instance = ChaosProxy::new(backend_addr).unwrap();
65+
db_url.set_ip_host(instance.address.ip()).unwrap();
66+
db_url.set_port(Some(instance.address.port())).unwrap();
67+
Ok((instance, db_url.into()))
5668
}
5769

5870
pub(crate) fn break_networking(&self) {

src/tests/util/test_app.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -197,25 +197,10 @@ impl TestAppBuilder {
197197
// The schema will be cleared up once the app is dropped.
198198
let (db_chaosproxy, fresh_schema) = if !self.config.use_test_database_pool {
199199
let fresh_schema = FreshSchema::new(&self.config.db_primary_config.url);
200-
self.config.db_primary_config.url = fresh_schema.database_url().into();
201-
202-
let mut db_url =
203-
Url::parse(&self.config.db_primary_config.url).expect("invalid db url");
204-
let backend_addr = db_url
205-
.socket_addrs(|| Some(5432))
206-
.expect("could not resolve database url")
207-
.get(0)
208-
.copied()
209-
.expect("the database url does not point to any IP");
210-
211-
let db_chaosproxy = ChaosProxy::new(backend_addr).unwrap();
212-
db_url.set_ip_host(db_chaosproxy.address().ip()).unwrap();
213-
db_url
214-
.set_port(Some(db_chaosproxy.address().port()))
215-
.unwrap();
216-
self.config.db_primary_config.url = db_url.into();
200+
let (proxy, url) = ChaosProxy::proxy_database_url(fresh_schema.database_url()).unwrap();
201+
self.config.db_primary_config.url = url;
217202

218-
(Some(db_chaosproxy), Some(fresh_schema))
203+
(Some(proxy), Some(fresh_schema))
219204
} else {
220205
(None, None)
221206
};

0 commit comments

Comments
 (0)