
A production-grade backend engine that validates, schedules, executes, and streams AI-style workloads, built almost entirely on Python’s standard library (Pydantic is the only runtime dependency).

PYREXIS

A pure-Python concurrent job execution engine for AI infrastructure workloads.

Why PYREXIS?

PYREXIS focuses on the infrastructure around AI, not the models:

  • Scheduling — Fair priority-based job ordering with starvation prevention
  • Retries — Bounded retry logic with exponential backoff
  • Concurrency — Thread/process/async execution models
  • Streaming — Pipeline-based incremental processing
  • Durability — Persistent state with crash recovery

No ML libraries. No magic. Just reliable job orchestration.


Core Concepts

graph LR
    J[Job] --> S[Scheduler]
    S --> E[Engine]
    E --> EX[Executor]
    EX --> P[Pipeline]
    P --> R[Result]
    E --> ST[StateStore]
    ST -.persist.-> E

Job → Unit of work

  • Strongly typed with Pydantic
  • State machine enforcement — illegal transitions raise ValueError (see the sketch below)
  • Priority-based scheduling
  • Configurable execution mode (thread/process/async)
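
A hedged sketch of that enforcement, assuming an Engine wired up as in the Minimal Example below; the exact exception text is illustrative:

from models import Job

job = Job(
    job_id="demo",
    priority=5,
    payload={},
    execution_mode="thread",
)

engine.submit_job(job)  # CREATED -> PENDING, accepted
engine.submit_job(job)  # raises ValueError: job is no longer in CREATED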

Scheduler → Fair ordering

  • Priority queue with aging (illustrated below)
  • Starvation prevention
  • Thread-safe operations
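
To make the aging idea concrete, here is a self-contained illustration (not the actual scheduler.py code, and it assumes lower numbers mean higher priority):

import time

AGING_RATE = 0.1  # assumed: priority points gained per second of waiting

class AgingQueue:
    """Conceptual sketch: waiting long enough beats a better static priority."""
    def __init__(self):
        self._items = []  # (static_priority, enqueued_at, job_id)

    def push(self, priority, job_id):
        self._items.append((priority, time.monotonic(), job_id))

    def pop(self):
        # Lower effective value wins; waiting lowers it over time, so a
        # starved low-priority job eventually outranks fresh urgent work.
        now = time.monotonic()
        best = min(self._items, key=lambda it: it[0] - AGING_RATE * (now - it[1]))
        self._items.remove(best)
        return best[2]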

Engine → Lifecycle orchestration

  • Job submission and state management
  • Graceful shutdown coordination
  • Metrics collection

Pipeline → Streamed execution

  • Multi-stage processing
  • Generator-based streaming (sketched below)
  • Progress hooks
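
The streaming idea in miniature; the real base class lives in core/base_pipeline.py and its method names may differ:

def tokenize(items):
    for text in items:
        yield text.split()

def count(token_lists):
    for tokens in token_lists:
        yield len(tokens)

def run_stages(source, stages):
    stream = source
    for stage in stages:
        stream = stage(stream)  # each stage wraps the previous generator
    yield from stream           # results emerge incrementally, not in a batch

for n in run_stages(["a b c", "d e"], [tokenize, count]):
    print(n)  # prints 3, then 2, as soon as each item clears the stages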

Executor → Concurrency router

  • Threads for I/O-bound tasks
  • Processes for CPU-bound tasks
  • Async for event-driven orchestration (the routing decision is sketched below)
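
The routing decision, sketched with standard-library pools; PYREXIS wraps its own ThreadWorkerPool, ProcessWorkerPool, and AsyncTaskRunner, so treat this as the shape of the logic rather than the actual executor.py code:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

_pools = {
    "thread": ThreadPoolExecutor(max_workers=8),    # I/O-bound tasks
    "process": ProcessPoolExecutor(max_workers=4),  # CPU-bound, sidesteps the GIL
    # "async" jobs skip the pools entirely and run on an asyncio event loop
}

def submit(fn, job):
    pool = _pools.get(job.execution_mode)
    if pool is None:
        raise ValueError(f"unsupported execution_mode: {job.execution_mode!r}")
    return pool.submit(fn, job)  # returns a concurrent.futures.Future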

Minimal Example

Programmatic API

from core import Engine, Scheduler, ExecutorRouter
from models import Job
from storage import StateStore
from utils import ShutdownCoordinator

# Setup
scheduler = Scheduler()
executor = ExecutorRouter()
state_store = StateStore("./state.db")
shutdown = ShutdownCoordinator()

engine = Engine(
    scheduler=scheduler,
    executor=executor,
    state_store=state_store,
    shutdown_coordinator=shutdown
)

# Create and submit job
job = Job(
    job_id="task-1",
    priority=5,
    payload={"data": "process_this"},
    execution_mode="thread"
)

engine.submit_job(job)

# Run engine loop
engine.run_loop()  # Processes jobs until shutdown

Command-Line Interface

# Submit a job
python main.py submit \
  --job-id task-1 \
  --priority 5 \
  --payload '{"type": "example", "data": "process_this"}' \
  --mode thread

# Check status
python main.py status --job-id task-1

# Start daemon to process jobs
python main.py daemon

# Monitor in real-time
python main.py monitor

# List recent jobs
python main.py list --limit 20

# Cancel a job
python main.py cancel --job-id task-1

# View metrics
python main.py metrics

See docs/CLI.md for the complete CLI reference.


Job Flow

flowchart TD
    A[Create Job] --> B[Submit to Engine]
    B --> C{Status = CREATED?}
    C -->|Yes| D[Transition to PENDING]
    C -->|No| ERROR[ValueError]
    D --> E[Add to Scheduler]
    E --> F[Engine.run_next]
    F --> G[Scheduler.next_job]
    G -->|Job found| H[Transition to RUNNING]
    G -->|No jobs| END[Return None]
    H --> I[Executor routes by mode]
    I --> J{Execution Mode}
    J -->|thread| K[ThreadWorkerPool]
    J -->|process| L[ProcessWorkerPool]
    J -->|async| M[AsyncTaskRunner]
    K --> N[Execute Pipeline]
    L --> N
    M --> N
    N --> O{Success?}
    O -->|Yes| P[Transition to COMPLETED]
    O -->|No| Q{Retries left?}
    Q -->|Yes| R[Transition to RETRYING]
    Q -->|No| S[Transition to FAILED]
    R --> H
    P --> T[Save Result]
    S --> T
    T --> U[Persist to StateStore]

Failure Model

PYREXIS is designed for graceful degradation, not perfect uptime:

  • Crashes are expected — Jobs retry automatically
  • Retries are bounded — Max attempts prevent infinite loops
  • Results are immutable — Once saved, never modified
  • State is durable — Persistent storage survives restarts (see the shelve sketch below)
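
The durability guarantee rests on shelve, per storage/state.py. An illustration of the underlying idea (not the actual StateStore code):

import shelve

with shelve.open("./state.db") as db:
    db["task-1"] = {"status": "COMPLETED", "attempts": 1}

# ...process crashes or restarts...

with shelve.open("./state.db") as db:
    print(db["task-1"]["status"])  # "COMPLETED" -- the state survived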

Retry Logic

# Job fails during execution
job.record_failure("Connection timeout")

# Automatic retry if attempts < max_retries
if job.attempts < job.max_retries:
    job.status = JobStatus.RETRYING
    # Exponential backoff before the next attempt: 2 ** job.attempts seconds
else:
    job.status = JobStatus.FAILED
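
The same bounded-backoff policy as a standalone decorator, in the spirit of utils/retry.py (the name and signature here are illustrative, not that module's actual API):

import functools
import time

def retry(max_retries=3):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise                 # retries exhausted -> FAILED
                    time.sleep(2 ** attempt)  # 1s, 2s, 4s, ...
        return wrapper
    return decorator

@retry(max_retries=3)
def flaky_call():
    ...  # e.g. an external service that times out intermittently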

Shutdown Behavior

  • Graceful shutdown — No new jobs are pulled and in-flight jobs finish (wired up in the sketch below)
  • Thread pools stop — Workers exit after current task
  • Process pools terminate — Clean resource cleanup
  • State persists — All job states saved before exit
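
A minimal sketch of wiring OS signals to a graceful stop; request_shutdown() is an assumed method name, not taken from utils/shutdown.py:

import signal
from utils import ShutdownCoordinator

shutdown = ShutdownCoordinator()
# request_shutdown() is hypothetical -- check utils/shutdown.py for the real API
signal.signal(signal.SIGINT, lambda signum, frame: shutdown.request_shutdown())
signal.signal(signal.SIGTERM, lambda signum, frame: shutdown.request_shutdown())

engine.run_loop()  # returns once in-flight jobs finish after a shutdown request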

When NOT to Use PYREXIS

| ❌ Use case | Why not | Use instead |
|---|---|---|
| Real-time systems (< 10 ms latency) | Threading/scheduling overhead | Direct function calls |
| Distributed clusters | Single-machine only | Celery, Ray |
| GPU training pipelines | No CUDA awareness | Kubeflow, MLflow |
| High-throughput streaming (> 100k jobs/sec) | Not optimized for that scale | Kafka, Flink |

When to use PYREXIS:

  • ✅ Orchestrating AI inference pipelines (batch processing)
  • ✅ Multi-stage data preprocessing workflows
  • ✅ Concurrent API request handling (I/O-bound)
  • ✅ CPU-intensive batch computations
  • ✅ Retry-heavy external service calls

Installation

# Clone repository
git clone https://github.com/suriyasureshok/Pyrexis.git
cd Pyrexis

# Install dependencies (Pydantic at runtime, pytest for the test suite)
pip install pydantic pytest

# Or install in development mode
pip install -e .

# Run tests
pytest tests/ -v

Testing Philosophy

PYREXIS has 47 brutal tests that try to break the system:

  • Starvation prevention — Low-priority jobs eventually run (a sketch follows the commands below)
  • Race conditions — 500 concurrent job submissions
  • Retry exhaustion — Exact max_retries enforcement
  • State transitions — Illegal transitions rejected
  • Graceful shutdown — No data loss on interrupt

Key insight: We test failure modes, not just functionality.

# Run all tests
pytest tests/ -v

# Run specific test category
pytest tests/test_scheduler.py -v
pytest tests/test_concurrency.py -v
pytest tests/test_shutdown.py -v
pytest tests/test_load.py -v

# Run with coverage
pytest tests/ --cov=. --cov-report=html
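
As a flavor of those failure-mode tests, a sketch of the starvation check (add_job() is an assumed method name -- only next_job() appears in the flow above -- and lower numbers are assumed to mean higher priority):

from core import Scheduler
from models import Job

def make_job(job_id, priority):
    return Job(job_id=job_id, priority=priority, payload={}, execution_mode="thread")

def test_low_priority_job_eventually_runs():
    scheduler = Scheduler()
    scheduler.add_job(make_job("starved", priority=9))
    popped = []
    for i in range(500):
        scheduler.add_job(make_job(f"hot-{i}", priority=1))  # constant pressure
        popped.append(scheduler.next_job().job_id)
    # Aging must surface the old low-priority job despite the hot stream
    assert "starved" in popped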

Project Structure

pyrexis/
├── api/                # CLI interface for job management
├── core/               # Engine, Scheduler, Pipeline, Executor
│   ├── engine.py       # Job lifecycle orchestration
│   ├── scheduler.py    # Priority-based job scheduling
│   ├── pipeline.py     # Multi-stage streaming execution
│   ├── executor.py     # Concurrency routing
│   └── base_pipeline.py # Base class for custom pipelines
├── models/             # Job and Result data models
│   ├── job.py          # Job model with state machine
│   └── result.py       # Execution result model
├── storage/            # StateStore persistence
│   └── state.py        # File-based state storage (shelve)
├── concurrency/        # Thread/Process/Async pools
│   ├── threads.py      # ThreadWorkerPool for I/O-bound tasks
│   ├── processes.py    # ProcessWorkerPool for CPU-bound tasks
│   └── async_tasks.py  # AsyncTaskRunner for event-driven tasks
├── utils/              # Utilities and observability
│   ├── metrics.py      # MetricsRegistry and TimedBlock
│   ├── logging.py      # Logging configuration
│   ├── retry.py        # Retry decorators and utilities
│   ├── shutdown.py     # ShutdownCoordinator
│   ├── cache.py        # LRU cache implementation
│   ├── profiling.py    # Performance profiling tools
│   ├── timing.py       # Timer utilities
│   └── registry.py     # PluginRegistry metaclass
├── plugins/            # Custom pipeline implementations
│   └── text_inference.py # Example text inference pipeline
├── tests/              # Comprehensive test suite
└── docs/               # Architecture and design docs

Architecture

See docs/ARCHITECTURE.md for detailed system design, including:

  • Component interactions
  • State machine diagram
  • Concurrency model
  • Failure recovery strategies

Metrics & Observability

# Access metrics
metrics = engine.get_metrics()

# View counters
print(metrics.get_counters())
# {'job.retries': 3}

# View timings
print(metrics.get_timings())
# {'job.execution': {'count': 10, 'avg': 0.15, 'max': 0.42}}

Metrics are thread-safe and collected automatically during execution.
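
Recording a custom timing uses the same registry, sketched here with an assumed TimedBlock signature (a context manager over a registry and a metric name; see utils/metrics.py for the real one):

from utils.metrics import MetricsRegistry, TimedBlock

metrics = MetricsRegistry()
with TimedBlock(metrics, "pipeline.stage1"):
    do_stage_work()  # hypothetical workload

print(metrics.get_timings())  # {'pipeline.stage1': {'count': 1, ...}}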


Contributing

We welcome contributions! See CONTRIBUTING.md for guidelines:

  • Small PRs preferred
  • Tests required for all changes
  • No new dependencies without discussion
  • Readability > cleverness

License

MIT License - see LICENSE for details.


Why "PYREXIS"?

Pyrexis (πύρεξις) — Greek for "fever" or "intense heat."
A fitting name for a system that orchestrates intense computational workloads.


Built for engineers who need reliable job orchestration, not magic.
