A reproducible, high-throughput, distributed open-source pipeline for processing web-scale (hundreds of billions of records) multimodal datasets. Built on Ray with Rust-accelerated and GPU-optimized operators for ablation, scoring, and deduplication at scale.

Reproduce SOTA foundation model data pipelines, from rule-based to model-based, spanning text, image, and multimodal data.
Text pipelines:

| Pipeline | Description | Status |
|---|---|---|
| FineWeb | 15T tokens, quality filtering | 🚧 In Progress |
| FineWeb-Edu | Educational content classifier | 🚧 In Progress |
| RefinedWeb | URL filtering, trafilatura, dedup | ✅ URL Filter |
| DCLM | Data curation for LLMs | 📋 Planned |
| Dolma | Open corpus toolkit | 📋 Planned |
| RedPajama-V2 | 30T tokens, quality signals | 📋 Planned |
Image and multimodal pipelines:

| Pipeline | Description | Status |
|---|---|---|
| Z-Image | Image generation foundation model | ✅ Implemented |
| Imagen 3 | Image quality & AIGC detection | ✅ Implemented |
| LAION-5B | CLIP filtering, dedup | ✅ Implemented |
| DataComp | CLIP/SigLIP filtering | ✅ Implemented |
| Qwen-VL | Vision-language data | 🚧 In Progress |
| Seed1.5-VL | Vision-language reasoning | 📋 Planned |
| HoneyBee | Data recipes for VL reasoners | 📋 Planned |
| Cosmos | World model platform | 📋 Planned |
Video pipelines:

| Pipeline | Description | Status |
|---|---|---|
| Panda-70M | Video captioning | 📋 Planned |
| InternVid | Video-language | 📋 Planned |
| OpenVid-1M | Video generation | 📋 Planned |
Interactive HTML reports for pipeline runs, showcasing metrics, visualizations, and performance statistics, are published at https://huggingface.co/spaces/classtag/mega-data-factory-reports.
```bash
# Clone the repository
git clone https://github.com/duoan/mega-data-factory.git
cd mega-data-factory

# Install with Rust acceleration (recommended)
uv pip install -e .

# Or install without Rust (pure Python fallback)
uv sync
```

The Rust-accelerated install requires a Rust toolchain for building the accelerated operators; install it via rustup.
```bash
# Run pipeline with config
mdf run --config configs/z_image.yaml

# Or with options
mdf run -c configs/z_image.yaml --max-samples 1000 --batch-size 500
```

🦀 = Rust Accelerated | 🖥️ = GPU Optimized
| Loader | Description | Features |
|---|---|---|
| `HuggingFaceLoader` | Load from HuggingFace datasets | Streaming, sharding |
| `CommonCrawlLoader` | Load from CommonCrawl WARC files | 🦀 Rust text extraction, distributed |
Filters (rule-based, from RefinedWeb):

| Operator | Description | Reference |
|---|---|---|
| `URLFilter` | Domain blocklist, URL word scoring, quality source exclusion | RefinedWeb §G.1 |
| `TextLengthFilter` | Filter by character/word count | FineWeb, RefinedWeb |
Deduplicators:

| Operator | Description |
|---|---|
| `TextExactDeduplicator` | Exact content hash deduplication (xxhash/MD5) |
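Conceptually, exact deduplication hashes each record's content (optionally prefixed with its URL) and keeps only the first occurrence of each hash. Below is a minimal single-process sketch using xxhash; `exact_dedup` is an illustrative helper, not this project's API, and the real operator tracks seen hashes through the distributed dedup backend rather than an in-memory set.

```python
import xxhash


def exact_dedup(records: list[dict], text_field: str = "text", include_url: bool = False) -> list[dict]:
    """Keep only the first record for each content hash (illustrative, in-memory only)."""
    seen: set[str] = set()
    kept = []
    for record in records:
        key = record.get(text_field, "")
        if include_url:
            # URL+content dedup, as in the include_url option shown in the config example below
            key = record.get("url", "") + "\n" + key
        digest = xxhash.xxh64(key.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            kept.append(record)
    return kept
```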
Coming Soon:

- `LanguageFilter` - fastText language detection
- `PerplexityFilter` - KenLM perplexity scoring
- `RepetitionFilter` - n-gram repetition detection
- `QualityClassifierFilter` - Model-based quality (FineWeb-Edu style)
- `MinHashDeduplicator` - Near-duplicate detection
Refiners (enrich records with new fields):

| Operator | Description | Acceleration |
|---|---|---|
| `ImageMetadataRefiner` | Width, height, format, file size | CPU |
| `ImageTechnicalQualityRefiner` | Compression artifacts, entropy | 🦀 Rust |
| `ImageVisualDegradationsRefiner` | Color cast, blur, watermark, noise | CPU |
| `ImageClipEmbeddingRefiner` | CLIP embeddings (OpenCLIP) | 🖥️ GPU |
| `ImageSigLIPEmbeddingRefiner` | SigLIP2 embeddings | 🖥️ GPU |
| `ImageAestheticQualityRefiner` | Aesthetic score (CLIP-based) | CPU |
| `ImageAIGCDetectorRefiner` | AI-generated image detection | CPU |
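To illustrate what the GPU embedding refiners compute, here is a minimal OpenCLIP sketch using the `ViT-L-14`/`openai` weights referenced in `configs/z_image.yaml`. This is not the refiner's actual implementation, just the underlying idea: batched, L2-normalized image embeddings, in FP16 when a CUDA GPU is available (an assumption; the pipeline also runs on other devices).

```python
import open_clip
import torch
from PIL import Image

# Assumption for this sketch: prefer CUDA if present, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model, _, preprocess = open_clip.create_model_and_transforms("ViT-L-14", pretrained="openai")
model = model.to(device).eval()
if device == "cuda":
    model = model.half()  # mirrors use_fp16: true in the config


def clip_embed(paths: list[str]) -> torch.Tensor:
    """Return L2-normalized CLIP image embeddings for a batch of image files."""
    batch = torch.stack([preprocess(Image.open(p).convert("RGB")) for p in paths]).to(device)
    if device == "cuda":
        batch = batch.half()
    with torch.no_grad():
        emb = model.encode_image(batch)
    return emb / emb.norm(dim=-1, keepdim=True)
```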
Filters:

| Operator | Description |
|---|---|
| `ImageQualityFilter` | Filter by size, quality metrics, aesthetic score |
Deduplicators:

| Operator | Description | Acceleration |
|---|---|---|
| `ImagePhashDeduplicator` | Perceptual hash deduplication | 🦀 Rust |
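The perceptual-hash deduplicator computes its hashes in Rust, but the idea is the same as this pure-Python sketch built on the `imagehash` library (an assumption used only for illustration): images whose pHashes are within a small Hamming distance are treated as near-duplicates. The pairwise comparison here is O(n²); the real operator dedupes through the distributed backend instead of comparing every pair.

```python
import imagehash
from PIL import Image


def phash_dedup(paths: list[str], max_distance: int = 0) -> list[str]:
    """Keep one image per perceptual-hash cluster (exact pHash match when max_distance=0)."""
    seen_hashes: list[imagehash.ImageHash] = []
    kept = []
    for path in paths:
        h = imagehash.phash(Image.open(path))
        # Subtracting two ImageHash objects gives their Hamming distance.
        if all(h - prev > max_distance for prev in seen_hashes):
            seen_hashes.append(h)
            kept.append(path)
    return kept
```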
| Writer | Description |
|---|---|
| `ParquetDataWriter` | Write to Parquet files |
| `IcebergDataWriter` | Write to Apache Iceberg tables |
Deep Dive: See docs/ARCHITECTURE.md for a comprehensive explanation of the distributed pipeline-parallel design, including ObjectRef chaining, backpressure control, bucketed deduplication, and theoretical scalability analysis.
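The core trick behind the pipeline-parallel design is that Ray tasks accept ObjectRefs produced by other tasks, so the driver can submit an entire stage chain per batch without waiting on intermediate results. A deliberately simplified sketch of that pattern follows; it is not the actual executor in `framework/executor.py`, and `load_batches` is a hypothetical batch source used only for illustration.

```python
import ray

ray.init()


@ray.remote
def cpu_stage(batch: list[dict]) -> list[dict]:
    # Rule-based filtering / Rust-accelerated refiners would run here.
    return [r for r in batch if r.get("text")]


@ray.remote(num_gpus=1)
def gpu_stage(batch: list[dict]) -> list[dict]:
    # Embedding extraction would run here.
    return batch


result_refs = []
for batch in load_batches():                       # hypothetical: yields lists of records
    cpu_ref = cpu_stage.remote(batch)              # returns an ObjectRef immediately
    result_refs.append(gpu_stage.remote(cpu_ref))  # chained: Ray resolves cpu_ref worker-to-worker
results = ray.get(result_refs)                     # only the driver blocks, and only at the end
```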
```mermaid
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#4f46e5', 'primaryTextColor': '#fff', 'primaryBorderColor': '#6366f1', 'lineColor': '#a5b4fc', 'secondaryColor': '#1e1b4b', 'tertiaryColor': '#312e81', 'background': '#0f0f23', 'mainBkg': '#1e1b4b', 'nodeBorder': '#6366f1', 'clusterBkg': '#1e1b4b', 'clusterBorder': '#6366f1', 'titleColor': '#e0e7ff', 'edgeLabelBackground': '#312e81'}}}%%
flowchart TB
    subgraph Driver["Ray Driver"]
        Config[Config]
        Executor[Executor]
        Progress[Stats]
    end
    subgraph ObjectStore["Object Store"]
        Batches["Shared Memory"]
    end
    subgraph Stage0["CPU Pool ×8"]
        direction LR
        W0["W0"]
        W1["W1"]
        W2["W2"]
        Wn["..."]
        W7["W7"]
    end
    subgraph Stage1["GPU Pool ×2"]
        direction LR
        GPU0["GPU0"]
        GPU1["GPU1"]
    end
    subgraph Output["Output"]
        Writer[Parquet]
    end
    HF["HuggingFace"] --> Driver
    Driver --> ObjectStore
    ObjectStore --> Stage0
    Stage0 --> ObjectStore
    ObjectStore --> Stage1
    Stage1 --> Writer
```
```mermaid
%%{init: {'theme': 'base', 'themeVariables': { 'primaryColor': '#059669', 'primaryTextColor': '#fff', 'primaryBorderColor': '#10b981', 'lineColor': '#6ee7b7', 'secondaryColor': '#064e3b', 'tertiaryColor': '#065f46', 'background': '#0f0f23', 'mainBkg': '#064e3b', 'nodeBorder': '#10b981', 'clusterBkg': '#064e3b', 'clusterBorder': '#10b981'}}}%%
flowchart LR
    subgraph Input["Batches"]
        B0["B0"] & B1["B1"] & B2["B2"] & B3["B3"]
        B4["B4"] & B5["B5"] & B6["B6"] & B7["B7"]
    end
    subgraph CPU["CPU Pool ×8 workers"]
        C0["C0 🦀"] & C1["C1 🦀"] & C2["C2 🦀"] & C3["C3 🦀"]
        C4["C4 🦀"] & C5["C5 🦀"] & C6["C6 🦀"] & C7["C7 🦀"]
    end
    subgraph GPU["GPU Pool ×2 workers"]
        G0["G0 CLIP"]
        G1["G1 CLIP"]
    end
    B0 --> C0
    B1 --> C1
    B2 --> C2
    B3 --> C3
    B4 --> C4
    B5 --> C5
    B6 --> C6
    B7 --> C7
    C0 & C1 & C2 & C3 --> G0
    C4 & C5 & C6 & C7 --> G1
```
```mermaid
%%{init: {'theme': 'dark'}}%%
sequenceDiagram
    participant D as Driver
    participant OS as ObjectStore
    participant CPU as CPU ×8
    participant GPU as GPU ×2
    participant W as Writer
    D->>OS: Submit batches
    par CPU Processing
        OS->>CPU: Batch 0-7
    end
    CPU->>OS: Processed
    par GPU Processing
        OS->>GPU: Batch 0-7
    end
    GPU->>W: Write Parquet
    W->>D: Done
```
```mermaid
%%{init: {'theme': 'dark'}}%%
gantt
    title Batch Processing Timeline
    dateFormat X
    axisFormat %s
    section CPU-0
    B0 :c0, 0, 2
    B8 :c0b, 8, 2
    section CPU-1
    B1 :c1, 0, 2
    B9 :c1b, 8, 2
    section CPU-7
    B7 :c7, 0, 2
    B15 :c7b, 8, 2
    section GPU-0
    B0 :g0a, 2, 3
    B2 :g0b, 5, 3
    section GPU-1
    B1 :g1a, 2, 3
    B3 :g1b, 5, 3
```
Key Points:
- CPU Pool: 8 workers for metadata, quality (🦀 Rust), filtering, dedup
- GPU Pool: 2 workers for CLIP embeddings (limited by VRAM)
- Load Balancing: Ray auto-distributes batches to idle workers
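The overlap shown in the timeline stays memory-safe only because the executor bounds how many batches are in flight at once (the backpressure control mentioned in the architecture notes). Below is a minimal sketch of that pattern with `ray.wait`; `MAX_IN_FLIGHT`, `process_batch`, and `load_batches` are illustrative names, not real config knobs or APIs of this project.

```python
import ray

MAX_IN_FLIGHT = 16  # illustrative bound on submitted-but-unfinished batches

in_flight = []  # list of ObjectRefs for batches still being processed
for batch in load_batches():                       # hypothetical batch source
    if len(in_flight) >= MAX_IN_FLIGHT:
        # Block until at least one batch finishes before submitting more,
        # so the object store never accumulates unbounded intermediate data.
        ready, in_flight = ray.wait(in_flight, num_returns=1)
        ray.get(ready)
    in_flight.append(process_batch.remote(batch))  # process_batch: any @ray.remote stage task
ray.get(in_flight)
```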
```yaml
# configs/example_commoncrawl.yaml
# RefinedWeb-style text extraction pipeline
data_loader:
  type: CommonCrawlLoader
  params:
    crawl_id: "CC-MAIN-2024-51"
    num_workers: 16  # Distributed WARC loading

stages:
  - name: content_filtering
    operators:
      # RefinedWeb §G.1: URL filtering
      - name: url_filter
        params:
          url_field: "url"
          score_threshold: 0.5
          exclude_quality_sources: true  # Exclude Wikipedia, arXiv, etc.
      # Length filtering
      - name: text_length_filter
        params:
          min_length: 100
          max_length: 100000
          text_field: "text"
      # Exact deduplication (RefinedWeb §G.3)
      - name: text_exact_deduplicator
        params:
          include_url: true  # URL+content dedup
    worker:
      min_replicas: 2
      max_replicas: 8

data_writer:
  type: ParquetDataWriter
  params:
    output_path: "./output/commoncrawl"

executor:
  max_samples: 1000000
  batch_size: 1000

metrics:
  enabled: true
  generate_report: true
```

```yaml
# configs/z_image.yaml
# Image quality + aesthetic + AIGC detection pipeline
data_loader:
  type: HuggingFaceLoader
  params:
    dataset_name: "jp1924/Laion400m-1"
    split: "train"
    streaming: true

stages:
  # Stage 1: Basic metadata and quality (CPU, Rust-accelerated)
  - name: basic_stage
    operators:
      - name: image_metadata_refiner
      - name: image_technical_quality_refiner  # 🦀 Rust
      - name: image_quality_filter
        params:
          min_width: 128
          min_height: 128
          max_compression_artifacts: 0.8
      - name: image_phash_deduplicator  # 🦀 Rust
    worker:
      min_replicas: 2
      max_replicas: 8
      resources:
        cpu: 1

  # Stage 2: Embedding extraction (GPU)
  - name: embedding_stage
    operators:
      - name: image_clip_embedding_refiner
        params:
          model_name: "ViT-L-14"
          pretrained: "openai"
          use_fp16: true
      - name: image_siglip_embedding_refiner
        params:
          model_name: "google/siglip2-so400m-patch14-384"
          use_fp16: true
    worker:
      min_replicas: 1
      max_replicas: 2
      resources:
        gpu: 1

  # Stage 3: Quality scoring
  - name: scoring_stage
    operators:
      - name: image_aesthetic_quality_refiner
      - name: image_aigc_detector_refiner
        params:
          threshold: 0.5
    worker:
      min_replicas: 2
      max_replicas: 4
      resources:
        cpu: 1

data_writer:
  type: ParquetDataWriter
  params:
    output_path: "./output/z_image"

executor:
  max_samples: 100000
  batch_size: 256
  dedup_num_buckets: 16

metrics:
  enabled: true
  generate_report: true
```

```
============================================================
Pipeline: CommonCrawl text extraction (1M records)
Hardware: 8 CPU cores
============================================================
stage_0:
  [Stage Summary]
  Input: 1,000,000 → Output: 945,866 (94.6% pass)
  Total time: 49.11s
  Throughput: 20,362 records/sec

  URLFilter:        20,362 rec/sec (98.1% pass)     # RefinedWeb §G.1
  TextLengthFilter: 1,976,454 rec/sec (96.4% pass)  # Near instant
============================================================
Projections:
  10M records  → ~8 minutes
  100M records → ~1.4 hours
  1B records   → ~14 hours
```
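The projections are a straight linear extrapolation of the measured stage throughput (1,000,000 records in 49.11 s, about 20,362 records/sec), assuming throughput stays constant at larger scales:

```python
throughput = 1_000_000 / 49.11      # ≈ 20,362 records/sec, as reported above
print(1e7 / throughput / 60)        # ≈ 8.2   -> ~8 minutes for 10M records
print(1e8 / throughput / 3600)      # ≈ 1.36  -> ~1.4 hours for 100M records
print(1e9 / throughput / 3600)      # ≈ 13.6  -> ~14 hours for 1B records
```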
Benchmark on Mac M1 Pro (MPS):
```
============================================================
Pipeline: Image quality + embedding (1K records)
============================================================
stage_0 (CPU, Rust-accelerated):
  [Stage Summary]
  Input: 1,000 → Output: 898 (89.8% pass)
  Total time: 0.61s
  Throughput: 1,630 records/sec

  ImageMetadataRefiner:            27,000 rec/sec
  ImageTechnicalQualityRefiner:     2,500 rec/sec   🦀 Rust
  ImageQualityFilter:           4,200,000 rec/sec
  ImagePhashDeduplicator:           1,500 rec/sec   🦀 Rust

stage_1 (GPU):
  [Stage Summary]
  Input: 898 → Output: 898
  Total time: 6.80s
  Throughput: 132 records/sec

  ImageClipEmbeddingRefiner: 132 rec/sec   🖥️ GPU
============================================================
```
```
mega-data-factory/
├── mega_data_factory/
│   ├── cli.py                     # CLI entry point (mdf command)
│   ├── framework/
│   │   ├── executor.py            # Pipeline orchestration
│   │   ├── worker.py              # RayWorker actors
│   │   ├── loader_worker.py       # DataLoaderWorker actors
│   │   ├── backend.py             # DedupBackend (distributed state)
│   │   ├── operator.py            # Operator, Refiner, Filter, Deduplicator
│   │   ├── config.py              # YAML config parsing
│   │   ├── registry.py            # Component registries
│   │   └── metrics/               # Metrics collection & reporting
│   ├── loaders/
│   │   ├── huggingface_loader.py  # HuggingFace datasets
│   │   └── commoncrawl_loader.py  # CommonCrawl WARC files
│   ├── operators/
│   │   ├── refiners/              # Image refiners (metadata, quality, embeddings)
│   │   ├── filters/               # Text + Image filters
│   │   └── dedup/                 # Deduplicators (phash, minhash)
│   ├── writers/
│   │   ├── parquet_writer.py      # Parquet output
│   │   └── iceberg_writer.py      # Apache Iceberg output
│   └── models/                    # Model trainers (aesthetic, AIGC, k-means)
├── src/lib.rs                     # 🦀 Rust operators (quality, phash, HTML extraction)
├── configs/                       # Pipeline configurations
│   ├── z_image.yaml               # Image pipeline
│   └── example_commoncrawl.yaml   # Text pipeline
├── tests/                         # Unit tests
├── Cargo.toml                     # Rust dependencies
└── pyproject.toml                 # Python config (maturin build)
```
```python
from mega_data_factory.framework import Filter, OperatorRegistry


class MyTextFilter(Filter):
    def __init__(self, min_words: int = 50):
        super().__init__()
        self.min_words = min_words

    def should_keep_batch(self, records: list[dict]) -> list[bool]:
        return [len(r.get("text", "").split()) >= self.min_words for r in records]


OperatorRegistry.register("MyTextFilter", MyTextFilter)
```

```python
from mega_data_factory.framework import Refiner, OperatorRegistry
import pyarrow as pa


class MyImageRefiner(Refiner):
    def refine_batch(self, records: list[dict]) -> None:
        for record in records:
            # compute_score is a placeholder for your own scoring function
            record["my_score"] = compute_score(record["image"])

    def get_output_schema(self) -> dict[str, pa.DataType]:
        return {"my_score": pa.float32()}


OperatorRegistry.register("MyImageRefiner", MyImageRefiner)
```
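Once registered, a custom operator can be referenced from a pipeline config like any built-in one. The snippet below is a hypothetical example that assumes the name used under `operators` is the one passed to `OperatorRegistry.register`; check the registry conventions for the exact lookup rules.

```yaml
stages:
  - name: custom_stage
    operators:
      - name: MyTextFilter     # the name registered above (assumed lookup key)
        params:
          min_words: 50
```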
Key features:

- Pipeline Parallelism: Ray ObjectRef chaining enables concurrent stage execution without blocking (details)
- Distributed Data Loading: Sharded file loading with checkpoint support for fault recovery
- Backpressure Control: Bounded in-flight batches prevent OOM on large datasets
- Bucketed Deduplication: Distributed state sharding scales to 100B+ keys (details; see the sketch after this list)
- Rust Acceleration: 10-25x speedup for image quality, hashing, and HTML extraction
- GPU Optimization: CLIP/SigLIP embedding extraction with FP16 and batch inference
- Elastic Scaling: Dynamic worker allocation with min/max replicas per stage
- Config-Driven: YAML configs define entire pipelines with no code changes
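The bucketed deduplication mentioned above is what lets the dedup state scale: each key is routed by hash to one of `dedup_num_buckets` shards, so no single process holds the full seen-set. Below is a much-simplified illustration using Ray actors and xxhash; it is not the actual `DedupBackend` interface in `framework/backend.py`.

```python
import ray
import xxhash


@ray.remote
class DedupBucket:
    """One shard of the distributed seen-key state."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def check_and_add(self, keys: list[str]) -> list[bool]:
        # True means "first time seen"; every key is recorded either way.
        fresh = [k not in self.seen for k in keys]
        self.seen.update(keys)
        return fresh


num_buckets = 16  # mirrors dedup_num_buckets in configs/z_image.yaml
buckets = [DedupBucket.remote() for _ in range(num_buckets)]


def is_new(key: str) -> bool:
    idx = xxhash.xxh64(key.encode("utf-8")).intdigest() % num_buckets  # stable routing by hash
    return ray.get(buckets[idx].check_and_add.remote([key]))[0]
```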
References:

- RefinedWeb (arXiv:2306.01116) - URL filtering, trafilatura, MassiveText dedup
- FineWeb - 15T token dataset, quality filtering
- DCLM (arXiv:2406.11794) - Data curation for language models
- Dolma (arXiv:2402.00159) - Open corpus for LLM pretraining
- Z-Image (arXiv:2511.22699) - Image generation foundation model data
- DataComp (arXiv:2304.14108) - CLIP filtering benchmark
- LAION-5B (arXiv:2210.08402) - Large-scale image-text dataset
- OpenCLIP - CLIP implementation
- SigLIP2 - Vision encoder
- dom_smoothie - Rust readability.js port
MIT License
```bibtex
@software{mega_data_factory,
  author    = {Duo An},
  title     = {Mega Data Factory},
  year      = {2025},
  publisher = {GitHub},
  url       = {https://github.com/duoan/mega-data-factory}
}
```

