Create ML Pipeline with Metaflow for Model Training and Deployment
Background
Currently, the model training process involves manually running several scripts in sequence:

1. Generate dataset (training_set.py)
2. Train model (train.py)
3. Evaluate model (eval_model.py)
4. Quantize model (quantitize.py)
5. Sign model (Issue #30: Add Model Signing to ML Pipeline)
6. Upload to Hugging Face (Issue #33: Share Model and Dataset via Hugging Face)
7. Rebuild DMG (manual)

This manual workflow is error-prone, not reproducible, and difficult to scale. An automated ML pipeline using Metaflow will provide:
✅ Reproducibility - Track every experiment with versioned data, code, and parameters
✅ Automation - Run the entire pipeline with a single command
✅ Scalability - Easy to run on cloud infrastructure (AWS, GCP)
✅ Visibility - Track metrics, artifacts, and lineage
✅ Orchestration - Handle dependencies and failure recovery
✅ Integration - Connect training → evaluation → deployment seamlessly
Why Metaflow?
Metaflow is an open-source ML infrastructure framework developed by Netflix that:
Uses Python decorators for simple pipeline definition
Provides automatic versioning of data and experiments
Supports local and cloud execution (AWS Batch, Kubernetes)
Integrates with W&B and other ML tools
Has minimal boilerplate compared to Airflow/Kubeflow
Implementation Plan
Phase 1: Install Metaflow and Dependencies
File: pyproject.toml

Add Metaflow dependencies:

[project.optional-dependencies]
# Add to existing sections
mlops = [
    "metaflow>=2.12.0",
    # Optional: experiment tracking
]

Installation:

pip install -e ".[mlops]"
# Local runs need no configuration; set up cloud execution later with:
metaflow configure aws
Phase 2: Create ML Pipeline Flow
File: model/flows/training_pipeline.py (new)

Create a comprehensive Metaflow pipeline:
"""
Metaflow pipeline for PII detection model training and deployment.
This pipeline orchestrates the complete ML workflow:
1. Dataset generation/loading
2. Data preprocessing and validation
3. Model training
4. Model evaluation
5. Model quantization (ONNX)
6. Model signing (cryptographic hash)
7. Model upload (Hugging Face Hub)
8. (Optional) Trigger DMG rebuild
Usage:
# Run entire pipeline locally
python model/flows/training_pipeline.py run
# Run specific step
python model/flows/training_pipeline.py run --step train
# Run with parameters
python model/flows/training_pipeline.py run \
--num-samples 1000 \
--num-epochs 3 \
--batch-size 16
# View results
python model/flows/training_pipeline.py show latest
"""
from metaflow import FlowSpec, step, Parameter, card, conda_base, resources
import json
from pathlib import Path
from datetime import datetime


@conda_base(
    python="3.11",
    libraries={
        "torch": ">=1.9.0",
        "transformers": ">=4.20.0",
        "datasets": ">=2.0.0",
        "optimum": ">=1.15.0",
        "scikit-learn": ">=1.0.0",
    }
)
class PIITrainingPipeline(FlowSpec):
    """
    End-to-end ML pipeline for PII detection model training.

    This flow coordinates:
    - Dataset generation and validation
    - Model training with multi-task learning
    - Model evaluation and metrics tracking
    - Model quantization for production
    - Model signing for security
    - Model upload to Hugging Face Hub
    - Optional: Trigger DMG rebuild
    """

    # Pipeline parameters (can be overridden from CLI)
    num_samples = Parameter(
        'num-samples',
        help='Number of training samples to generate',
        default=10000,
        type=int,
    )
    num_epochs = Parameter(
        'num-epochs',
        help='Number of training epochs',
        default=5,
        type=int,
    )
    batch_size = Parameter(
        'batch-size',
        help='Training batch size',
        default=16,
        type=int,
    )
    learning_rate = Parameter(
        'learning-rate',
        help='Learning rate for training',
        default=3e-5,
        type=float,
    )
    model_name = Parameter(
        'model-name',
        help='Base model to fine-tune',
        default='distilbert-base-cased',
        type=str,
    )
    skip_dataset_generation = Parameter(
        'skip-dataset-gen',
        help='Skip dataset generation (use existing data)',
        default=False,
        type=bool,
    )
    skip_quantization = Parameter(
        'skip-quantization',
        help='Skip ONNX quantization step',
        default=False,
        type=bool,
    )
    skip_signing = Parameter(
        'skip-signing',
        help='Skip model signing step',
        default=False,
        type=bool,
    )
    skip_hf_upload = Parameter(
        'skip-hf-upload',
        help='Skip Hugging Face upload',
        default=True,  # Default to skip for testing
        type=bool,
    )
    trigger_dmg_build = Parameter(
        'trigger-dmg',
        help='Trigger DMG build via GitHub Actions',
        default=False,
        type=bool,
    )
    @step
    def start(self):
        """Initialize the pipeline and set up configuration."""
        from model.src.config import TrainingConfig

        print("=" * 80)
        print("🚀 Starting PII Detection Model Training Pipeline")
        print("=" * 80)

        # Create training configuration
        self.config = TrainingConfig(
            model_name=self.model_name,
            num_epochs=self.num_epochs,
            batch_size=self.batch_size,
            learning_rate=self.learning_rate,
            max_samples=self.num_samples,
            output_dir="model/trained",
        )

        # Store metadata
        self.pipeline_start_time = datetime.utcnow().isoformat()
        self.pipeline_version = "1.0.0"

        print("📋 Configuration:")
        print(f"   Model: {self.config.model_name}")
        print(f"   Epochs: {self.config.num_epochs}")
        print(f"   Batch Size: {self.config.batch_size}")
        print(f"   Learning Rate: {self.config.learning_rate}")
        print(f"   Max Samples: {self.config.max_samples}")

        self.next(self.generate_dataset)

    @resources(memory=8000, cpu=4)
    @step
    def generate_dataset(self):
        """Generate synthetic PII dataset using LLM or load existing data."""
        if self.skip_dataset_generation:
            print("⏭️ Skipping dataset generation (using existing data)")
            self.dataset_path = "model/dataset/reviewed_samples"
            self.dataset_stats = {"note": "Using existing dataset"}
            self.next(self.preprocess_data)
            return

        print("\n" + "=" * 80)
        print("📊 Step 1: Dataset Generation")
        print("=" * 80)

        from model.dataset.training_set import TrainingSetGenerator, TrainingSetConfig

        # Configure dataset generation
        dataset_config = TrainingSetConfig(
            num_samples=self.num_samples,
            output_dir="model/dataset",
        )
        generator = TrainingSetGenerator(dataset_config)

        # Generate samples (this will create samples in reviewed_samples/)
        print(f"Generating {self.num_samples} samples...")
        # Note: Actual generation happens via training_set.py's main()
        # For now, point to existing data
        self.dataset_path = "model/dataset/reviewed_samples"
        self.dataset_stats = {
            "num_samples": self.num_samples,
            "output_dir": self.dataset_path,
        }

        print(f"✓ Dataset ready at: {self.dataset_path}")
        self.next(self.preprocess_data)

    @resources(memory=8000, cpu=4)
    @step
    def preprocess_data(self):
        """Preprocess data: tokenization, label mapping, train/val split."""
        print("\n" + "=" * 80)
        print("🔧 Step 2: Data Preprocessing")
        print("=" * 80)

        from model.src.preprocessing import DatasetProcessor
        from transformers import AutoTokenizer

        # Load tokenizer
        tokenizer = AutoTokenizer.from_pretrained(self.config.model_name)

        # Initialize processor
        processor = DatasetProcessor(self.config, tokenizer)

        # Load and process dataset
        print(f"Loading data from {self.dataset_path}...")
        train_dataset, val_dataset = processor.prepare_datasets(
            data_dir=self.dataset_path
        )

        self.train_size = len(train_dataset)
        self.val_size = len(val_dataset)

        # Store datasets as artifacts
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset

        print(f"✓ Training samples: {self.train_size}")
        print(f"✓ Validation samples: {self.val_size}")
        self.next(self.train_model)
    @resources(memory=16000, cpu=8, gpu=1)  # Request GPU if available
    @step
    def train_model(self):
        """Train the multi-task PII detection model."""
        print("\n" + "=" * 80)
        print("🏋️ Step 3: Model Training")
        print("=" * 80)

        from model.src.trainer import PIITrainer

        # Initialize trainer
        trainer_instance = PIITrainer(self.config)
        trainer_instance.initialize_model()

        # Train model
        print("Starting training...")
        trainer = trainer_instance.train(self.train_dataset, self.val_dataset)

        # Extract metrics
        self.training_metrics = {
            "train_loss": trainer.state.log_history[-1].get("train_loss"),
            "eval_loss": trainer.state.log_history[-1].get("eval_loss"),
            "eval_pii_f1": trainer.state.log_history[-1].get("eval_pii_f1"),
            "eval_coref_f1": trainer.state.log_history[-1].get("eval_coref_f1"),
        }
        self.model_path = self.config.output_dir

        print("\n✓ Training complete!")
        print(f"✓ Model saved to: {self.model_path}")
        print(f"✓ Metrics: {self.training_metrics}")
        self.next(self.evaluate_model)

    @resources(memory=8000, cpu=4)
    @step
    def evaluate_model(self):
        """Evaluate the trained model on test set."""
        print("\n" + "=" * 80)
        print("📊 Step 4: Model Evaluation")
        print("=" * 80)

        from model.src.eval_model import PIIModelLoader

        # Load model
        loader = PIIModelLoader(self.model_path)
        loader.load_model()

        # Run evaluation on test cases
        test_cases = [
            "My name is John Smith and my email is john@example.com",
            "Call me at 555-123-4567 or email sarah.miller@company.com",
            "SSN: 123-45-6789, DOB: 01/15/1990",
        ]

        evaluation_results = []
        for text in test_cases:
            entities, coref_clusters, inference_time = loader.predict(text)
            evaluation_results.append({
                "text": text,
                "num_entities": len(entities),
                "num_clusters": len(coref_clusters),
                "inference_time_ms": inference_time,
            })

        self.evaluation_results = evaluation_results

        # Calculate average inference time
        avg_inference_time = sum(r["inference_time_ms"] for r in evaluation_results) / len(evaluation_results)

        print("\n✓ Evaluation complete!")
        print(f"✓ Average inference time: {avg_inference_time:.2f} ms")
        print(f"✓ Test cases processed: {len(test_cases)}")
        self.next(self.quantize_model)

    @resources(memory=16000, cpu=8)
    @step
    def quantize_model(self):
        """Quantize model to ONNX format for production deployment."""
        if self.skip_quantization:
            print("\n⏭️ Skipping model quantization")
            self.quantized_model_path = None
            self.next(self.sign_model)
            return

        print("\n" + "=" * 80)
        print("⚙️ Step 5: Model Quantization")
        print("=" * 80)

        import subprocess

        # Run quantization script
        cmd = [
            "python", "-m", "model.src.quantitize",
            "--model_path", self.model_path,
            "--output_path", "model/quantized",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode == 0:
            self.quantized_model_path = "model/quantized"
            print("✓ Model quantized successfully!")
            print(f"✓ Quantized model: {self.quantized_model_path}")
        else:
            print(f"❌ Quantization failed: {result.stderr}")
            self.quantized_model_path = None

        self.next(self.sign_model)
    @step
    def sign_model(self):
        """
        Sign model with cryptographic hash for integrity verification.
        Depends on Issue #30.
        """
        if self.skip_signing:
            print("\n⏭️ Skipping model signing")
            self.model_signature = None
            self.next(self.upload_model)
            return

        print("\n" + "=" * 80)
        print("🔐 Step 6: Model Signing")
        print("=" * 80)

        try:
            from model.src.sign_model import sign_trained_model

            # Sign the model
            model_hash = sign_trained_model(self.model_path)
            self.model_signature = {
                "sha256": model_hash,
                "signed_at": datetime.utcnow().isoformat(),
            }
            print("✓ Model signed successfully!")
            print(f"✓ SHA-256: {model_hash}")
        except ImportError:
            print("⚠️ Model signing not available (install model-signing package)")
            self.model_signature = None
        except Exception as e:
            print(f"⚠️ Model signing failed: {e}")
            self.model_signature = None

        self.next(self.upload_model)

    @step
    def upload_model(self):
        """
        Upload model to Hugging Face Hub.
        Depends on Issue #33.
        """
        if self.skip_hf_upload:
            print("\n⏭️ Skipping Hugging Face upload")
            self.hf_model_url = None
            self.next(self.trigger_build)
            return

        print("\n" + "=" * 80)
        print("☁️ Step 7: Hugging Face Upload")
        print("=" * 80)

        try:
            import subprocess

            # Upload to Hugging Face (test with private repo)
            cmd = [
                "python", "src/scripts/upload_model_to_hf.py",
                "--model-path", self.model_path,
                "--repo-id", "yaak/pii-detector",
                "--model-type", "pytorch",
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                self.hf_model_url = "https://huggingface.co/yaak/pii-detector"
                print("✓ Model uploaded to Hugging Face!")
                print(f"✓ URL: {self.hf_model_url}")
            else:
                print(f"⚠️ Upload failed: {result.stderr}")
                self.hf_model_url = None
        except Exception as e:
            print(f"⚠️ Upload failed: {e}")
            self.hf_model_url = None

        self.next(self.trigger_build)

    @step
    def trigger_build(self):
        """
        Trigger DMG build via GitHub Actions.
        Optional step for automated deployment.
        """
        if not self.trigger_dmg_build:
            print("\n⏭️ Skipping DMG build trigger")
            self.dmg_build_triggered = False
            self.next(self.end)
            return

        print("\n" + "=" * 80)
        print("🔨 Step 8: Trigger DMG Build")
        print("=" * 80)

        try:
            import subprocess
            import os

            # The gh CLI reads GITHUB_TOKEN for authentication
            token = os.environ.get("GITHUB_TOKEN")
            if not token:
                print("⚠️ GITHUB_TOKEN not set, skipping build trigger")
                self.dmg_build_triggered = False
                self.next(self.end)
                return

            cmd = [
                "gh", "workflow", "run", "build-dmg.yml",
                "--ref", "main",
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                self.dmg_build_triggered = True
                print("✓ DMG build triggered successfully!")
            else:
                print(f"⚠️ Build trigger failed: {result.stderr}")
                self.dmg_build_triggered = False
        except Exception as e:
            print(f"⚠️ Build trigger failed: {e}")
            self.dmg_build_triggered = False

        self.next(self.end)
    @card
    @step
    def end(self):
        """Finalize pipeline and generate summary report."""
        print("\n" + "=" * 80)
        print("✅ Pipeline Complete!")
        print("=" * 80)

        # Calculate total pipeline time
        end_time = datetime.utcnow()
        start_time = datetime.fromisoformat(self.pipeline_start_time)
        duration = (end_time - start_time).total_seconds()

        # Generate summary
        summary = {
            "pipeline_version": self.pipeline_version,
            "duration_seconds": duration,
            "configuration": {
                "model_name": self.model_name,
                "num_epochs": self.num_epochs,
                "batch_size": self.batch_size,
                "learning_rate": self.learning_rate,
                "num_samples": self.num_samples,
            },
            "dataset": {
                "train_size": self.train_size,
                "val_size": self.val_size,
                "dataset_path": self.dataset_path,
            },
            "training": {
                "model_path": self.model_path,
                "metrics": self.training_metrics,
            },
            "quantization": {
                "enabled": not self.skip_quantization,
                "model_path": getattr(self, 'quantized_model_path', None),
            },
            "signing": {
                "enabled": not self.skip_signing,
                "signature": getattr(self, 'model_signature', None),
            },
            "upload": {
                "enabled": not self.skip_hf_upload,
                "url": getattr(self, 'hf_model_url', None),
            },
            "dmg_build": {
                "triggered": getattr(self, 'dmg_build_triggered', False),
            },
        }
        self.pipeline_summary = summary

        # Print summary
        print("\n📊 Pipeline Summary:")
        print(f"   Duration: {duration:.0f}s ({duration / 60:.1f} minutes)")
        print(f"   Model: {self.model_path}")
        print(f"   Training F1: {self.training_metrics.get('eval_pii_f1', 'N/A')}")
        print(f"   Quantized: {'✓' if not self.skip_quantization else '✗'}")
        print(f"   Signed: {'✓' if not self.skip_signing else '✗'}")
        print(f"   Uploaded: {'✓' if not self.skip_hf_upload else '✗'}")

        # Save summary to file
        summary_path = Path(self.model_path) / "pipeline_summary.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"\n✓ Summary saved to: {summary_path}")
        print("=" * 80)


if __name__ == '__main__':
    PIITrainingPipeline()
Phase 3: Create Pipeline Trigger Script
File: model/flows/run_pipeline.sh (new)

Create a convenience script for running the pipeline:

#!/bin/bash
set -e

echo "🚀 Running PII Detection Model Training Pipeline"
echo "=================================================="

# Parse arguments (defaults: 10000 samples, 5 epochs, batch size 16)
NUM_SAMPLES=${1:-10000}
NUM_EPOCHS=${2:-5}
BATCH_SIZE=${3:-16}

# Run the Metaflow pipeline
python model/flows/training_pipeline.py run \
    --num-samples "$NUM_SAMPLES" \
    --num-epochs "$NUM_EPOCHS" \
    --batch-size "$BATCH_SIZE"

# Confirm completion and show the flow structure
echo ""
echo "📊 Pipeline finished; flow structure:"
python model/flows/training_pipeline.py show

echo ""
echo "✅ Pipeline execution complete!"
Phase 4: Add CI/CD Integration
File: .github/workflows/train-model.yml (new)

Create GitHub Actions workflow to trigger training:

name: Train Model

on:
  workflow_dispatch:
    inputs:
      num_samples:
        description: 'Number of training samples'
        required: false
        default: '10000'
        type: string
      num_epochs:
        description: 'Number of epochs'
        required: false
        default: '5'
        type: string
      trigger_dmg_build:
        description: 'Trigger DMG build after training'
        required: false
        default: false
        type: boolean

jobs:
  train-model:
    runs-on: ubuntu-latest
    timeout-minutes: 180  # 3 hours max
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -e ".[mlops,training]"

      - name: Configure Metaflow
        run: |
          metaflow configure aws  # For cloud execution

      - name: Run training pipeline
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          HF_TOKEN: ${{ secrets.HF_TOKEN }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          python model/flows/training_pipeline.py run \
            --num-samples ${{ github.event.inputs.num_samples }} \
            --num-epochs ${{ github.event.inputs.num_epochs }} \
            --trigger-dmg ${{ github.event.inputs.trigger_dmg_build }}

      - name: Upload artifacts
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: |
            model/trained/
            model/quantized/
Phase 5: Update Makefile
File: Makefile

Add pipeline commands:

# ML Pipeline targets
.PHONY: pipeline-run pipeline-run-quick pipeline-show pipeline-list pipeline-clean

pipeline-run: ## Run ML training pipeline with Metaflow
	@echo "Running ML training pipeline..."
	python model/flows/training_pipeline.py run

pipeline-run-quick: ## Run pipeline with minimal samples (for testing)
	@echo "Running quick pipeline test (100 samples, 1 epoch)..."
	python model/flows/training_pipeline.py run \
		--num-samples 100 \
		--num-epochs 1 \
		--skip-quantization True \
		--skip-signing True \
		--skip-hf-upload True

pipeline-show: ## Show the pipeline structure
	python model/flows/training_pipeline.py show

pipeline-list: ## List all pipeline runs (Client API)
	python -c "from metaflow import Flow; [print(r) for r in Flow('PIITrainingPipeline')]"

pipeline-clean: ## Clean pipeline artifacts
	@echo "Cleaning pipeline artifacts..."
	rm -rf .metaflow/
	@echo "✓ Pipeline artifacts cleaned"
Usage Examples
Quick Local Test
# Run minimal pipeline for testing (100 samples, 1 epoch)
make pipeline-run-quick
# View results
make pipeline-show
Full Training Run
# Run complete pipeline
python model/flows/training_pipeline.py run \
--num-samples 10000 \
--num-epochs 5 \
--batch-size 16
# Or use make
make pipeline-run
Custom Configuration

# Custom parameters
python model/flows/training_pipeline.py run \
    --num-samples 20000 \
    --num-epochs 10 \
    --batch-size 32 \
    --learning-rate 5e-5 \
    --model-name "bert-base-cased"
Production Deployment
# Full pipeline with all steps
python model/flows/training_pipeline.py run \
--num-samples 50000 \
--num-epochs 5 \
--skip-hf-upload false \
--trigger-dmg true
View Pipeline History
# List all runs
make pipeline-list
# Show specific run
python model/flows/training_pipeline.py show < run-id>
# Show latest successful run
python model/flows/training_pipeline.py show latest
Pipeline Visualization

Metaflow provides automatic visualization:

# View pipeline structure
python model/flows/training_pipeline.py show

# Render the card attached to the end step of the latest run
python model/flows/training_pipeline.py card view end

The Metaflow UI is a separate, optional service (metaflow-service + metaflow-ui) that can be deployed for a browser-based view of runs.
Pipeline DAG:
start
↓
generate_dataset
↓
preprocess_data
↓
train_model
↓
evaluate_model
↓
quantize_model
↓
sign_model
↓
upload_model
↓
trigger_build
↓
end
Integration with Existing Issues
Issue #30: Model Signing

The sign_model step integrates with the model signing implementation:

from model.src.sign_model import sign_trained_model

model_hash = sign_trained_model(self.model_path)
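`sign_trained_model` itself lands with Issue #30; as a rough idea of what a content hash over a model directory could look like (`hash_model_dir` is a hypothetical helper, the real implementation may differ):

```python
# Hypothetical sketch of hashing a model directory for integrity checks.
# The actual sign_trained_model from Issue #30 may work differently.
import hashlib
from pathlib import Path


def hash_model_dir(model_dir: str) -> str:
    """Compute a deterministic SHA-256 over all files in a model directory."""
    digest = hashlib.sha256()
    for path in sorted(Path(model_dir).rglob("*")):
        if path.is_file():
            # Include the relative path so renames change the hash
            digest.update(str(path.relative_to(model_dir)).encode())
            digest.update(path.read_bytes())
    return digest.hexdigest()
```

Sorting the paths makes the hash independent of filesystem iteration order, so the same directory always signs to the same value.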
Issue #33: Hugging Face Upload

The upload_model step triggers the HF upload script:

subprocess.run([
    "python", "src/scripts/upload_model_to_hf.py",
    "--model-path", self.model_path,
    "--repo-id", "yaak/pii-detector",
])
Issue #32: Dataset Statistics

Can be added as an optional step:

@step
def generate_dataset_stats(self):
    subprocess.run([
        "python", "src/scripts/generate_dataset_stats.py",
        "--input", self.dataset_path,
        "--output", "docs/dataset_statistics",
    ])
Cloud Execution (Optional)
AWS Batch Integration
# Configure AWS
metaflow configure aws
# Run on AWS Batch
python model/flows/training_pipeline.py run --with batch
Kubernetes Integration
# Configure Kubernetes
metaflow configure kubernetes
# Run on Kubernetes
python model/flows/training_pipeline.py run --with kubernetes
Success Criteria
Benefits
For Development
Faster iteration: Run entire workflow with one command
Reproducibility: Track every experiment
Debugging: Easy to rerun individual steps
Collaboration: Share runs with team
For Production
Automation: Schedule training runs
Monitoring: Track metrics over time
Scalability: Run on cloud infrastructure
CI/CD: Integrate with deployment pipeline
For MLOps
Experiment tracking: Compare runs
Model versioning: Automatic artifact storage
Lineage tracking: Understand data/model provenance
Cost optimization: Resource management
Timeline
Week 1:
Day 1-2: Install Metaflow, understand framework
Day 3-4: Create basic pipeline structure
Day 5: Implement dataset and preprocessing steps
Week 2:
Day 1-2: Implement training and evaluation steps
Day 3: Integrate quantization, signing, upload
Day 4: Test end-to-end locally
Day 5: Documentation and CI/CD setup
Notes
This MLOps pipeline provides a production-ready, reproducible, and scalable training infrastructure. It integrates seamlessly with existing scripts while adding:
Automatic experiment tracking
Easy parameterization
Cloud execution capability
CI/CD integration
Clear separation of concerns
The pipeline is designed to grow with the project - start simple with local execution, then scale to cloud infrastructure as needed.
Complexity: Medium-High
Impact: High (enables MLOps best practices)
Dependencies: Issues #30 (signing), #33 (HF upload)