AI Pipeline Orchestration Library for Elixir
A robust, production-ready library for chaining AI providers (Claude and Gemini) with advanced features like fault tolerance, session management, and self-improving Genesis pipelines.
Library Readiness: 8.5/10 - Ready for immediate use as a Git dependency with comprehensive testing, clean API, and flexible configuration.
Remaining 1.5/10 for Full Production Readiness
Missing Features (1.5/10):
- Hex Package Publication (0.5/10) - Currently Git-only, needs a mix hex.publish workflow
- Enhanced Documentation (0.3/10) - ExDoc polish, API reference examples, getting started guide
- Backward Compatibility (0.2/10) - Semantic versioning strategy, deprecation warnings, migration guides
- Performance Benchmarks (0.2/10) - Baseline metrics, memory profiling, concurrency benchmarks
- Production Hardening (0.3/10) - Rate limiting, circuit breakers, structured logging with correlation IDs
Current Status: 8.5/10 = Excellent for Git dependency | 10/10 = Enterprise-ready Hex package
NEW: Our flagship feature - a pipeline that generates pipelines! The Genesis Pipeline is an AI system that creates other AI pipelines, enabling true self-improvement and evolution.
# Generate a new AI pipeline with one command
mix pipeline.generate.live "Create a sentiment analysis pipeline"
# The system will create a complete, executable pipeline in evolved_pipelines/
# Run your generated pipeline immediately:
mix pipeline.run evolved_pipelines/sentiment_analyzer_*.yaml
What just happened? The Genesis Pipeline used Claude to analyze your request, design the optimal pipeline structure, and generate a complete YAML configuration that's immediately ready to execute.
Use pipeline_ex as a dependency in your Elixir applications:
# mix.exs
defp deps do
  [
    # From Hex.pm (recommended)
    {:pipeline_ex, "~> 0.0.1"}
    # Or from GitHub
    # {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.0.1"}
  ]
end
# Load and execute a pipeline
{:ok, config} = Pipeline.load_workflow("my_analysis.yaml")
{:ok, results} = Pipeline.execute(config)
# Execute with custom configuration
{:ok, results} = Pipeline.execute(config,
  workspace_dir: "/app/ai_workspace",
  output_dir: "/app/pipeline_outputs"
)
# Convenience function
{:ok, results} = Pipeline.run("my_analysis.yaml", debug: true)
# Health check
case Pipeline.health_check() do
  :ok -> IO.puts("Pipeline system ready")
  {:error, issues} -> IO.puts("Issues: #{inspect(issues)}")
end
The library supports flexible configuration through multiple sources:
- Function options (highest priority):
  Pipeline.execute(config, workspace_dir: "/custom/workspace")
- Environment variables:
  export PIPELINE_WORKSPACE_DIR="/app/workspace"
  export PIPELINE_OUTPUT_DIR="/app/outputs"
  export PIPELINE_CHECKPOINT_DIR="/app/checkpoints"
- YAML configuration and defaults
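The precedence described above can be sketched in a few lines. This is only an illustration, not the library's implementation: the module name ConfigPrecedenceSketch and the resolve/4 function are hypothetical, and the relative order of YAML values and built-in defaults is an assumption, since the list above groups them together.

```elixir
defmodule ConfigPrecedenceSketch do
  # Hypothetical helper, not part of pipeline_ex.
  # Maps option keys to the environment variables named in this README.
  @env_vars %{
    workspace_dir: "PIPELINE_WORKSPACE_DIR",
    output_dir: "PIPELINE_OUTPUT_DIR",
    checkpoint_dir: "PIPELINE_CHECKPOINT_DIR"
  }

  # Resolution order: function option, then environment variable,
  # then YAML value (assumed to use string keys), then the default.
  def resolve(key, opts, yaml_config \\ %{}, default \\ nil) do
    Keyword.get(opts, key) ||
      env_value(key) ||
      Map.get(yaml_config, Atom.to_string(key)) ||
      default
  end

  defp env_value(key) do
    case Map.fetch(@env_vars, key) do
      {:ok, name} -> System.get_env(name)
      :error -> nil
    end
  end
end
```

With this sketch, ConfigPrecedenceSketch.resolve(:output_dir, [output_dir: "/opt/out"]) returns the function option even when PIPELINE_OUTPUT_DIR is set.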
# Phoenix controller
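defmodule MyAppWeb.AIController do
  # Brings json/2 and put_status/2 into scope
  use MyAppWeb, :controller

  # `code` is matched here but not forwarded; pass it to the pipeline as your workflow requires.
  def analyze(conn, %{"code" => _code}) do
    case Pipeline.run("pipelines/code_analysis.yaml",
           workspace_dir: "/tmp/ai_workspace"
         ) do
      {:ok, %{"analysis" => result}} ->
        json(conn, %{analysis: result})

      {:error, reason} ->
        # inspect/1 keeps the response encodable when the reason is not a string
        conn |> put_status(500) |> json(%{error: inspect(reason)})
    end
  end
end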
# Background job with Oban
defmodule MyApp.AnalysisWorker do
  use Oban.Worker, queue: :ai_analysis

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"project_id" => project_id}}) do
    # get_analysis_config/0 is assumed to be defined elsewhere in your application
    case Pipeline.execute(get_analysis_config(),
           workspace_dir: "/tmp/analysis_#{project_id}",
           output_dir: "/app/results/#{project_id}"
         ) do
      {:ok, results} ->
        MyApp.Projects.update_analysis(project_id, results)
        :ok

      {:error, reason} ->
        {:error, reason}
    end
  end
end
# Enable mock mode for development/testing
Application.put_env(:pipeline, :test_mode, true)
# All AI calls will be mocked
{:ok, results} = Pipeline.execute(config)
Complete Library Guide: See the documentation sections below for detailed usage instructions, configuration options, and integration patterns.
Advanced Features: See ADVANCED_FEATURES.md for comprehensive documentation on loops, complex conditions, file operations, data transformation, codebase intelligence, and state management.
YAML Format v2 Documentation: See docs/20250704_yaml_format_v2/index.md for complete reference documentation on the Pipeline YAML format, including all step types, prompt systems, control flow, and best practices.
- Elixir Library: Use as a dependency in any Elixir application
- Clean API: Simple Pipeline.execute/2 and Pipeline.load_workflow/1 functions
- Configurable: All paths and settings customizable via options/environment variables
- Mock Mode: Complete testing support without API costs
- Health Checks: Built-in system validation and monitoring
- Multi-AI Integration: Chain Claude and Gemini APIs together
- Flexible Execution Modes: Mock, Live, and Mixed modes for testing
- YAML Workflow Configuration: Define complex multi-step workflows
- Structured Output: JSON-based responses with proper error handling
- InstructorLite Integration: Structured generation with Gemini
- Result Management: Organized output storage and display
- Enhanced Claude Steps: Smart presets, sessions, extraction, batch processing, robust error handling
- Genesis Pipeline: Self-improving AI system that generates other pipelines
- Session Management: Persistent conversations with automatic checkpointing
- Fault Tolerance: Retry mechanisms, circuit breakers, graceful degradation
- Loop Constructs: For/while loops with parallel execution and nested support
- Complex Conditions: Boolean logic, comparisons, mathematical expressions
- File Operations: Copy, move, validate, convert with format transformations
- Data Transformation: Filter, aggregate, join with schema validation
- Codebase Intelligence: Project discovery, code analysis, dependency mapping
- State Management: Variables, interpolation, checkpoints with persistence
- Async Streaming: Real-time response streaming for Claude steps with multiple handlers
See ADVANCED_FEATURES.md for detailed documentation and examples of all advanced capabilities.
# mix.exs
defp deps do
  [
    # From Hex.pm (recommended)
    {:pipeline_ex, "~> 0.0.1"}
    # Or from GitHub
    # {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git", tag: "v0.0.1"}
  ]
end
git clone <repository>
cd pipeline_ex
mix deps.get
Start with one of these commands:
# Mock mode (safe, free, fast)
mix showcase # Complete demo with mocks
# Live mode (real API calls, costs money)
mix showcase --live # Complete demo with live APIs
All examples and tests can run in two modes:
Mode | Command Format | API Calls | Costs | Authentication Required |
---|---|---|---|---|
Mock | mix showcase | None (mocked) | $0.00 | No |
Live | mix showcase --live | Real API calls | Real costs | Yes (claude login) |
mix test # All tests with mocked APIs (fast, free)
# Setup authentication first:
export GEMINI_API_KEY="your_api_key"
claude login
# Run tests with real APIs:
mix pipeline.test.live # Only integration tests with live APIs (costs money)
The pipeline supports three execution modes controlled by the TEST_MODE environment variable:
Mode | Description | Use Case |
---|---|---|
mock | Uses fake responses | Development, unit testing, CI/CD |
live | Uses real API calls | Production, integration testing |
mixed | Mocks for unit tests, live for integration | Hybrid testing approach |
# Mock mode - fast, no API costs
mix pipeline.run examples/comprehensive_config_example.yaml
# Live mode - real AI responses
mix pipeline.run.live examples/comprehensive_config_example.yaml
# Mixed mode - context-dependent
TEST_MODE=mixed mix test
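As a rough illustration of how the three modes could map to provider selection, here is a minimal sketch. ExecutionModeSketch and both of its functions are hypothetical names, not library API, and falling back to mock when TEST_MODE is unset or unrecognized is an assumption made for this example only.

```elixir
defmodule ExecutionModeSketch do
  # Hypothetical helper, not part of pipeline_ex.

  # Reads TEST_MODE; the "mock" fallback is an assumption of this sketch.
  def current_mode do
    case System.get_env("TEST_MODE", "mock") do
      "live" -> :live
      "mixed" -> :mixed
      _other -> :mock
    end
  end

  # Decides whether a test of the given kind should call real APIs.
  # In mixed mode only integration tests go live, as in the table above.
  def provider(mode, test_kind) do
    case {mode, test_kind} do
      {:live, _kind} -> :live
      {:mixed, :integration} -> :live
      _ -> :mock
    end
  end
end
```

For example, ExecutionModeSketch.provider(:mixed, :unit) selects the mock provider, while ExecutionModeSketch.provider(:mixed, :integration) selects the live one.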
Claude uses the authenticated CLI. Run once:
claude login
Set your API key:
export GEMINI_API_KEY="your_gemini_api_key_here"
Or in your application config:
config :pipeline, gemini_api_key: "your_api_key"
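To avoid committing the key to source control, one option is to read it from the environment at boot. The fragment below is a sketch that assumes a standard config/runtime.exs file and assumes the library reads the :gemini_api_key value at runtime rather than at compile time.

```elixir
# config/runtime.exs
import Config

# System.get_env/1 returns nil when GEMINI_API_KEY is unset,
# so mock-only environments can still boot without a key.
config :pipeline, gemini_api_key: System.get_env("GEMINI_API_KEY")
```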
Create YAML workflow files like test_simple_workflow.yaml:
workflow:
  name: "simple_test_workflow"
  description: "Test basic claude functionality"
  steps:
    - name: "analyze_code"
      type: "claude"
      prompt:
        - type: "static"
          content: |
            Analyze this simple Python function and provide feedback:
            def add(a, b):
                return a + b
            Please provide your analysis in JSON format.
    - name: "plan_improvements"
      type: "gemini"
      prompt:
        - type: "static"
          content: |
            Based on the previous analysis, create a plan to improve the function.
            Consider error handling, type hints, and documentation.
defmodule MyApp.AIProcessor do
  @doc "Analyze code using the pipeline library"
  def analyze_code(_code_content) do
    # Load your pipeline configuration
    case Pipeline.load_workflow("pipelines/code_analysis.yaml") do
      {:ok, config} ->
        # Execute with custom workspace
        Pipeline.execute(config,
          workspace_dir: "/tmp/ai_workspace",
          debug: true
        )

      {:error, reason} ->
        # inspect/1 avoids a crash when the reason is not a string
        {:error, "Failed to load workflow: #{inspect(reason)}"}
    end
  end

  @doc "Health check for the AI system"
  def system_ready? do
    case Pipeline.health_check() do
      :ok -> true
      {:error, _issues} -> false
    end
  end
end
# Usage in your application
{:ok, analysis} = MyApp.AIProcessor.analyze_code(user_code)
IO.inspect(analysis["analysis_step"])
#!/usr/bin/env elixir
Mix.install([
  {:pipeline_ex, git: "https://github.com/nshkrdotcom/pipeline_ex.git"}
])

# Load and execute pipeline
case Pipeline.run("test_simple_workflow.yaml", output_dir: "outputs") do
  {:ok, results} ->
    IO.puts("Pipeline completed!")
    IO.inspect(results)

  {:error, reason} ->
    IO.puts("Pipeline failed: #{inspect(reason)}")
end
# Test with mock responses (fast)
TEST_MODE=mock elixir run_example.exs
# Test with real Claude + mock Gemini
# (useful when you have Claude access but no Gemini API key)
TEST_MODE=mixed elixir -e "
System.put_env(\"FORCE_MOCK_GEMINI\", \"true\")
Code.compile_file(\"run_example.exs\")
"
# Full live test (requires Claude CLI login and a Gemini API key)
elixir run_example.exs
The project includes a comprehensive configuration example that demonstrates all available features with minimal steps. This example showcases every configuration option, step type, and feature available in the pipeline system.
# Run the comprehensive example with mocked AI responses
mix pipeline.run examples/comprehensive_config_example.yaml
# Run with debug output to see detailed execution
PIPELINE_DEBUG=true mix pipeline.run examples/comprehensive_config_example.yaml
To run the comprehensive example with actual AI providers:
# Set your Gemini API key (get from https://aistudio.google.com/)
export GEMINI_API_KEY="your_gemini_api_key_here"
# Authenticate the Claude CLI (no API key needed)
claude login
# Run with real AI providers
mix pipeline.run.live examples/comprehensive_config_example.yaml
# Run with full debug logging
PIPELINE_DEBUG=true PIPELINE_LOG_LEVEL=debug mix pipeline.run.live examples/comprehensive_config_example.yaml
The examples/comprehensive_config_example.yaml file shows:
- Basic step types: gemini, claude, parallel_claude, gemini_instructor
- Enhanced Claude steps: claude_smart, claude_session, claude_extract, claude_batch, claude_robust
- Function calling: Gemini with structured function definitions
- All Claude tools: Write, Edit, Read, Bash, Search, Glob, Grep
- Parallel execution: Multiple Claude instances running simultaneously
- Conditional steps: Steps that run based on previous results
- All prompt types: Static content, file content, previous responses
- Workspace management: Sandboxed file operations
- Token budgets: Fine-tuned AI response configurations
- Model selection: Different AI models for different tasks
- Output management: Structured result saving and organization
The pipeline includes five advanced Claude step types that extend the basic claude step with specialized capabilities:
claude_smart: Intelligent preset-based configuration with environment awareness.
- Presets: development, production, analysis, chat, test
- Auto-optimization: Preset-specific tool restrictions and performance tuning
- Environment detection: Automatic preset selection based on Mix environment
- name: "smart_analysis"
  type: "claude_smart"
  preset: "analysis" # Uses analysis-optimized settings
  prompt: [...]
claude_session: Persistent conversation management with session state.
- Session persistence: Continue conversations across multiple steps
- Automatic checkpointing: Save session state for recovery
- Turn management: Configurable conversation length limits
- name: "session_start"
  type: "claude_session"
  session_name: "math_tutor"
  session_config:
    persist: true
    max_turns: 50
claude_extract: Advanced content extraction and post-processing.
- Output formats: json, markdown, structured, summary
- Post-processing: Extract code blocks, recommendations, links, key points
- Content filtering: Apply extraction rules and transformations
- name: "extract_json"
  type: "claude_extract"
  preset: "analysis"
  extraction_config:
    format: "json"
    post_processing: ["extract_code_blocks", "extract_recommendations"]
    include_metadata: true
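To make the idea of a post-processing rule concrete, here is a small sketch of what an extract_code_blocks pass might do to a Markdown response. ExtractSketch is a hypothetical module written for this README; the library's own extraction logic may differ. The fence marker is built with String.duplicate/2 only so the example embeds cleanly in Markdown.

```elixir
defmodule ExtractSketch do
  # Hypothetical helper, not part of pipeline_ex.
  # Three backticks, i.e. the Markdown code fence marker.
  @fence String.duplicate("`", 3)

  # Returns one map per fenced block found in `text`, with the declared
  # language (possibly an empty string) and the block's code.
  def extract_code_blocks(text) do
    # "s" lets `.` match newlines so multi-line blocks are captured.
    regex = Regex.compile!(@fence <> "(\\w*)\\n(.*?)" <> @fence, "s")

    regex
    |> Regex.scan(text)
    |> Enum.map(fn [_full, lang, code] ->
      %{language: lang, code: String.trim_trailing(code)}
    end)
  end
end
```

Text without fenced blocks yields an empty list, so the function can be applied to any response without a prior check.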
claude_batch: Parallel task execution with load balancing.
- Concurrent processing: Run multiple Claude queries simultaneously
- Task management: Queue and execute independent tasks
- Performance scaling: Configurable parallelism limits
- name: "batch_analysis"
  type: "claude_batch"
  batch_config:
    max_parallel: 3
    tasks:
      - id: "task1"
        prompt: "Analyze JavaScript code..."
      - id: "task2"
        prompt: "Analyze Python code..."
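The max_parallel limit behaves much like bounded concurrency in plain Elixir. The sketch below uses Task.async_stream/3 from the standard library to show the idea; BatchSketch and the run_task function argument are hypothetical stand-ins for a real Claude query, and this is not a description of how the library schedules its batches.

```elixir
defmodule BatchSketch do
  # Hypothetical helper, not part of pipeline_ex.
  # Runs `run_task` for every task, at most `max_parallel` at a time,
  # and returns a map of task id => result.
  def run(tasks, max_parallel, run_task) do
    tasks
    |> Task.async_stream(fn task -> {task.id, run_task.(task)} end,
      max_concurrency: max_parallel,
      timeout: 30_000
    )
    |> Enum.map(fn {:ok, result} -> result end)
    |> Map.new()
  end
end

tasks = [
  %{id: "task1", prompt: "Analyze JavaScript code..."},
  %{id: "task2", prompt: "Analyze Python code..."}
]

# A plain function stands in for the AI call here.
results = BatchSketch.run(tasks, 3, fn task -> String.upcase(task.prompt) end)
IO.inspect(Map.keys(results))
```

A task that crashes or exceeds the timeout would take down the caller in this sketch; a production version would need to handle {:exit, reason} results as well.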
claude_robust: Enterprise-grade error recovery and fault tolerance.
- Retry mechanisms: Configurable backoff strategies
- Fallback actions: Graceful degradation options
- Circuit breaker: Prevent cascade failures
- name: "robust_analysis"
  type: "claude_robust"
  retry_config:
    max_retries: 3
    backoff_strategy: "exponential"
    fallback_action: "simplified_prompt"
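For readers unfamiliar with the pattern, exponential backoff with a fallback can be sketched as follows. RetrySketch, its option names, and the 100 ms base delay are assumptions made for illustration; the sketch covers retries and fallback only, not the circuit breaker.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of pipeline_ex.
  # Calls `fun` up to max_retries + 1 times, doubling the delay after each
  # failure. If every attempt fails, `fallback` runs once instead
  # (for example, a simplified prompt).
  def with_retries(fun, fallback, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, 3)
    base_delay_ms = Keyword.get(opts, :base_delay_ms, 100)
    attempt(fun, fallback, 0, max_retries, base_delay_ms)
  end

  defp attempt(fun, fallback, n, max_retries, base_delay_ms) do
    case fun.() do
      {:ok, _value} = ok ->
        ok

      {:error, _reason} when n < max_retries ->
        # Delays grow as base, 2 * base, 4 * base, ...
        Process.sleep(base_delay_ms * Integer.pow(2, n))
        attempt(fun, fallback, n + 1, max_retries, base_delay_ms)

      {:error, _reason} ->
        fallback.()
    end
  end
end
```

With max_retries: 3 and the default base delay, a call that keeps failing waits 100, 200, and 400 ms between attempts before the fallback runs.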
Each enhanced step type has dedicated example files for testing:
# Test individual enhanced step types
mix pipeline.run.live examples/claude_smart_example.yaml
mix pipeline.run.live examples/claude_session_example.yaml
mix pipeline.run.live examples/claude_extract_example.yaml
mix pipeline.run.live examples/claude_batch_example.yaml
mix pipeline.run.live examples/claude_robust_example.yaml
# Or run all enhanced examples in mock mode (free)
mix pipeline.run examples/claude_smart_example.yaml
mix pipeline.run examples/claude_session_example.yaml
# ... etc
The pipeline now supports real-time message streaming for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience.
- Real-time feedback: See Claude's complete messages as they arrive (message-by-message)
- Lower memory usage: Process messages without buffering entire responses
- Better UX: Progressive display of assistant responses, tool uses, and results
- Visibility: Watch Claude's tool usage and thinking process in real-time
- name: "streaming_claude"
  type: "claude"
  claude_options:
    # Enable async streaming
    async_streaming: true
    stream_handler: "console" # Real-time console output
    stream_buffer_size: 100 # Message buffer size
    # Standard options
    max_turns: 10
    allowed_tools: ["Write", "Read", "Edit"]
- Console Handler (console) - Real-time terminal output
  stream_handler: "console"
  stream_console_config:
    show_timestamps: true
    color_output: true
    show_progress: true
- File Handler (file) - Stream to file with rotation
  stream_handler: "file"
  stream_file_path: "./outputs/stream.jsonl"
  stream_file_rotation:
    enabled: true
    max_size_mb: 10
    max_files: 5
- Buffer Handler (buffer) - Collect in memory with stats
  stream_handler: "buffer"
  stream_buffer_config:
    max_size: 1000
    circular: true
    deduplication: true
- Callback Handler (callback) - Custom processing
  stream_handler: "callback"
  stream_callback_config:
    filter_types: ["text", "tool_use"]
    rate_limit: 10
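The buffer handler's max_size, circular, and deduplication options suggest a bounded buffer that drops its oldest entry when full and ignores repeats. The sketch below shows one way such a buffer could behave. StreamBufferSketch is a hypothetical module, and treating deduplication as exact equality against currently buffered messages is an assumption.

```elixir
defmodule StreamBufferSketch do
  # Hypothetical helper, not part of pipeline_ex.
  # Messages are stored newest-first so trimming the oldest is cheap.
  defstruct max_size: 1000, messages: []

  def new(max_size), do: %__MODULE__{max_size: max_size}

  # Skips messages already in the buffer (deduplication) and keeps only
  # the newest `max_size` messages (circular behaviour).
  def push(%__MODULE__{} = buffer, message) do
    if message in buffer.messages do
      buffer
    else
      messages = Enum.take([message | buffer.messages], buffer.max_size)
      %{buffer | messages: messages}
    end
  end

  # Returns the buffered messages in arrival order, oldest first.
  def to_list(%__MODULE__{messages: messages}), do: Enum.reverse(messages)
end
```

Membership checks on a list are linear in the buffer size, which is fine for a sketch; a real handler with a large max_size would likely track seen messages in a set.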
# Simple streaming example
mix pipeline.run examples/clean_streaming_numbers.yaml
# Multi-message streaming with file operations
mix pipeline.run examples/streaming_file_operations.yaml
# Run streaming tests
mix test test/integration/async_streaming_test.exs
Complete Streaming Guide: See examples/STREAMING_GUIDE.md for implementation details and docs/ASYNC_STREAMING_MIGRATION_GUIDE.md for adding streaming to existing pipelines.
Async streaming is supported by all Claude-based steps:
- claude - Basic Claude step
- claude_smart - With presets
- claude_session - With session continuity
- claude_extract - With extraction
- claude_batch - Parallel streaming
- claude_robust - With error recovery
- parallel_claude - Multiple streams
For advanced configuration, you can set these environment variables:
# API Configuration
export GEMINI_API_KEY="your_gemini_api_key"
# Note: Claude uses CLI authentication (claude login), no API key needed
# Pipeline Directories
export PIPELINE_WORKSPACE_DIR="./workspace" # Claude's sandbox
export PIPELINE_OUTPUT_DIR="./outputs" # Result storage
export PIPELINE_CHECKPOINT_DIR="./checkpoints" # State management
# Logging and Debug
export PIPELINE_LOG_LEVEL="debug" # debug, info, warn, error
export PIPELINE_DEBUG="true" # Detailed execution logs
# Execution Mode
export TEST_MODE="live" # live, mock, mixed
- Start with the example: Copy examples/comprehensive_config_example.yaml
- Read the guides:
  - Pipeline Configuration Guide for basic configuration
  - Prompt System Guide for advanced prompt management, templates, and reusable components
  - Recursive Pipelines Guide for pipeline composition and modular workflows
  - ADVANCED_FEATURES.md for loops, conditions, file operations, and more
  - TESTING_ARCHITECTURE.md for comprehensive testing approaches
- Test in mock mode: Validate your workflow logic without API costs
- Run live: Execute with real AI providers when ready
The run_example.exs script demonstrates the pipeline:
# Quick test with mocks
TEST_MODE=mock elixir run_example.exs
# Full test with APIs
elixir run_example.exs
test/
├── unit/          # Fast unit tests (mocked)
├── integration/   # Integration tests (live APIs)
├── fixtures/      # Test data and workflows
└── support/       # Test helpers
defmodule MyTest do
  use ExUnit.Case
  use Pipeline.TestCase # Provides test mode helpers

  test "my feature works in mock mode" do
    # Test automatically uses mocks.
    # execute_something/0 and expected_result are placeholders for your own call and value.
    assert Pipeline.execute_something() == expected_result
  end
end
Enable debug output:
# See detailed execution logs
DEBUG=true elixir run_example.exs
# See API request/response details
VERBOSE=true elixir run_example.exs
Debug output includes:
- Step execution flow
- API request/response details
- Provider selection (mock vs live)
- Result processing
# Error: Claude CLI not found or not authenticated
claude login
# Error: GEMINI_API_KEY environment variable not set
export GEMINI_API_KEY="your_key_here"
# Check which providers are mocked vs live
TEST_MODE=mock elixir run_example.exs # All mocked
TEST_MODE=live elixir run_example.exs # All live
TODO: Add license information