Commit c8a461f

Auto merge of #2328 - jtgeibel:revert-2269, r=jtgeibel
Revert #2269

This is to make master deployable. The original PR caused issues with running background jobs.

This reverts commit b63607e, reversing changes made to dc18552.

r? @ghost

cc @sgrif #2269
2 parents: 89a6419 + 8b4e769

12 files changed: +74 -36 lines


Cargo.lock

Lines changed: 5 additions & 6 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ dotenv = "0.15"
 toml = "0.4"
 diesel = { version = "1.4.0", features = ["postgres", "serde_json", "chrono", "r2d2"] }
 diesel_full_text_search = "1.0.0"
-swirl = { git = "https://github.com/sgrif/swirl.git", rev = "6ef8c4cd" }
+swirl = { git = "https://github.com/sgrif/swirl.git", rev = "de5d8bb" }
 serde_json = "1.0.0"
 serde = { version = "1.0.0", features = ["derive"] }
 chrono = { version = "0.4.0", features = ["serde"] }

src/background_jobs.rs

Lines changed: 21 additions & 2 deletions
@@ -24,6 +24,8 @@ impl swirl::db::DieselPool for DieselPool {
 #[allow(missing_debug_implementations)]
 pub struct Environment {
     index: Arc<Mutex<Repository>>,
+    // FIXME: https://github.com/sfackler/r2d2/pull/70
+    pub connection_pool: AssertUnwindSafe<DieselPool>,
     pub uploader: Uploader,
     http_client: AssertUnwindSafe<Client>,
 }
@@ -34,29 +36,46 @@ impl Clone for Environment {
     fn clone(&self) -> Self {
         Self {
             index: self.index.clone(),
+            connection_pool: AssertUnwindSafe(self.connection_pool.0.clone()),
             uploader: self.uploader.clone(),
             http_client: AssertUnwindSafe(self.http_client.0.clone()),
         }
     }
 }

 impl Environment {
-    pub fn new(index: Repository, uploader: Uploader, http_client: Client) -> Self {
-        Self::new_shared(Arc::new(Mutex::new(index)), uploader, http_client)
+    pub fn new(
+        index: Repository,
+        connection_pool: DieselPool,
+        uploader: Uploader,
+        http_client: Client,
+    ) -> Self {
+        Self::new_shared(
+            Arc::new(Mutex::new(index)),
+            connection_pool,
+            uploader,
+            http_client,
+        )
     }

     pub fn new_shared(
         index: Arc<Mutex<Repository>>,
+        connection_pool: DieselPool,
         uploader: Uploader,
         http_client: Client,
     ) -> Self {
         Self {
             index,
+            connection_pool: AssertUnwindSafe(connection_pool),
             uploader,
             http_client: AssertUnwindSafe(http_client),
         }
     }

+    pub fn connection(&self) -> Result<DieselPooledConn<'_>, PoolError> {
+        self.connection_pool.get()
+    }
+
     pub fn lock_index(&self) -> Result<MutexGuard<'_, Repository>, PerformError> {
         let repo = self.index.lock().unwrap_or_else(PoisonError::into_inner);
         repo.reset_head()?;

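To make the restored flow easier to follow: after this revert, swirl no longer hands each job a `&PgConnection`; jobs call the new `Environment::connection()` helper to check a connection out of the shared pool. Below is a minimal sketch of that pattern (the job name is hypothetical and not part of this commit; the real call sites are in src/git.rs, src/render.rs, and src/tasks/update_downloads.rs further down):

use diesel::prelude::*;
use swirl::PerformError;

use crate::background_jobs::Environment;

// Hypothetical example job, shown only to illustrate the pattern restored by
// this commit.
#[swirl::background_job]
pub fn example_job(env: &Environment) -> Result<(), PerformError> {
    // Borrow a pooled connection via the shared Environment instead of
    // receiving one from swirl.
    let conn = env.connection()?;
    conn.transaction(|| {
        // ... run whatever queries the job needs on `conn` ...
        Ok::<_, diesel::result::Error>(())
    })?;
    Ok(())
}
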
src/bin/background-worker.rs

Lines changed: 16 additions & 6 deletions
@@ -24,7 +24,6 @@ fn main() {
     println!("Booting runner");

     let config = cargo_registry::Config::default();
-    let db_url = db::connection_url(&config.db_url);

     let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT")
         .unwrap_or_else(|_| "30".into())
@@ -40,11 +39,22 @@
     println!("Index cloned");

     let build_runner = || {
-        let environment =
-            Environment::new_shared(repository.clone(), config.uploader.clone(), Client::new());
-        let db_config = r2d2::Pool::builder().min_idle(Some(0));
-        swirl::Runner::builder(environment)
-            .connection_pool_builder(&db_url, db_config)
+        // 2x the thread pool size -- not all our jobs need a DB connection,
+        // but we want to always be able to run our jobs in parallel, rather
+        // than adjusting based on how many concurrent jobs need a connection.
+        // Eventually swirl will do this for us, and this will be the default
+        // -- we should just let it do a thread pool size of CPU count, and a
+        // a connection pool size of 2x that when that lands.
+        let db_config = r2d2::Pool::builder().max_size(4);
+        let db_pool = db::diesel_pool(&config.db_url, config.env, db_config);
+        let environment = Environment::new_shared(
+            repository.clone(),
+            db_pool.clone(),
+            config.uploader.clone(),
+            Client::new(),
+        );
+        swirl::Runner::builder(db_pool, environment)
+            .thread_count(2)
             .job_start_timeout(Duration::from_secs(job_start_timeout))
             .build()
     };

src/controllers/krate/publish.rs

Lines changed: 5 additions & 2 deletions
@@ -183,7 +183,8 @@ pub fn publish(req: &mut dyn Request) -> AppResult<Response> {
                     .unwrap_or_else(|| String::from("README.md")),
                 repo,
             )
-            .enqueue(&conn)?;
+            .enqueue(&conn)
+            .map_err(|e| AppError::from_std_error(e))?;
         }

         let cksum = app
@@ -203,7 +204,9 @@ pub fn publish(req: &mut dyn Request) -> AppResult<Response> {
             yanked: Some(false),
             links,
         };
-        git::add_crate(git_crate).enqueue(&conn)?;
+        git::add_crate(git_crate)
+            .enqueue(&conn)
+            .map_err(|e| AppError::from_std_error(e))?;

         // The `other` field on `PublishWarnings` was introduced to handle a temporary warning
         // that is no longer needed. As such, crates.io currently does not return any `other`

src/controllers/version/yank.rs

Lines changed: 3 additions & 1 deletion
@@ -44,7 +44,9 @@ fn modify_yank(req: &mut dyn Request, yanked: bool) -> AppResult<Response> {

     insert_version_owner_action(&conn, version.id, user.id, ids.api_token_id(), action)?;

-    git::yank(krate.name, version, yanked).enqueue(&conn)?;
+    git::yank(krate.name, version, yanked)
+        .enqueue(&conn)
+        .map_err(|e| AppError::from_std_error(e))?;

     ok_true()
 }

src/db.rs

Lines changed: 8 additions & 11 deletions
@@ -63,25 +63,22 @@ pub fn connect_now() -> ConnectionResult<PgConnection> {
     PgConnection::establish(&url.to_string())
 }

-pub fn connection_url(url: &str) -> String {
-    let mut url = Url::parse(url).expect("Invalid database URL");
-    if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") {
-        url.query_pairs_mut().append_pair("sslmode", "require");
-    }
-    url.into_string()
-}
-
 pub fn diesel_pool(
     url: &str,
     env: Env,
     config: r2d2::Builder<ConnectionManager<PgConnection>>,
 ) -> DieselPool {
-    let url = connection_url(url);
+    let mut url = Url::parse(url).expect("Invalid database URL");
+    if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") {
+        url.query_pairs_mut().append_pair("sslmode", "require");
+    }
+
     if env == Env::Test {
-        let conn = PgConnection::establish(&url).expect("failed to establish connection");
+        let conn =
+            PgConnection::establish(&url.into_string()).expect("failed to establish connection");
         DieselPool::test_conn(conn)
     } else {
-        let manager = ConnectionManager::new(url);
+        let manager = ConnectionManager::new(url.into_string());
         DieselPool::Pool(config.build(manager).unwrap())
     }
 }

src/git.rs

Lines changed: 2 additions & 1 deletion
@@ -289,7 +289,6 @@ pub fn add_crate(env: &Environment, krate: Crate) -> Result<(), PerformError> {
 /// push the changes.
 #[swirl::background_job]
 pub fn yank(
-    conn: &PgConnection,
     env: &Environment,
     krate: String,
     version: Version,
@@ -300,6 +299,8 @@ pub fn yank(
     let repo = env.lock_index()?;
     let dst = repo.index_file(&krate);

+    let conn = env.connection()?;
+
     conn.transaction(|| {
         let yanked_in_db = versions::table
             .find(version.id)

src/render.rs

Lines changed: 1 addition & 1 deletion
@@ -222,7 +222,6 @@ pub fn readme_to_html(text: &str, filename: &str, base_url: Option<&str>) -> Str

 #[swirl::background_job]
 pub fn render_and_upload_readme(
-    conn: &PgConnection,
     env: &Environment,
     version_id: i32,
     text: String,
@@ -233,6 +232,7 @@ pub fn render_and_upload_readme(
     use diesel::prelude::*;

     let rendered = readme_to_html(&text, &file_name, base_url.as_deref());
+    let conn = env.connection()?;

     conn.transaction(|| {
         Version::record_readme_rendering(version_id, &conn)?;

src/tasks/update_downloads.rs

Lines changed: 3 additions & 1 deletion
@@ -1,4 +1,5 @@
 use crate::{
+    background_jobs::Environment,
     models::VersionDownload,
     schema::{crates, metadata, version_downloads, versions},
 };
@@ -7,7 +8,8 @@ use diesel::prelude::*;
 use swirl::PerformError;

 #[swirl::background_job]
-pub fn update_downloads(conn: &PgConnection) -> Result<(), PerformError> {
+pub fn update_downloads(env: &Environment) -> Result<(), PerformError> {
+    let conn = env.connection()?;
     update(&conn)?;
     Ok(())
 }

src/tests/util.rs

Lines changed: 5 additions & 4 deletions
@@ -74,7 +74,7 @@ impl Drop for TestAppInner {
         // Lazily run any remaining jobs
         if let Some(runner) = &self.runner {
             runner.run_all_pending_jobs().expect("Could not run jobs");
-            runner.check_for_failed_jobs().expect("Failed jobs remain");
+            runner.assert_no_failed_jobs().expect("Failed jobs remain");
         }

         // Manually verify that all jobs have completed successfully
@@ -189,7 +189,7 @@ impl TestApp {

         runner.run_all_pending_jobs().expect("Could not run jobs");
         runner
-            .check_for_failed_jobs()
+            .assert_no_failed_jobs()
             .expect("Could not determine if jobs failed");
     }

@@ -220,23 +220,24 @@ impl TestAppBuilder {
         let (app, middle) = crate::build_app(self.config, self.proxy);

         let runner = if self.build_job_runner {
+            let connection_pool = app.primary_database.clone();
            let repository_config = RepositoryConfig {
                index_location: Url::from_file_path(&git::bare()).unwrap(),
                credentials: Credentials::Missing,
            };
            let index = WorkerRepository::open(&repository_config).expect("Could not clone index");
            let environment = Environment::new(
                index,
+                connection_pool.clone(),
                app.config.uploader.clone(),
                app.http_client().clone(),
            );

            Some(
-                Runner::builder(environment)
+                Runner::builder(connection_pool, environment)
                    // We only have 1 connection in tests, so trying to run more than
                    // 1 job concurrently will just block
                    .thread_count(1)
-                    .connection_pool(app.primary_database.clone())
                    .job_start_timeout(Duration::from_secs(5))
                    .build(),
            )

src/util/errors.rs

Lines changed: 4 additions & 0 deletions
@@ -97,6 +97,10 @@ impl dyn AppError {
         self.get_type_id() == TypeId::of::<T>()
     }

+    pub fn from_std_error(err: Box<dyn Error + Send>) -> Box<dyn AppError> {
+        Self::try_convert(&*err).unwrap_or_else(|| internal(&err))
+    }
+
     fn try_convert(err: &(dyn Error + Send + 'static)) -> Option<Box<Self>> {
        match err.downcast_ref() {
            Some(DieselError::NotFound) => Some(Box::new(NotFound)),

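One note on the error-handling changes above: with the pinned swirl revision, `enqueue` returns a boxed error (`Box<dyn Error + Send>`) that has no automatic conversion into `Box<dyn AppError>`, so the controllers map it through the new `from_std_error` helper. Below is a minimal sketch of that call-site pattern (the function and import paths are assumptions for illustration; the real call sites are in src/controllers/krate/publish.rs and src/controllers/version/yank.rs above):

use diesel::PgConnection;

use crate::git;
use crate::models::Version;
use crate::util::errors::{AppError, AppResult};

// Hypothetical helper, shown only to illustrate the conversion; it is not
// part of this commit.
fn enqueue_yank(conn: &PgConnection, krate_name: String, version: Version) -> AppResult<()> {
    git::yank(krate_name, version, true)
        .enqueue(conn)
        // The enqueue error is a boxed std error, so convert it into the
        // crate's AppError type before bubbling it up with `?`.
        .map_err(|e| AppError::from_std_error(e))?;
    Ok(())
}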