Unified job scheduler for the North-Shore-AI platform
NSAI.Work provides a protocol-first, multi-tenant job scheduling system with priority queues, resource-aware scheduling, and pluggable backend execution.
- Unified Job IR: Single representation for all work types (tool calls, experiments, training, inference)
- Priority Queues: Four levels (realtime/interactive/batch/offline) with FIFO ordering
- Multi-Tenancy: Tenant isolation with quotas and fairness
- Resource-Aware: CPU, GPU, memory, cost-aware scheduling
- Pluggable Backends: Local, ALTAR, Modal, Ray, and custom backends
- Retry Policies: Configurable exponential/linear/constant backoff
- Telemetry Integration: Full observability via Erlang telemetry
- ETS-Based Registry: Fast in-memory job storage and indexing
Add work to your list of dependencies in mix.exs:
def deps do
[
{:work, "~> 0.1.0"}
]
end

# Start the application
{:ok, _} = Application.ensure_all_started(:work)
# Create a job
job = Work.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "default",
priority: :interactive,
payload: %{tool: "calculator", args: [2, 2]},
resources: Work.Resources.new(cpu: 1.0, memory_mb: 512)
)
# Submit for execution
{:ok, submitted} = Work.submit(job)
# Check status
{:ok, job} = Work.get(submitted.id)
IO.inspect(job.status) # => :running or :succeeded
# Get statistics
stats = Work.stats()
IO.inspect(stats)

NSAI.Work consists of several cooperating components:
┌─────────────────────────────────────────────────┐
│ Application Layer │
│ (Crucible, Synapse, CNS, ALTAR, etc.) │
└────────────────────┬────────────────────────────┘
│ submit jobs
▼
┌─────────────────────────────────────────────────┐
│ Work.Scheduler │
│ - Admission control │
│ - Priority queue routing │
│ - Resource-aware dispatch │
└────────────────────┬────────────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
[realtime] [interactive] [batch] [offline]
│ │ │ │
└────────────┴────────────┴─────────┘
│
▼
┌─────────────────────────────────────────────────┐
│ Work.Executor │
│ - Backend selection │
│ - Job execution │
│ - Retry handling │
└────────────────────┬────────────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
[Local] [ALTAR] [Modal] [Custom]
- Work.Job: Universal job IR with metadata, resources, constraints
- Work.Scheduler: Priority queue management and admission control
- Work.Queue: FIFO queues per priority level
- Work.Executor: Job execution with backend delegation
- Work.Registry: ETS-based job storage and indexing
- Work.Backend: Pluggable execution backends
- Work.Telemetry: Event instrumentation
Jobs move through the following states:
pending -> queued -> running -> {succeeded | failed | canceled | timeout}
↓
(retry) -> queued
- pending: Job created, waiting for admission
- queued: Admitted and waiting in priority queue
- running: Executing on a backend
- succeeded: Completed successfully
- failed: Execution failed (may retry)
- canceled: Explicitly canceled
- timeout: Exceeded time limit
Jobs are scheduled based on four priority levels:
- realtime: Immediate execution, user-blocking operations
- interactive: User-facing operations with quick turnaround
- batch: Background processing, experiments, training
- offline: Bulk operations, data processing, low priority
Specify resource requirements for intelligent scheduling:
resources = Work.Resources.new(
cpu: 4.0, # CPU cores
memory_mb: 8192, # Memory in MB
gpu: "A100", # GPU type
gpu_count: 1, # Number of GPUs
timeout_ms: 300_000, # 5 minute timeout
max_cost_usd: 1.0 # Cost cap
)
job = Work.Job.new(
kind: :training_step,
tenant_id: "acme",
namespace: "ml",
payload: %{model: "llama-3.1-8b"},
resources: resources
)

Configure retry behavior per job:
constraints = Work.Constraints.new(
retry_policy: %{
max_attempts: 5,
backoff: :exponential,
base_delay_ms: 1000,
max_delay_ms: 60_000,
jitter: true
}
)
job = Work.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "default",
payload: %{},
constraints: constraints
)

