Closed
Labels
- area/distributed: Distributed coordination and TiKV
- area/lerobot: LeRobot dataset format
- priority/critical: Must be done first, blocks other work
- size/L: Large: 1-2 weeks
- type/feature: New feature or functionality
Description
Summary
Complete the Worker.process_job() implementation by integrating the LerobotWriter from roboflow-dataset with the distributed worker infrastructure.
Context
The current Worker.process_job() in crates/roboflow-distributed/src/worker.rs (lines 475-531) is a placeholder that simulates processing with a 100 ms sleep. Until it is replaced, no distributed data transformation can run.
Reference: DISTRIBUTED_DESIGN.md - Phase 1: Pipeline Integration
Tasks
- Download input file from S3/OSS using storage backend
- Initialize LerobotWriter with storage backend and job configuration
- Create streaming reader for bag/MCAP file
- Process frames through the alignment pipeline
- Write frames to LerobotWriter
- Call writer.finalize() to complete episode
- Upload outputs to S3/OSS
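The task sequence above can be sketched end to end with in-memory stand-ins. This is a minimal, synchronous, std-only illustration, not the roboflow API: `Storage`, `MemStorage`, `SketchWriter`, `run_pipeline`, the `Failed` variant, and the `output.bin` key are all hypothetical, each byte stands in for a frame, and the alignment step is omitted. It also shows one possible way to keep `?` inside a helper that returns `Result` and map the outcome to a `ProcessingResult` at the boundary.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the storage backend (S3/OSS in production).
trait Storage {
    fn get(&self, key: &str) -> Result<Vec<u8>, String>;
    fn put(&mut self, key: &str, bytes: Vec<u8>) -> Result<(), String>;
}

/// In-memory storage used only for this sketch.
#[derive(Default)]
struct MemStorage {
    objects: HashMap<String, Vec<u8>>,
}

impl Storage for MemStorage {
    fn get(&self, key: &str) -> Result<Vec<u8>, String> {
        self.objects
            .get(key)
            .cloned()
            .ok_or_else(|| format!("missing object: {}", key))
    }

    fn put(&mut self, key: &str, bytes: Vec<u8>) -> Result<(), String> {
        self.objects.insert(key.to_string(), bytes);
        Ok(())
    }
}

/// Hypothetical writer: buffers frames and emits one output object on finalize.
struct SketchWriter {
    prefix: String,
    frames: Vec<u8>,
}

impl SketchWriter {
    fn new(prefix: &str) -> Self {
        SketchWriter { prefix: prefix.to_string(), frames: Vec::new() }
    }

    fn write_frame(&mut self, frame: u8) {
        self.frames.push(frame);
    }

    /// Consumes the writer and returns (output key, payload).
    fn finalize(self) -> Result<(String, Vec<u8>), String> {
        if self.frames.is_empty() {
            return Err("no frames written".to_string());
        }
        Ok((format!("{}/output.bin", self.prefix), self.frames))
    }
}

/// Only `Success` appears in the issue; `Failed` is an assumption.
#[derive(Debug, PartialEq)]
enum ProcessingResult {
    Success,
    Failed(String),
}

/// Fallible inner pipeline, so `?` can be used on every step.
fn run_pipeline(storage: &mut dyn Storage, source_key: &str, out_prefix: &str) -> Result<(), String> {
    let input = storage.get(source_key)?; // download input
    let mut writer = SketchWriter::new(out_prefix); // initialize writer
    for frame in input {
        writer.write_frame(frame); // write each frame
    }
    let (key, payload) = writer.finalize()?; // complete the episode
    storage.put(&key, payload)?; // upload output
    Ok(())
}

/// Boundary function: maps the inner Result to a ProcessingResult.
fn process_job(storage: &mut dyn Storage, source_key: &str, out_prefix: &str) -> ProcessingResult {
    match run_pipeline(storage, source_key, out_prefix) {
        Ok(()) => ProcessingResult::Success,
        Err(e) => ProcessingResult::Failed(e),
    }
}
```

Splitting the fallible steps from the boundary function keeps each step short and gives a single place where errors turn into a job outcome.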
Implementation Notes
    // In crates/roboflow-distributed/src/worker.rs
    async fn process_job(&self, job: &JobRecord) -> ProcessingResult {
        // 1. Create storage-aware reader
        let reader = self.create_streaming_reader(&job.source_key).await?;

        // 2. Initialize LeRobot writer with storage
        let writer = LerobotWriter::new(
            self.storage.clone(),
            self.build_output_prefix(job),
            self.local_buffer_path(),
            LerobotConfig::from_job(job),
        )?;

        // 3. Process frames
        for frame in reader.frames() {
            writer.write_frame(&frame)?;
        }

        // 4. Finalize
        writer.finalize()?;

        ProcessingResult::Success
    }

Dependencies
- Requires: #47, "[Phase 7.1] Integrate pipeline with checkpoint hooks" (partially complete), for production use
- Blocks: All distributed processing functionality
Acceptance Criteria
- Worker can process a bag/MCAP file from S3 and produce LeRobot v2.1 output
- Output is uploaded to configured S3/OSS bucket
- Job status transitions correctly (Processing → Complete)
- Integration test passes with local storage
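The status-transition criterion above can be checked with a small state machine. The sketch below is an assumption, not the existing job model: only Processing and Complete are named in this issue, so `Pending`, `Failed`, the `Job` struct, and the allowed-transition table are hypothetical.

```rust
/// Hypothetical job states; only Processing and Complete come from the issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobStatus {
    Pending,
    Processing,
    Complete,
    Failed,
}

impl JobStatus {
    /// Returns true when moving from `self` to `next` is allowed.
    fn can_transition_to(self, next: JobStatus) -> bool {
        use JobStatus::*;
        matches!(
            (self, next),
            (Pending, Processing) | (Processing, Complete) | (Processing, Failed)
        )
    }
}

/// Hypothetical job record holding only its status.
struct Job {
    status: JobStatus,
}

impl Job {
    /// Applies a transition, rejecting any move not in the table above.
    fn transition(&mut self, next: JobStatus) -> Result<(), String> {
        if self.status.can_transition_to(next) {
            self.status = next;
            Ok(())
        } else {
            Err(format!("invalid transition: {:?} -> {:?}", self.status, next))
        }
    }
}
```

An integration test could drive a job through Pending, Processing, and Complete, then assert that a further move out of Complete is rejected.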
Related
- Design: DISTRIBUTED_DESIGN.md
- Alignment: ROADMAP_ALIGNMENT.md