[Phase 1] Integrate LerobotWriter with Worker.process_job() #72

@zhexuany

Summary

Complete the Worker.process_job() implementation by integrating the LerobotWriter from roboflow-dataset with the distributed worker infrastructure.

Context

The current Worker.process_job() in crates/roboflow-distributed/src/worker.rs (lines 475-531) is a placeholder that simulates processing with a 100ms sleep. Until it is replaced with a real implementation, all distributed data transformation functionality is blocked.

Reference: DISTRIBUTED_DESIGN.md - Phase 1: Pipeline Integration

Tasks

  • Download input file from S3/OSS using storage backend
  • Initialize LerobotWriter with storage backend and job configuration
  • Create streaming reader for bag/MCAP file
  • Process frames through the alignment pipeline
  • Write frames to LerobotWriter
  • Call writer.finalize() to complete episode
  • Upload outputs to S3/OSS

Implementation Notes

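// In crates/roboflow-distributed/src/worker.rs
// Sketch only: the `?` operators assume process_job's return type supports
// error propagation (for example, a Result wrapping ProcessingResult).
async fn process_job(&self, job: &JobRecord) -> ProcessingResult {
    // 1. Create a storage-aware streaming reader for the input bag/MCAP file
    let reader = self.create_streaming_reader(&job.source_key).await?;

    // 2. Initialize the LeRobot writer with the storage backend and job config
    //    (`mut` assumes write_frame takes `&mut self`)
    let mut writer = LerobotWriter::new(
        self.storage.clone(),
        self.build_output_prefix(job),
        self.local_buffer_path(),
        LerobotConfig::from_job(job),
    )?;

    // 3. Write each frame (the alignment step from the task list is not shown)
    for frame in reader.frames() {
        writer.write_frame(&frame)?;
    }

    // 4. Finalize the episode (output upload is not shown)
    writer.finalize()?;

    ProcessingResult::Success
}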
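
The snippet above leans on types that only exist in the repository, so a self-contained sketch may help illustrate the control flow and the error handling. It is deliberately synchronous and std-only. MemoryStorage, SketchWriter, run_pipeline, the ProcessingResult variants, and the 4-byte "frames" are all hypothetical stand-ins for illustration, not the real roboflow-dataset API.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the S3/OSS storage backend.
#[derive(Default)]
pub struct MemoryStorage {
    objects: HashMap<String, Vec<u8>>,
}

impl MemoryStorage {
    pub fn put(&mut self, key: &str, data: Vec<u8>) {
        self.objects.insert(key.to_string(), data);
    }

    pub fn get(&self, key: &str) -> Result<&[u8], String> {
        self.objects
            .get(key)
            .map(|v| v.as_slice())
            .ok_or_else(|| format!("object not found: {}", key))
    }
}

/// Job outcome; the variants and their fields are assumptions for this sketch.
#[derive(Debug, PartialEq)]
pub enum ProcessingResult {
    Success { frames_written: usize },
    Failed { error: String },
}

/// Hypothetical writer: buffers frames locally, uploads them on finalize.
pub struct SketchWriter {
    output_key: String,
    buffer: Vec<u8>,
    frames: usize,
}

impl SketchWriter {
    pub fn new(output_key: String) -> Self {
        SketchWriter { output_key, buffer: Vec::new(), frames: 0 }
    }

    pub fn write_frame(&mut self, frame: &[u8]) -> Result<(), String> {
        if frame.is_empty() {
            return Err("empty frame".to_string());
        }
        self.buffer.extend_from_slice(frame);
        self.frames += 1;
        Ok(())
    }

    /// Consumes the writer, uploads the buffer, and returns the frame count.
    pub fn finalize(self, storage: &mut MemoryStorage) -> Result<usize, String> {
        storage.put(&self.output_key, self.buffer);
        Ok(self.frames)
    }
}

/// Fallible inner pipeline, so that `?` can be used at every step.
fn run_pipeline(
    storage: &mut MemoryStorage,
    source_key: &str,
    output_key: &str,
) -> Result<usize, String> {
    // 1. Download the input object.
    let input = storage.get(source_key)?.to_vec();
    // 2. Initialize the writer.
    let mut writer = SketchWriter::new(output_key.to_string());
    // 3. Write frames; a "frame" here is just a chunk of up to 4 bytes.
    for frame in input.chunks(4) {
        writer.write_frame(frame)?;
    }
    // 4. Finalize, which also uploads the output in this sketch.
    writer.finalize(storage)
}

/// Infallible outer wrapper: maps any pipeline error to a Failed result.
pub fn process_job(
    storage: &mut MemoryStorage,
    source_key: &str,
    output_key: &str,
) -> ProcessingResult {
    match run_pipeline(storage, source_key, output_key) {
        Ok(frames_written) => ProcessingResult::Success { frames_written },
        Err(error) => ProcessingResult::Failed { error },
    }
}
```

Splitting the fallible steps into an inner function is one way to keep `?` usable while process_job itself still returns a plain ProcessingResult. The real worker may instead return a Result or convert errors differently.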

Dependencies

Acceptance Criteria

  • Worker can process a bag/MCAP file from S3 and produce LeRobot v2.1 output
  • Output is uploaded to configured S3/OSS bucket
  • Job status transitions correctly (Processing → Complete)
  • Integration test passes with local storage
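
The status-transition criterion could be checked with a small state machine along these lines. This is a minimal sketch: only Processing and Complete come from the criteria above, while the Pending and Failed variants and the can_transition_to and transition helpers are assumptions for illustration.

```rust
/// Job lifecycle states. `Pending` and `Failed` are hypothetical additions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    Pending,
    Processing,
    Complete,
    Failed,
}

impl JobStatus {
    /// Returns true only for the forward transitions this sketch allows.
    pub fn can_transition_to(self, next: JobStatus) -> bool {
        matches!(
            (self, next),
            (JobStatus::Pending, JobStatus::Processing)
                | (JobStatus::Processing, JobStatus::Complete)
                | (JobStatus::Processing, JobStatus::Failed)
        )
    }

    /// Applies a transition, rejecting anything not in the allowed set.
    pub fn transition(self, next: JobStatus) -> Result<JobStatus, String> {
        if self.can_transition_to(next) {
            Ok(next)
        } else {
            Err(format!("invalid transition: {:?} -> {:?}", self, next))
        }
    }
}
```

An integration test could then assert that a job ends in Complete after a successful run and that terminal states reject further transitions.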

Related

Metadata

Labels

  • area/distributed: Distributed coordination and TiKV
  • area/lerobot: LeRobot dataset format
  • priority/critical: Must be done first, blocks other work
  • size/L: Large: 1-2 weeks
  • type/feature: New feature or functionality
