A lightweight, composable plugin framework for building extensible AI systems.
CPEX lets you intercept, enforce, and extend application behavior through plugins without modifying core logic.
Define hook points in your application, write plugins that attach to them, and compose enforcement pipelines that run automatically.
from cpex.framework import hook, Plugin, PluginResult
class RateLimitPlugin(Plugin):
@hook("tool_pre_invoke")
async def check_rate_limit(self, payload, context):
if self.is_over_limit(context):
return PluginResult(
continue_processing=False,
violation=PluginViolation(reason="Rate limit exceeded", code="RATE_LIMIT")
)
return PluginResult(continue_processing=True)Register the plugin, and it runs at every hook invocation. No changes to your application logic.
pip install cpexAI systems interact with tools, APIs, data sources, and other agents. Adding guardrails, observability, or policy checks typically means embedding that logic directly into application code, leading to duplication, tight coupling, and drift.
CPEX introduces standardized interception hooks between your application and its operations. Plugins attach to these hooks and run automatically, keeping enforcement logic separate from business logic.
What you can build with CPEX:
- Security — access control, prompt injection detection, data loss prevention
- Observability — request tracing, audit logging, metrics collection
- Governance — policy enforcement, compliance validation, approval workflows
- Reliability — rate limiting, circuit breakers, response validation
CPEX is designed for modern AI and agent systems, but works equally well for any application that needs safe, modular extensibility.
Your application defines hooks — named interception points before and after critical operations. Plugins register against these hooks and execute automatically when triggered.
Application → Hook Point → Plugin Manager → Application (remaining processing) → Result
│
┌──────┼──────┐
▼ ▼ ▼
Plugin Plugin Plugin
The plugin manager handles registration, ordering, execution, timeouts, and error isolation. You get a deterministic pipeline with no surprises.
A hook is a named interception point in your application. You define a hook where you want plugins to be able to run, then call it there.
Define hook models:
from cpex.framework import PluginPayload, PluginResult
class EmailPayload(PluginPayload):
recipient: str
subject: str
body: str
EmailResult = PluginResult[EmailPayload]Register it:
from cpex.framework.hooks.registry import get_hook_registry
registry = get_hook_registry()
registry.register_hook("email_pre_send", EmailPayload, EmailResult)Call the hook in your application:
async def send_email(recipient: str, subject: str, body: str):
payload = EmailPayload(recipient=recipient, subject=subject, body=body)
context = GlobalContext(request_id="req-123")
result, _ = await manager.invoke_hook("email_pre_send", payload, context)
if not result.continue_processing:
raise PolicyError(result.violation.reason)
# proceed with sending
await smtp.send(payload.recipient, payload.subject, payload.body)CPEX also ships with built-in hooks for common AI operations (tool_pre_invoke, tool_post_invoke, prompt_pre_fetch, prompt_post_fetch, resource_pre_fetch, resource_post_fetch, agent_pre_invoke, agent_post_invoke). These follow the same pattern and are ready to use without registration.
A plugin is a class that implements one or more hook handlers. Use the @hook decorator to attach a method to any hook by name:
from cpex.framework import hook, Plugin, PluginViolation, PluginResult
class EmailFilterPlugin(Plugin):
@hook("email_pre_send")
async def block_external_domains(self, payload: EmailPayload, context) -> PluginResult:
allowed = self.config.config.get("allowed_domains", [])
domain = payload.recipient.split("@")[-1]
if allowed and domain not in allowed:
return PluginResult(
continue_processing=False,
violation=PluginViolation(
reason="Domain not allowed",
code="DOMAIN_BLOCKED",
details={"domain": domain}
)
)
return PluginResult(continue_processing=True)The @hook decorator decouples method names from hook names, which is useful when a plugin handles multiple hooks or when names would otherwise conflict.
For built-in hooks, you can also use the naming convention directly (method name matches hook name) without a decorator:
class ContentFilterPlugin(Plugin):
async def tool_pre_invoke(self, payload: ToolPreInvokePayload, context: PluginContext) -> ToolPreInvokeResult:
blocked = self.config.config.get("blocked_tools", [])
if payload.name in blocked:
return ToolPreInvokeResult(
continue_processing=False,
violation=PluginViolation(reason="Tool blocked by policy", code="TOOL_BLOCKED")
)
return ToolPreInvokeResult(continue_processing=True)A plugin method can:
- Allow execution to continue
- Block execution with a violation
- Modify the payload (using copy-on-write isolation)
Plugins run in phases in this order: sequential → audit → concurrent → fire_and_forget.
| Mode | Execution | Can block? | State merged? | Use case |
|---|---|---|---|---|
sequential |
One at a time, chained | Yes | Yes | Enforcement pipelines |
audit |
One at a time, chained | No | No | Logging, monitoring |
concurrent |
Parallel, fail-fast | Yes | Yes | Independent validations |
fire_and_forget |
Background, after all phases | No | No | Telemetry, audit logs |
disabled |
Not loaded | — | — | Plugin off |
Error handling is configured separately with on_error, independent of mode:
on_error |
Behavior |
|---|---|
fail |
Pipeline halts, error propagates (default) |
ignore |
Error logged; pipeline continues |
disable |
Error logged; plugin auto-disabled; pipeline continues |
The PluginManager orchestrates everything:
from cpex.framework import PluginManager, GlobalContext
from cpex.framework.hooks.tools import ToolPreInvokePayload
manager = PluginManager("plugins/config.yaml")
await manager.initialize()
context = GlobalContext(request_id="req-123", user="alice")
payload = ToolPreInvokePayload(name="web_search", args={"query": "CPEX framework"})
result, plugin_contexts = await manager.invoke_hook("tool_pre_invoke", payload, context)
if result.continue_processing:
# Proceed — use result.modified_payload if a plugin transformed it
pass
else:
# A plugin blocked execution
print(f"Blocked: {result.violation.reason}")Plugins are configured in YAML:
plugin_dirs:
- ./plugins
plugins:
- name: email_filter
kind: my_app.plugins.EmailFilterPlugin
version: 1.0.0
hooks:
- email_pre_send
mode: sequential
priority: 10
config:
allowed_domains:
- company.com
- partner.orgPlugins are scheduled by mode, and execute in priority order within sequential bands (lower number = higher priority). Use this to ensure validation runs before transformation, and transformation runs before logging.
Plugin Scheduling
At each hook invocation, plugins are grouped and scheduled by execution modes, following a strict group order:
sequential → audit → concurrent → fire_and_forget
Within sequential and audit groups, plugins execute in priority order (lower number = higher priority, e.g., 10 runs before 20).
Restrict plugins to specific contexts:
plugins:
- name: tenant_plugin
kind: my_app.plugins.TenantPlugin
hooks:
- tool_pre_invoke
mode: sequential
conditions:
- tenant_ids: [tenant-1, tenant-2]
server_ids: [server-prod]Plugins are plain async classes — test them directly:
import pytest
from cpex.framework import PluginConfig, GlobalContext, PluginContext
@pytest.mark.asyncio
async def test_email_filter_blocks_external_domain():
config = PluginConfig(
name="test_filter",
kind="my_app.plugins.EmailFilterPlugin",
version="1.0.0",
hooks=["email_pre_send"],
config={"allowed_domains": ["company.com"]}
)
plugin = EmailFilterPlugin(config)
payload = EmailPayload(recipient="user@external.com", subject="Hello", body="...")
context = PluginContext(global_context=GlobalContext(request_id="test-1"))
result = await plugin.block_external_domains(payload, context)
assert result.continue_processing is False
assert result.violation.code == "DOMAIN_BLOCKED"Plugins can run as standalone services, connected over MCP (Streamable HTTP), gRPC, or Unix domain sockets.
plugins:
- name: remote_validator
kind: external
hooks:
- tool_pre_invoke
mode: sequential
mcp:
proto: STREAMABLEHTTP
url: https://plugin-server.example.com
tls:
certfile: /path/to/client-cert.pem
keyfile: /path/to/client-key.pem
ca_bundle: /path/to/ca-bundle.pemBuild an external plugin server with the built-in ExternalPluginServer:
from cpex.framework import ExternalPluginServer
server = ExternalPluginServer(plugins=[MyPlugin(config)])
server.run()CPEX is under active development as part of the ContextForge ecosystem. The framework is designed to work across AI gateways, agent frameworks, LLM proxies, and tool servers.
Contributions are welcome. Open an issue, propose a plugin, or submit a pull request.
