A pure-Python concurrent job execution engine for AI infrastructure workloads.
PYREXIS focuses on the infrastructure around AI, not models:
- Scheduling — Fair priority-based job ordering with starvation prevention
- Retries — Bounded retry logic with exponential backoff
- Concurrency — Thread/process/async execution models
- Streaming — Pipeline-based incremental processing
- Durability — Persistent state with crash recovery
No ML libraries. No magic. Just reliable job orchestration.
```mermaid
graph LR
J[Job] --> S[Scheduler]
S --> E[Engine]
E --> EX[Executor]
EX --> P[Pipeline]
P --> R[Result]
E --> ST[StateStore]
ST -.persist.-> E
```
- Job — strongly typed with Pydantic, state-machine enforcement, priority-based scheduling, configurable execution mode (thread/process/async)
- Scheduler — priority queue with aging, starvation prevention, thread-safe operations
- Engine — job submission and state management, graceful shutdown coordination, metrics collection
- Pipeline — multi-stage processing, generator-based streaming, progress hooks
- Executors — threads for I/O-bound tasks, processes for CPU-bound tasks, async for event-driven orchestration
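The scheduler's aging is what backs the starvation guarantee: the longer a job waits, the better its effective priority gets. A self-contained sketch of the idea (illustrative only; the real implementation lives in core/scheduler.py and its API will differ):

```python
import time

class AgingQueue:
    """Illustrative priority queue with aging -- not the real Scheduler API.
    Assumes lower value = higher priority."""

    AGING_RATE = 0.1  # effective-priority boost per second of waiting

    def __init__(self):
        self._items = []  # list of (base_priority, enqueue_time, job_id)

    def push(self, priority: int, job_id: str) -> None:
        self._items.append((priority, time.monotonic(), job_id))

    def pop(self) -> str:
        if not self._items:
            raise IndexError("pop from empty queue")
        # Waiting time erodes the base priority value, so a low-priority
        # job cannot starve forever behind a stream of urgent ones.
        now = time.monotonic()
        best = min(
            range(len(self._items)),
            key=lambda i: self._items[i][0]
            - self.AGING_RATE * (now - self._items[i][1]),
        )
        return self._items.pop(best)[2]
```

A linear scan on pop keeps the sketch short; the real scheduler can afford a smarter structure. The quick start below wires the actual components together: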
```python
from core import Engine, Scheduler, ExecutorRouter
from models import Job, JobStatus
from storage import StateStore
from utils import ShutdownCoordinator
# Setup
scheduler = Scheduler()
executor = ExecutorRouter()
state_store = StateStore("./state.db")
shutdown = ShutdownCoordinator()
engine = Engine(
scheduler=scheduler,
executor=executor,
state_store=state_store,
shutdown_coordinator=shutdown
)
# Create and submit job
job = Job(
job_id="task-1",
priority=5,
payload={"data": "process_this"},
execution_mode="thread"
)
engine.submit_job(job)
# Run engine loop
engine.run_loop()  # Processes jobs until shutdown
```

```bash
# Submit a job
python main.py submit \
--job-id task-1 \
--priority 5 \
--payload '{"type": "example", "data": "process_this"}' \
--mode thread
# Check status
python main.py status --job-id task-1
# Start daemon to process jobs
python main.py daemon
# Monitor in real-time
python main.py monitor
# List recent jobs
python main.py list --limit 20
# Cancel a job
python main.py cancel --job-id task-1
# View metrics
python main.py metrics
```

See docs/CLI.md for the complete CLI reference.
```mermaid
flowchart TD
A[Create Job] --> B[Submit to Engine]
B --> C{Status = CREATED?}
C -->|Yes| D[Transition to PENDING]
C -->|No| ERROR[ValueError]
D --> E[Add to Scheduler]
E --> F[Engine.run_next]
F --> G[Scheduler.next_job]
G -->|Job found| H[Transition to RUNNING]
G -->|No jobs| END[Return None]
H --> I[Executor routes by mode]
I --> J{Execution Mode}
J -->|thread| K[ThreadWorkerPool]
J -->|process| L[ProcessWorkerPool]
J -->|async| M[AsyncTaskRunner]
K --> N[Execute Pipeline]
L --> N
M --> N
N --> O{Success?}
O -->|Yes| P[Transition to COMPLETED]
O -->|No| Q{Retries left?}
Q -->|Yes| R[Transition to RETRYING]
Q -->|No| S[Transition to FAILED]
R --> H
P --> T[Save Result]
S --> T
T --> U[Persist to StateStore]
```
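The ValueError branch in the flowchart is the state machine at work: every transition is validated against an allow-list before it is applied. A self-contained sketch of the pattern (illustrative; the real table lives with the Job model in models/job.py):

```python
from enum import Enum

class JobStatus(str, Enum):
    CREATED = "created"
    PENDING = "pending"
    RUNNING = "running"
    RETRYING = "retrying"
    COMPLETED = "completed"
    FAILED = "failed"

# Illustrative allow-list; the real transition table may differ.
ALLOWED = {
    JobStatus.CREATED: {JobStatus.PENDING},
    JobStatus.PENDING: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.COMPLETED, JobStatus.RETRYING, JobStatus.FAILED},
    JobStatus.RETRYING: {JobStatus.RUNNING},
    JobStatus.COMPLETED: set(),
    JobStatus.FAILED: set(),
}

def transition(job, new_status: JobStatus) -> None:
    # Reject anything not in the allow-list instead of silently corrupting state.
    if new_status not in ALLOWED[job.status]:
        raise ValueError(f"Illegal transition: {job.status} -> {new_status}")
    job.status = new_status

class _Job:  # tiny stand-in for the real Job model
    status = JobStatus.CREATED

job = _Job()
transition(job, JobStatus.PENDING)    # OK
transition(job, JobStatus.COMPLETED)  # ValueError: pending -> completed
```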
PYREXIS is designed for graceful degradation, not perfect uptime:
- ✅ Crashes are expected — Jobs retry automatically
- ✅ Retries are bounded — Max attempts prevent infinite loops
- ✅ Results are immutable — Once saved, never modified
- ✅ State is durable — Persistent storage survives restarts
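The immutability point is worth making concrete. Since the project already depends on Pydantic, a frozen model is the natural mechanism; a minimal sketch (the actual fields in models/result.py may differ):

```python
from pydantic import BaseModel, ConfigDict

class Result(BaseModel):
    # frozen=True makes every instance read-only after construction
    model_config = ConfigDict(frozen=True)

    job_id: str
    success: bool
    output: dict

result = Result(job_id="task-1", success=True, output={"tokens": 42})
result.success = False  # raises ValidationError: instance is frozen (Pydantic v2)
```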
```python
# Job fails during execution
job.record_failure("Connection timeout")

# Automatic retry if attempts < max_retries
if job.attempts < job.max_retries:
    job.status = JobStatus.RETRYING
    # Exponential backoff: wait 2 ** job.attempts seconds before retrying
else:
    job.status = JobStatus.FAILED
```

- Graceful shutdown — No new jobs pulled, current jobs finish
- Thread pools stop — Workers exit after current task
- Process pools terminate — Clean resource cleanup
- State persists — All job states saved before exit
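Behind those guarantees sits a simple pattern: turn OS signals into a cooperative flag that the engine polls between jobs, never mid-job. A sketch of that pattern (illustrative; utils/shutdown.py defines the real ShutdownCoordinator, and its method names may differ):

```python
import signal
import threading

class ShutdownCoordinator:
    """Illustrative: translate SIGINT/SIGTERM into a cooperative stop flag."""

    def __init__(self):
        self._stop = threading.Event()
        signal.signal(signal.SIGINT, self._handle)
        signal.signal(signal.SIGTERM, self._handle)

    def _handle(self, signum, frame):
        self._stop.set()  # no new jobs pulled; in-flight work drains

    def should_stop(self) -> bool:
        return self._stop.is_set()

# The engine loop checks the flag between jobs, so current jobs finish:
#     while not shutdown.should_stop():
#         engine.run_next()
```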
| ❌ Use Case | Why Not | Use Instead |
|---|---|---|
| Real-time systems (< 10ms latency) | Threading/scheduling overhead | Direct function calls |
| Distributed clusters | Single-machine only | Celery, Ray |
| GPU training pipelines | No CUDA awareness | Kubeflow, MLflow |
| High-throughput streaming (>100k jobs/sec) | Not optimized for scale | Kafka, Flink |
- ✅ Orchestrating AI inference pipelines (batch processing)
- ✅ Multi-stage data preprocessing workflows
- ✅ Concurrent API request handling (I/O-bound)
- ✅ CPU-intensive batch computations
- ✅ Retry-heavy external service calls
```bash
# Clone repository
git clone https://github.com/yourusername/pyrexis.git
cd pyrexis
# Install dependencies (minimal)
pip install pydantic pytest
# Or install in development mode
pip install -e .
# Run tests
pytest tests/ -v
```

PYREXIS has 47 brutal tests that try to break the system:
- ✅ Starvation prevention — Low-priority jobs eventually run
- ✅ Race conditions — 500 concurrent job submissions
- ✅ Retry exhaustion — Exact max_retries enforcement
- ✅ State transitions — Illegal transitions rejected
- ✅ Graceful shutdown — No data loss on interrupt
Key insight: We test failure modes, not just functionality.
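For example, a retry-exhaustion test checks that failure stops exactly where it should, not merely that success works. An illustrative shape (the real tests live in tests/; the engine fixture here is assumed, wired to a pipeline that always fails):

```python
from models import Job, JobStatus

def test_retry_exhaustion_is_exact(engine):
    """A job that always fails must stop at exactly max_retries attempts."""
    job = Job(job_id="doomed", priority=1,
              payload={"fail": "always"}, max_retries=3)
    engine.submit_job(job)

    while engine.run_next() is not None:  # drain the queue
        pass

    assert job.status == JobStatus.FAILED
    assert job.attempts == 3  # not 2, not 4: off-by-one bugs hide here
```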
```bash
# Run all tests
pytest tests/ -v
# Run specific test category
pytest tests/test_scheduler.py -v
pytest tests/test_concurrency.py -v
pytest tests/test_shutdown.py -v
pytest tests/test_load.py -v
# Run with coverage
pytest tests/ --cov=. --cov-report=html
```

```
pyrexis/
├── api/ # CLI interface for job management
├── core/ # Engine, Scheduler, Pipeline, Executor
│ ├── engine.py # Job lifecycle orchestration
│ ├── scheduler.py # Priority-based job scheduling
│ ├── pipeline.py # Multi-stage streaming execution
│ ├── executor.py # Concurrency routing
│ └── base_pipeline.py # Base class for custom pipelines
├── models/ # Job and Result data models
│ ├── job.py # Job model with state machine
│ └── result.py # Execution result model
├── storage/ # StateStore persistence
│ └── state.py # File-based state storage (shelve)
├── concurrency/ # Thread/Process/Async pools
│ ├── threads.py # ThreadWorkerPool for I/O-bound tasks
│ ├── processes.py # ProcessWorkerPool for CPU-bound tasks
│ └── async_tasks.py # AsyncTaskRunner for event-driven tasks
├── utils/ # Utilities and observability
│ ├── metrics.py # MetricsRegistry and TimedBlock
│ ├── logging.py # Logging configuration
│ ├── retry.py # Retry decorators and utilities
│ ├── shutdown.py # ShutdownCoordinator
│ ├── cache.py # LRU cache implementation
│ ├── profiling.py # Performance profiling tools
│ ├── timing.py # Timer utilities
│ └── registry.py # PluginRegistry metaclass
├── plugins/ # Custom pipeline implementations
│ └── text_inference.py # Example text inference pipeline
├── tests/ # Comprehensive test suite
└── docs/ # Architecture and design docs
```
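New workloads plug in by subclassing the pipeline base class and landing under plugins/, as text_inference.py does. A self-contained sketch of the pattern (the BasePipeline stand-in below is assumed, not copied from core/base_pipeline.py):

```python
from typing import Callable, Iterable, Iterator, List

class BasePipeline:
    """Minimal stand-in for core/base_pipeline.py (assumed interface)."""

    def stages(self) -> List[Callable]:
        raise NotImplementedError

    def run(self, payload: dict) -> Iterable:
        stream: Iterable = [payload]
        for stage in self.stages():
            stream = stage(stream)  # each stage is a generator over the last
        return stream

class UppercasePipeline(BasePipeline):
    """Toy plugin: two generator stages, processing items incrementally."""

    def stages(self):
        return [self.tokenize, self.transform]

    def tokenize(self, payloads: Iterable[dict]) -> Iterator[str]:
        for payload in payloads:
            yield from payload["data"].split()

    def transform(self, words: Iterable[str]) -> Iterator[str]:
        for word in words:
            yield word.upper()  # a progress hook could fire per item here

print(list(UppercasePipeline().run({"data": "process this"})))
# ['PROCESS', 'THIS']
```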
See docs/ARCHITECTURE.md for detailed system design, including:
- Component interactions
- State machine diagram
- Concurrency model
- Failure recovery strategies
```python
# Access metrics
metrics = engine.get_metrics()
# View counters
print(metrics.get_counters())
# {'job.retries': 3}
# View timings
print(metrics.get_timings())
# {'job.execution': {'count': 10, 'avg': 0.15, 'max': 0.42}}
```

Metrics are thread-safe and collected automatically during execution.
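TimedBlock in utils/metrics.py presumably wraps this pattern: a context manager that times whatever runs under it and feeds the registry. A self-contained stand-in (illustrative only; the real signatures may differ):

```python
import time
from contextlib import contextmanager

# Illustrative stand-in for TimedBlock; check utils/metrics.py for the real API.
@contextmanager
def timed_block(registry: dict, name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        registry.setdefault(name, []).append(time.perf_counter() - start)

timings: dict = {}
with timed_block(timings, "job.execution"):
    sum(range(1_000_000))  # any work under the block gets timed

print(timings)  # {'job.execution': [0.012...]}
```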
We welcome contributions! See CONTRIBUTING.md for guidelines:
- Small PRs preferred
- Tests required for all changes
- No new dependencies without discussion
- Readability > cleverness
MIT License - see LICENSE for details.
Pyrexis (πύρεξις) — Greek for "fever" or "intense heat."
A fitting name for a system that orchestrates intense computational workloads.
Built for engineers who need reliable job orchestration, not magic.