NSAI.Work ships with three built-in backends:
Direct BEAM process execution (development, lightweight tasks):
# Automatically selected for simple jobs
job = Work.Job.new(
kind: :tool_call,
tenant_id: "test",
namespace: "default",
payload: %{tool: "echo"}
)

Executes tool calls via ALTAR's LATER runtime with ADM function declarations:
# Enable ALTAR in config
config :work,
enable_altar: true,
altar_registry: Work.AltarRegistry
# Register a tool
Work.AltarTools.register(
"proposer_extract",
"Extract claims from document",
%{
type: :OBJECT,
properties: %{
"doc_id" => %{type: :STRING, description: "Document ID"},
"max_claims" => %{type: :NUMBER, description: "Max claims"}
},
required: ["doc_id"]
},
&MyApp.CNS.Proposer.extract/1
)
# Submit a tool call job
job = Work.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "cns",
payload: %{
tool_name: "proposer_extract",
args: %{doc_id: "doc_123", max_claims: 10}
}
)
{:ok, job} = Work.submit(job)

See the ALTAR Integration section for more details.
Testing backend with configurable behavior:
# Configure mock to fail
Work.Backends.Mock.configure(behavior: {:fail, "Simulated failure"})
# Or delay execution
Work.Backends.Mock.configure(delay_ms: 500)
# Check execution history
history = Work.Backends.Mock.history()

Implement the Work.Backend behaviour:
defmodule MyApp.Backend.Custom do
@behaviour Work.Backend
@impl true
def execute(job) do
# Your execution logic
{:ok, result}
end
@impl true
def supports?(job) do
job.kind == :custom_work
end
end
# Configure executor with custom backend
Work.Executor.start_link(
backends: [
local: Work.Backends.Local,
custom: MyApp.Backend.Custom
]
)

NSAI.Work emits telemetry events for observability:
# Attach a handler
:telemetry.attach(
"work-metrics",
[:work, :job, :completed],
&MyApp.Metrics.handle_event/4,
nil
)
# Or use the built-in console logger
Work.Telemetry.attach_console_logger()

The following events are emitted:

- [:work, :job, :submitted] - Job submitted
- [:work, :job, :queued] - Job enqueued
- [:work, :job, :started] - Job execution started
- [:work, :job, :completed] - Job completed
- [:work, :job, :canceled] - Job canceled
- [:work, :job, :retry] - Job retry scheduled
- [:work, :scheduler, :admit] - Admission decision
- [:work, :scheduler, :select_backend] - Backend selection
- [:work, :queue, :enqueue] - Queue enqueue
- [:work, :queue, :dequeue] - Queue dequeue
- [:work, :queue, :overflow] - Queue capacity exceeded
All jobs belong to a tenant and namespace:
job = Work.Job.new(
tenant_id: "customer-123",
namespace: "production",
# ...
)
# List jobs for a tenant
jobs = Work.list("customer-123")
# Filter by namespace
jobs = Work.list("customer-123", namespace: "production")
# Filter by status
running = Work.list("customer-123", status: :running)

ALTAR (Adaptive Language Tool and Action Runtime) provides a local execution runtime for tool calls with ADM (function declaration) support. The ALTAR backend enables Work to execute tool calls through ALTAR's LATER runtime.
- Add ALTAR to your dependencies (already included with Work):
{:altar, "~> 0.2.0"}

- Enable ALTAR in your application config:
# config/config.exs
config :work,
enable_altar: true,
altar_registry: Work.AltarRegistry

Use Work.AltarTools to register tools with ALTAR:
# Simple tool
Work.AltarTools.register(
"calculator_add",
"Add two numbers",
%{
type: :OBJECT,
properties: %{
"a" => %{type: :NUMBER, description: "First number"},
"b" => %{type: :NUMBER, description: "Second number"}
},
required: ["a", "b"]
},
fn %{"a" => a, "b" => b} -> {:ok, a + b} end
)
# Complex tool with nested objects
Work.AltarTools.register(
"search_documents",
"Search document corpus with filters",
%{
type: :OBJECT,
properties: %{
"query" => %{type: :STRING, description: "Search query"},
"limit" => %{type: :NUMBER, description: "Max results"},
"filters" => %{
type: :OBJECT,
properties: %{
"category" => %{type: :STRING},
"date_range" => %{
type: :OBJECT,
properties: %{
"start" => %{type: :STRING},
"end" => %{type: :STRING}
}
}
}
}
},
required: ["query"]
},
&MyApp.Search.search/1
)

