Skip to content

Commit 233cd3d

Browse files
authored
BM-1854: fix(bento): aux worker cleanup/connection handling (#1294)
Various fixes/changes to bento cleanup: - 1ddbcfb: Env to allow turning off either cleanup cron job (if any issues in future or if intending to manually handle db maintenance) - afcc428: Increase aux max client connections default to handle the 4 concurrent tasks. All other workers are sequentially processing so this should be sufficient for now - 2bd9755: to avoid running on startup to avoid issues with the spinning on startup where all try to connect at one time Separated into diff commits if we only want a subset of these changes
1 parent 6ffff5a commit 233cd3d

File tree

3 files changed

+52
-39
lines changed

3 files changed

+52
-39
lines changed

bento/crates/workflow/src/lib.rs

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ pub struct Args {
161161
/// How often to clean up completed jobs
162162
#[clap(env, long, default_value_t = 60 * 60)]
163163
cleanup_poll_interval: u64,
164+
165+
/// Disable cron to clean up completed jobs in taskdb.
166+
#[clap(env = "BENTO_DISABLE_COMPLETED_CLEANUP")]
167+
disable_completed_cleanup: bool,
168+
169+
/// Disable cron to clean up stuck tasks in taskdb.
170+
#[clap(env = "BENTO_DISABLE_STUCK_TASK_CLEANUP")]
171+
disable_stuck_task_cleanup: bool,
164172
}
165173

166174
/// Core agent context to hold all optional clients / pools and state
@@ -283,42 +291,46 @@ impl Agent {
283291

284292
// Enable stuck task maintenance for aux workers
285293
if self.args.task_stream == AUX_WORK_TYPE {
286-
let term_sig_copy = term_sig.clone();
287-
let db_pool_copy = self.db_pool.clone();
288-
let stuck_tasks_interval = self.args.stuck_tasks_poll_interval;
289-
tokio::spawn(async move {
290-
loop {
291-
if let Err(e) = Self::poll_for_stuck_tasks(
292-
term_sig_copy.clone(),
293-
db_pool_copy.clone(),
294-
stuck_tasks_interval,
295-
)
296-
.await
297-
{
298-
tracing::error!("[BENTO-WF-105] Stuck tasks cleanup failed: {:#}", e);
299-
time::sleep(time::Duration::from_secs(60)).await;
294+
if !self.args.disable_stuck_task_cleanup {
295+
let term_sig_copy = term_sig.clone();
296+
let db_pool_copy = self.db_pool.clone();
297+
let stuck_tasks_interval = self.args.stuck_tasks_poll_interval;
298+
tokio::spawn(async move {
299+
loop {
300+
if let Err(e) = Self::poll_for_stuck_tasks(
301+
term_sig_copy.clone(),
302+
db_pool_copy.clone(),
303+
stuck_tasks_interval,
304+
)
305+
.await
306+
{
307+
tracing::error!("[BENTO-WF-105] Stuck tasks cleanup failed: {:#}", e);
308+
time::sleep(time::Duration::from_secs(60)).await;
309+
}
300310
}
301-
}
302-
});
311+
});
312+
}
303313

304314
// Enable completed job cleanup for aux workers
305-
let term_sig_copy = term_sig.clone();
306-
let db_pool_copy = self.db_pool.clone();
307-
let cleanup_interval = self.args.cleanup_poll_interval;
308-
tokio::spawn(async move {
309-
loop {
310-
if let Err(e) = Self::poll_for_completed_job_cleanup(
311-
term_sig_copy.clone(),
312-
db_pool_copy.clone(),
313-
cleanup_interval,
314-
)
315-
.await
316-
{
317-
tracing::error!("[BENTO-WF-106] Completed job cleanup failed: {:#}", e);
318-
time::sleep(time::Duration::from_secs(cleanup_interval)).await;
315+
if !self.args.disable_completed_cleanup {
316+
let term_sig_copy = term_sig.clone();
317+
let db_pool_copy = self.db_pool.clone();
318+
let cleanup_interval = self.args.cleanup_poll_interval;
319+
tokio::spawn(async move {
320+
loop {
321+
if let Err(e) = Self::poll_for_completed_job_cleanup(
322+
term_sig_copy.clone(),
323+
db_pool_copy.clone(),
324+
cleanup_interval,
325+
)
326+
.await
327+
{
328+
tracing::error!("[BENTO-WF-106] Completed job cleanup failed: {:#}", e);
329+
time::sleep(time::Duration::from_secs(cleanup_interval)).await;
330+
}
319331
}
320-
}
321-
});
332+
});
333+
}
322334
}
323335

324336
while !term_sig.load(Ordering::Relaxed) {
@@ -512,6 +524,9 @@ impl Agent {
512524
poll_interval: u64,
513525
) -> Result<()> {
514526
while !term_sig.load(Ordering::Relaxed) {
527+
// Sleep before each check to avoid running on startup
528+
time::sleep(tokio::time::Duration::from_secs(poll_interval)).await;
529+
515530
tracing::debug!("Checking for stuck pending tasks...");
516531

517532
// First check if there are any stuck tasks
@@ -537,9 +552,6 @@ impl Agent {
537552
tracing::info!("Fixed {} stuck pending tasks", fixed_count);
538553
}
539554
}
540-
541-
// Sleep before next check
542-
time::sleep(tokio::time::Duration::from_secs(poll_interval)).await;
543555
}
544556

545557
Ok(())
@@ -555,15 +567,15 @@ impl Agent {
555567
poll_interval: u64,
556568
) -> Result<()> {
557569
while !term_sig.load(Ordering::Relaxed) {
570+
// Sleep before each check to avoid running on startup
571+
time::sleep(tokio::time::Duration::from_secs(poll_interval)).await;
572+
558573
tracing::debug!("Cleaning up completed jobs...");
559574

560575
let cleared_count = taskdb::clear_completed_jobs(&db_pool).await?;
561576
if cleared_count > 0 {
562577
tracing::info!("Cleared {} completed jobs", cleared_count);
563578
}
564-
565-
// Sleep before next cleanup
566-
time::sleep(tokio::time::Duration::from_secs(poll_interval)).await;
567579
}
568580

569581
Ok(())

bento/crates/workflow/src/redis.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
// as found in the LICENSE-BSL file.
55

66
use anyhow::{Context, Result};
7-
use deadpool_redis::Connection;
87
pub use deadpool_redis::{Config, Pool as RedisPool, Runtime, redis::AsyncCommands};
98
use redis::{RedisResult, ToRedisArgs};
109

compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ services:
178178
cpus: 1
179179
environment:
180180
<<: *base-environment
181+
# Default to 4 max client connections if not set (work, poll for requeue, completed cleanup, stuck tasks cleanup)
182+
DB_MAX_CONNECTIONS: ${DB_MAX_CONNECTIONS:-4}
181183

182184
entrypoint: /app/agent -t aux --monitor-requeue --redis-ttl ${REDIS_TTL:-57600}
183185

0 commit comments

Comments
 (0)