Closed
Labels
- area/distributed: Distributed coordination and TiKV
- priority/high: High priority
- size/S: Small: 1-2 days
- type/feature: New feature or functionality
Description
Summary
Implement graceful shutdown for distributed workers to properly release jobs and clean up state on SIGTERM/SIGINT.
Priority: HIGH 🔥
READY TO START - Dependencies met. This enables K8s deployment (#18).
Dependencies
- ✅ Worker Loop - COMPLETE
- ✅ Heartbeat - COMPLETE
- ✅ LerobotWriter Integration ([Phase 1] Integrate LerobotWriter with Worker.process_job() #72) - COMPLETE
- ✅ Checkpoint Save ([Phase 1] Add checkpoint save during pipeline processing #73) - COMPLETE
Enables
- ➡️ [Phase 9.1] Implement long-running Worker Deployment [READY TO START] #18 (K8s Worker Deployment)
- ➡️ [Phase 6.2] Create container image and Helm chart [BLOCKED by #18] #20 (Container Image and Helm Chart)
Design
Shutdown Sequence
- Receive SIGTERM or SIGINT
- Set shutdown flag
- Stop accepting new jobs
- Complete current checkpoint (not entire job)
- Release job back to Pending
- Clear heartbeat
- Exit cleanly
Timeout
- Force exit after 30 seconds
- Log warning if forced
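The sequence and timeout above can be sketched as an ordered list of steps with a deadline check before each one. This is a std-only illustration, not the planned implementation: Step, step, run_shutdown and Outcome are hypothetical names, and started stands for the instant the shutdown signal arrived.

```rust
use std::time::{Duration, Instant};

/// A named shutdown step (hypothetical type for this sketch).
type Step = (&'static str, Box<dyn FnMut()>);

/// Wraps a closure as a named step.
fn step(name: &'static str, action: impl FnMut() + 'static) -> Step {
    let boxed: Box<dyn FnMut()> = Box::new(action);
    (name, boxed)
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Clean,
    /// The deadline passed; carries the first step that did not run.
    Forced { pending_step: &'static str },
}

/// Runs the steps in order, checking the deadline before each one.
fn run_shutdown(steps: Vec<Step>, started: Instant, timeout: Duration) -> Outcome {
    for (name, mut action) in steps {
        if started.elapsed() >= timeout {
            eprintln!("WARN: shutdown timeout reached before '{name}'; forcing exit");
            return Outcome::Forced { pending_step: name };
        }
        action();
    }
    Outcome::Clean
}
```

Because the deadline is only checked between steps, a step that blocks indefinitely is never interrupted here; an independent hard-exit timer (for example a watchdog thread that calls std::process::exit) is still needed to guarantee the 30 second bound.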
Tasks
1. Define Shutdown Handler
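- Create crates/roboflow-distributed/src/shutdown.rs
- Define ShutdownHandler:
```rust
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::broadcast;

pub struct ShutdownHandler {
    shutdown_flag: Arc<AtomicBool>,     // polled by loops and callbacks
    shutdown_tx: broadcast::Sender<()>, // wakes tasks awaiting shutdown
}
```
- Signal registration for SIGTERM, SIGINT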
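A dependency-free approximation of ShutdownHandler, assuming clones of it are handed to the worker loop, the pipeline and the heartbeat task. The issue's version uses tokio::sync::broadcast; here a Mutex<Vec<mpsc::Sender<()>>> stands in for it so the sketch compiles with std alone, and subscribe is an assumed method name. One behaviour worth carrying into the tokio version: a broadcast receiver only sees values sent after it subscribed, so a task that subscribes late should also check is_requested(). The sketch handles that by notifying late subscribers immediately.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// std-only stand-in for the proposed ShutdownHandler (hypothetical shape).
#[derive(Clone)]
pub struct ShutdownHandler {
    /// Polled by loops and progress callbacks: a cheap, lock-free read.
    shutdown_flag: Arc<AtomicBool>,
    /// One sender per subscriber. The flag should be set while holding this
    /// lock so a subscriber can never miss the notification.
    subscribers: Arc<Mutex<Vec<Sender<()>>>>,
}

impl ShutdownHandler {
    pub fn new() -> Self {
        Self {
            shutdown_flag: Arc::new(AtomicBool::new(false)),
            subscribers: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub fn is_requested(&self) -> bool {
        self.shutdown_flag.load(Ordering::Acquire)
    }

    /// Returns a receiver that is notified once shutdown is requested.
    /// A late subscriber (after shutdown began) is notified immediately.
    pub fn subscribe(&self) -> Receiver<()> {
        let (tx, rx) = mpsc::channel();
        let mut subs = self.subscribers.lock().unwrap();
        if self.is_requested() {
            let _ = tx.send(());
        }
        subs.push(tx);
        rx
    }
}
```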
2. Implement Signal Handler
- Use the tokio::signal module (requires tokio's "signal" feature)
- On signal:
- Set shutdown_flag
- Send on the shutdown_tx channel
- Log "Shutdown requested"
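The "on signal" steps can be isolated in a plain function so they are testable without delivering real signals. In the worker, the trigger would presumably come from tokio::signal::ctrl_c() (SIGINT) and tokio::signal::unix::signal(SignalKind::terminate()) (SIGTERM) awaited in a tokio::select!; tokio runs that code as an ordinary task rather than inside an async-signal handler, so taking a lock there is fine. on_signal and its parameters are hypothetical; using swap makes a repeated signal a no-op.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Mutex;

/// Reacts to SIGTERM/SIGINT: sets the flag, notifies subscribers, logs.
/// Returns false if shutdown was already in progress.
fn on_signal(signal: &str, flag: &AtomicBool, subscribers: &Mutex<Vec<Sender<()>>>) -> bool {
    // Hold the lock while flipping the flag so subscription cannot race with it.
    let subs = subscribers.lock().unwrap();
    // swap returns the previous value, so only the first signal does the work.
    if flag.swap(true, Ordering::AcqRel) {
        eprintln!("{signal} received again; shutdown already in progress");
        return false;
    }
    for tx in subs.iter() {
        // A dropped receiver means that subscriber has already exited.
        let _ = tx.send(());
    }
    eprintln!("Shutdown requested ({signal})");
    true
}
```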
3. Integrate with Worker Loop
- Check shutdown_flag in the main loop:
```rust
loop {
    if shutdown.is_requested() {
        break; // stop before claiming another job
    }
    // ... claim and process jobs
}
```
- Don't claim new jobs after shutdown
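A std-only sketch of this loop, assuming an in-memory VecDeque stands in for the job queue (claiming in the real worker presumably goes through the distributed store). worker_loop and JobId are hypothetical. The flag is read before every claim, so no new job is taken after shutdown; interrupting the job already in hand is left to the pipeline's checkpoint logic.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical job id type for the sketch.
type JobId = u32;

/// Claims and processes jobs until shutdown is requested or the queue is empty.
/// Returns the ids processed by this loop.
fn worker_loop(
    shutdown: &AtomicBool,
    queue: &mut VecDeque<JobId>,
    mut process: impl FnMut(JobId),
) -> Vec<JobId> {
    let mut done = Vec::new();
    loop {
        // Checked before claiming, never after: shutdown blocks new claims.
        if shutdown.load(Ordering::Acquire) {
            eprintln!("shutdown requested; not claiming new jobs");
            break;
        }
        // A real worker would poll/sleep when empty; the sketch stops instead.
        let Some(job) = queue.pop_front() else { break };
        process(job);
        done.push(job);
    }
    done
}
```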
4. Integrate with Pipeline
- Add shutdown check to ProgressCallback
- On shutdown during processing:
- Complete current batch (checkpoint boundary)
- Save checkpoint
- Return early with an Interrupted error
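A sketch of "exit at a checkpoint boundary", assuming frames are processed in fixed-size batches and a checkpoint is saved after each batch. The shutdown check could live in ProgressCallback as planned; here it is inlined for brevity. run_pipeline, PipelineError and resume_from are hypothetical names, not the converter's API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum PipelineError {
    /// Stopped at a checkpoint boundary; processing can resume at this frame.
    Interrupted { resume_from: usize },
}

fn run_pipeline(
    total_frames: usize,
    batch_size: usize,
    shutdown: &AtomicBool,
    mut process_frame: impl FnMut(usize),
    mut save_checkpoint: impl FnMut(usize),
) -> Result<(), PipelineError> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut next = 0;
    while next < total_frames {
        let end = (next + batch_size).min(total_frames);
        // Finish the whole batch even if shutdown arrives mid-batch.
        for frame in next..end {
            process_frame(frame);
        }
        next = end;
        save_checkpoint(next);
        // Shutdown is only honoured here, right after a checkpoint is saved.
        if shutdown.load(Ordering::Acquire) && next < total_frames {
            return Err(PipelineError::Interrupted { resume_from: next });
        }
    }
    Ok(())
}
```

With 10 frames in batches of 4 and shutdown arriving at frame 5, the pipeline finishes frames 6 and 7, saves checkpoint 8 and returns Interrupted { resume_from: 8 }.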
5. Implement Job Release
- On shutdown while processing:
- Save final checkpoint
- Update job: status=Pending, owner=null
- Preserve checkpoint for next worker
- Log job release
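A sketch of the release step on an in-memory job record. Job, JobStatus, release_job and the ownership check are assumptions, not the crate's types. In the real system the check and the update would need to happen atomically (for example in a single store transaction) so a worker cannot release a job that another worker has since claimed.

```rust
#[derive(Debug, PartialEq)]
enum JobStatus {
    Pending,
    Running,
}

#[derive(Debug)]
struct Job {
    id: u64,
    status: JobStatus,
    owner: Option<String>,
    /// Last saved checkpoint (e.g. a frame offset), kept for the next worker.
    checkpoint: Option<u64>,
}

/// Releases the job back to Pending if `worker_id` still owns it.
fn release_job(job: &mut Job, worker_id: &str, final_checkpoint: u64) -> Result<(), String> {
    if job.owner.as_deref() != Some(worker_id) {
        return Err(format!("job {} is not owned by {worker_id}; not releasing", job.id));
    }
    job.checkpoint = Some(final_checkpoint); // preserved, not cleared
    job.status = JobStatus::Pending;
    job.owner = None;
    eprintln!("released job {} at checkpoint {final_checkpoint}", job.id);
    Ok(())
}
```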
6. Implement Heartbeat Cleanup
- On shutdown:
- Stop the heartbeat thread
- Delete the /heartbeat/{pod_id} key
- This prevents the cleanly exited pod from being falsely detected as a zombie
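Why deleting the key matters, sketched with a HashMap standing in for the key-value store (an assumption; the real store is presumably TiKV). A zombie detector that flags heartbeats older than a TTL would otherwise flag a pod that exited cleanly. heartbeat_key, stale_keys and clear_heartbeat are hypothetical helpers; the key format follows the issue.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory stand-in for the store: key -> time of last heartbeat.
type Store = HashMap<String, Instant>;

fn heartbeat_key(pod_id: &str) -> String {
    format!("/heartbeat/{pod_id}")
}

/// Keys whose last heartbeat is older than `ttl`: what a zombie detector flags.
fn stale_keys(store: &Store, now: Instant, ttl: Duration) -> Vec<String> {
    let mut stale: Vec<String> = store
        .iter()
        .filter(|(_, seen)| now.saturating_duration_since(**seen) > ttl)
        .map(|(k, _)| k.clone())
        .collect();
    stale.sort();
    stale
}

/// Call only after the heartbeat thread has stopped, so it cannot re-create the key.
fn clear_heartbeat(store: &mut Store, pod_id: &str) {
    store.remove(&heartbeat_key(pod_id));
}
```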
7. Implement Timeout
- Start timeout timer on shutdown signal
- After 30 seconds: force exit
- Log warning with current state
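A std-only watchdog sketch for the timeout: started on a dedicated thread when the signal arrives, it waits on a channel for the cleanup code to report completion. Unlike a check between steps, it fires even if cleanup is stuck. watchdog, ShutdownOutcome and describe_state are hypothetical; a real worker would call std::process::exit(1) on the forced path, while the sketch returns a value so it can be tested.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ShutdownOutcome {
    Clean,
    Forced,
}

/// Waits up to `timeout` for cleanup to send on `done`.
fn watchdog(
    done: Receiver<()>,
    timeout: Duration,
    describe_state: impl FnOnce() -> String,
) -> ShutdownOutcome {
    let reason = match done.recv_timeout(timeout) {
        Ok(()) => return ShutdownOutcome::Clean,
        Err(RecvTimeoutError::Timeout) => "timed out",
        // Sender dropped without reporting (e.g. cleanup panicked).
        Err(RecvTimeoutError::Disconnected) => "cleanup ended without reporting",
    };
    eprintln!("WARN: forcing exit ({reason}, limit {timeout:?}); state: {}", describe_state());
    // Real worker: std::process::exit(1) here.
    ShutdownOutcome::Forced
}
```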
8. K8s Integration
- K8s sends SIGTERM first
- Then waits up to terminationGracePeriodSeconds
- Then sends SIGKILL if the container is still running
- Set terminationGracePeriodSeconds >= 35s (the 30 s shutdown timeout plus margin)
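The 35 s figure is simple arithmetic, sketched below as a check that could run against deployment values. One Kubernetes detail affects it: the grace-period countdown starts before any preStop hook runs, and SIGTERM is only sent after the hook finishes, so hook time must be added. check_grace_period and the 5 s margin are assumptions for illustration.

```rust
/// Verifies that terminationGracePeriodSeconds leaves room for the worker's
/// own forced exit before SIGKILL. All values are in seconds.
fn check_grace_period(
    grace_secs: u64,
    prestop_secs: u64,
    shutdown_timeout_secs: u64,
    margin_secs: u64,
) -> Result<(), String> {
    // preStop time is consumed from the same countdown before SIGTERM is sent.
    let needed = prestop_secs + shutdown_timeout_secs + margin_secs;
    if grace_secs >= needed {
        Ok(())
    } else {
        Err(format!("terminationGracePeriodSeconds={grace_secs} < required {needed}"))
    }
}
```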
Files to Create
crates/roboflow-distributed/src/shutdown.rs
Files to Modify
- crates/roboflow-distributed/src/worker.rs
- crates/roboflow-distributed/src/heartbeat.rs
- crates/roboflow-dataset/src/streaming/converter.rs (add shutdown check callback)
Acceptance Criteria
- Signal handler registered
- Shutdown flag propagates
- Worker stops accepting jobs
- Pipeline exits at checkpoint boundary
- Job released to Pending
- Heartbeat cleaned up
- Timeout forces exit
- Integration test: send SIGTERM during processing