Tool functions must:

- Accept a single argument (a map of parameters)
- Return {:ok, result} on success
- Return {:error, reason} on failure
Example:
defmodule MyApp.CNS.Proposer do
def extract(%{"doc_id" => doc_id} = args) do
max_claims = Map.get(args, "max_claims", 10)
case fetch_document(doc_id) do
{:ok, document} ->
claims = extract_claims(document, max_claims)
{:ok, %{claims: claims, doc_id: doc_id}}
{:error, reason} ->
{:error, "Failed to extract claims: #{reason}"}
end
end
end

Create and submit jobs with the :tool_call kind:
job = Work.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "default",
priority: :interactive,
payload: %{
tool_name: "proposer_extract",
args: %{
doc_id: "doc_123",
max_claims: 5
}
}
)
{:ok, submitted_job} = Work.submit(job)
# Wait for completion
case Work.get(submitted_job.id) do
{:ok, %{status: :succeeded, result: result}} ->
IO.inspect(result)
{:ok, %{status: :failed, error: error}} ->
IO.puts("Job failed: #{error.message}")
end

# Check if tool exists
if Work.AltarTools.registered?("calculator_add") do
IO.puts("Tool is available")
end

For CNS (Critic-Network Synthesis) agents:
# Register Proposer agent tool
Work.AltarTools.register(
"cns_proposer",
"Extract structured claims from documents",
%{
type: :OBJECT,
properties: %{
"document" => %{type: :STRING, description: "Source document"},
"schema" => %{type: :STRING, description: "Target schema"}
},
required: ["document"]
},
&CNS.Agents.Proposer.extract/1
)
# Register Antagonist agent tool
Work.AltarTools.register(
"cns_antagonist",
"Find contradictions in claim networks",
%{
type: :OBJECT,
properties: %{
"sno_graph" => %{type: :OBJECT, description: "SNO graph"},
"beta1_threshold" => %{type: :NUMBER, description: "β₁ threshold"}
},
required: ["sno_graph"]
},
&CNS.Agents.Antagonist.critique/1
)
# Execute dialectical workflow
proposer_job = Work.Job.new(
kind: :tool_call,
tenant_id: "research",
namespace: "cns",
priority: :batch,
payload: %{
tool_name: "cns_proposer",
args: %{document: document_text, schema: "scifact"}
}
)
{:ok, proposer_result} = Work.submit(proposer_job)

defmodule Crucible.Pipeline.Runner do
def run_stage(experiment, stage, context) do
job = Work.Job.new(
kind: :experiment_step,
tenant_id: context[:tenant_id],
namespace: experiment.id,
priority: :batch,
payload: %{
experiment_id: experiment.id,
stage_name: stage.name,
input_context: context
},
constraints: Work.Constraints.new(
concurrency_group: "experiment:#{experiment.id}"
)
)
Work.submit(job)
end
end

defmodule ALTAR.GRID do
def dispatch(tool, input, opts \\ []) do
job = Work.Job.new(
kind: :tool_call,
tenant_id: opts[:tenant_id] || "default",
namespace: opts[:namespace] || "default",
priority: opts[:priority] || :interactive,
payload: %{
tool_id: tool.id,
function_name: tool.name,
arguments: input
},
resources: translate_resources(tool.resources)
)
Work.submit(job)
end
end

Planned features:

- Persistent storage backend (Postgres/Ecto)
- Tenant quotas and rate limiting
- ALTAR backend integration
- Modal.com backend adapter
- Ray backend adapter
- Job dependencies and DAGs
- Concurrency groups
- Cost tracking and budgets
- Web UI for monitoring
This is part of the North-Shore-AI platform. Contributions welcome!
MIT License - see LICENSE file for details