Skip to content

Agentic Mesh — a lightweight, peer-to-peer JSON-RPC framework enabling autonomous agents to discover, communicate, and delegate tasks dynamically across a distributed network.

License

Notifications You must be signed in to change notification settings

apoorvanand/agentic-mesh

Repository files navigation

🧠 Agentic Mesh – JSON-RPC Peer-to-Peer System

Agentic Mesh: Distributed intelligence made simple — FastAPI-powered peer agents with real-time metrics and latency-aware task routing.

A minimal agent-to-agent JSON-RPC framework with:

  • 🔗 Service discovery registry
  • ⚙️ Dynamic peer delegation
  • 📊 CPU / memory metrics
  • Latency-aware routing

Built entirely in Python + FastAPI + asyncio, this demo shows how agentic systems can cooperate, delegate tasks, and make routing decisions based on live system telemetry.

🏗️ Architecture Overview


         ┌──────────────────────────┐
         │ Discovery Server (8500)  │
         │ • Registry of agents     │
         │ • Heartbeats every 10 s  │
         └──────────┬───────────────┘
                    │
      ┌─────────────┴─────────────┐
      │                           │

┌────────────┐ ┌────────────┐ │ Agent-A │ <────────▶ │ Agent-B │ │ JSON-RPC │ │ JSON-RPC │ │ + Metrics │ │ + Metrics │ └────────────┘ └────────────┘ │ │ └─────────────┬─────────────┘ │ ┌──────────────────────────┐ │ Orchestrator / Dashboard │ └──────────────────────────┘


Each agent:

  • Registers itself with the Discovery Server
  • Sends heartbeats every 10 seconds
  • Exposes JSON-RPC methods:
    • info – returns CPU/memory
    • work – simulates a job
    • delegate – routes work to the least-loaded peer

📂 Project Structure


agent_system/
├── discovery_server.py   # Central registry
├── agent_node.py         # Peer agent node (self-registering)
└── orchestrator_client.py# Orchestrator for monitoring/delegation


⚙️ Requirements

Install dependencies:

pip install fastapi uvicorn httpx psutil

🚀 Running the System

1. Start the Discovery Server

python discovery_server.py

Listens on http://localhost:8500/services Stores all active agents with their URL + last heartbeat.


2. Launch Agents

Run one or more peers (each auto-registers and heartbeats):

python agent_node.py

Each agent:

  • Gets a random name like agent-214
  • Binds to a random port between 8600–8700
  • Registers at the discovery service

Start multiple in different terminals to form a mesh.


3. Run the Orchestrator

python orchestrator_client.py

This script:

  • Discovers all live agents
  • Picks one to call the delegate RPC
  • That agent then queries all peers → picks least-loaded node
  • Work is delegated and metrics (CPU, memory, latency) are displayed

🧾 Example Output

Discovered: ['agent-214','agent-992']

🧠 Delegation Result:
 {
   'delegated_to': {
      'agent': 'agent-992', 'cpu': 8.7, 'mem': 47.3,
      'latency_ms': 7.4, 'url': '127.0.0.1:8612'
   },
   'peer_result': {
      'agent': 'agent-992',
      'result': 'Processed: parallel workload',
      'rpc_latency_ms': 309.8
   },
   'rpc_latency_ms': 512.6
 }

Network latency 11.2 ms

📊 Features

Feature Description
Service Discovery Agents auto-register and send periodic heartbeats
Peer Delegation Any agent can delegate work to the least-loaded peer
System Metrics Reports live CPU % and memory % for load balancing
Latency Tracking Measures both local RPC latency and network RTT
Async Parallelism All calls use asyncio + HTTPX for concurrency

🧠 Extending the Mesh

Enhancement Hint
Dashboard Add a FastAPI + WebSocket page to stream live metrics
Weighted routing Weight = 0.7 × CPU + 0.3 × latency
Persistent registry Store registry in Redis or etcd
Security Add API-key or mutual TLS auth between peers
Tracing Add OpenTelemetry IDs in JSON-RPC payloads
Autoscaling Launch new agents when cluster CPU > 70 %

🧩 Example RPC Payload

Request

{
  "jsonrpc": "2.0",
  "method": "work",
  "params": { "text": "Optimize Kafka consumer throughput" },
  "id": "1234"
}

Response

{
  "jsonrpc": "2.0",
  "result": {
    "agent": "agent-214",
    "result": "Processed: Optimize Kafka consumer throughput",
    "rpc_latency_ms": 312.4
  },
  "id": "1234"
}
%%{init: {'theme': 'neutral', 'themeVariables': { 'fontSize': '15px', 'primaryColor': '#e6f0ff', 'edgeLabelBackground':'#ffffff', 'primaryTextColor': '#2b2b2b', 'nodeBorder':'#007acc', 'tertiaryColor':'#dae8fc'}}}%%
graph TD
    subgraph Discovery["📍 Discovery Server (port:8500)"]
    D[Discovery Registry<br/>• Stores Agent URLs<br/>• Tracks Heartbeats<br/>• Responds to /services]
    end

    subgraph Agents["🤖 Agent Mesh Network"]
    A1["Agent-A<br/>JSON-RPC<br/>CPU: 9% | MEM: 47%"]
    A2["Agent-B<br/>JSON-RPC<br/>CPU: 6% | MEM: 51%"]
    A3["Agent-C<br/>JSON-RPC<br/>CPU: 13% | MEM: 59%"]
    end

    subgraph Orchestrator["🧠 Orchestrator / Dashboard"]
    O1["Monitor + Delegator<br/>Queries Discovery → Agents<br/>Displays CPU / Latency / Memory"]
    end

    O1 -->|GET /services| D
    D -->|Active Agents JSON| O1
    A1 -->|POST /register| D
    A2 -->|POST /register| D
    A3 -->|POST /register| D

    A1 <--> A2
    A2 <--> A3
    A1 <--> A3

    A1 -- "delegate(work)" --> A2
    A2 -- "respond(result, latency=312ms)" --> A1
Loading

🧠 Explanation

  • Discovery Server (port 8500)
    Acts as the registry. Every agent periodically heartbeats here via /register.
    Orchestrator or peers use /services to fetch the active agent list.

  • Agents (peer mesh)
    Each exposes /rpc with JSON-RPC 2.0 methods:

    • info → returns CPU, memory
    • work → executes synthetic task
    • delegate → queries other peers → picks least-loaded target
  • Orchestrator
    Optional — visualizes cluster health, aggregates metrics, or injects tasks.


🧮 Latency-Aware Delegation Flow

Step Action Example Timing
1 Agent-A receives client task (delegate)
2 Agent-A queries all peers’ /rpc(info) 5–10 ms each
3 Agent-A ranks peers by CPU + latency chooses Agent-B
4 Agent-A sends /rpc(work) to Agent-B 300 ms
5 Agent-B returns result → Agent-A → Client end-to-end ≈ 520 ms

📈 Visualization Idea (optional live dashboard)

Later, you can integrate a FastAPI + WebSocket dashboard that:

  • Polls /services every few seconds
  • Plots agent CPU/memory as a live chart
  • Shows peer-to-peer latency arrows with changing color intensity
    (green = fast, orange = moderate, red = slow)

🔍 Comparison with Existing Frameworks

Framework Core Purpose Architecture Type Networking Load Awareness Complexity Best Suited For
Agentic Mesh (this project) Peer-to-peer JSON-RPC orchestration with discovery and metrics Decentralized mesh ✅ Native HTTP / JSON-RPC ✅ CPU + Memory + Latency built-in 🟢 Lightweight (~300 LOC) Research, AI orchestration, P2P agents
LangGraph (LangChain) Graph-based orchestration of LLM nodes Centralized DAG (in-memory) ❌ Local only ⚠️ Limited 🟠 Medium Complex reasoning / chain orchestration
AutoGen (Microsoft) Multi-agent conversation simulation Centralized process ⚠️ Partial ❌ None 🟠 Medium LLM dialogues and coordination
Ray / Ray Serve Distributed compute actors & microservices Centralized cluster (Ray runtime) ✅ TCP RPC ⚠️ Partial 🔴 Heavy ML training, scalable serving
Temporal / Netflix Conductor Persistent workflow engine Centralized orchestrator ✅ gRPC / REST ⚠️ Optional 🔴 Heavy Enterprise workflow automation
Celery Distributed task queue (workers + broker) Centralized queue (Redis / RabbitMQ) ✅ AMQP / Redis ❌ None 🟠 Medium Background job execution
NATS / ZeroMQ Pub/Sub messaging bus Decentralized sockets ✅ Binary transport ❌ None 🟠 Medium Low-latency message passing

🧭 Where Agentic Mesh Stands Out

  • 🧩 Decentralized by design: Every node can delegate or accept work; no single point of control.
  • ⚙️ Lightweight infrastructure: Run multiple agents locally or across machines with just Python + FastAPI.
  • 📊 Built-in observability: Agents report CPU, memory, and latency to make routing decisions.
  • 🔄 Self-healing discovery: Stale nodes expire automatically; new ones self-register.
  • 🧠 Perfect for experimentation: Ideal for AI orchestration research, distributed reasoning, and performance testing.

📜 License

MIT – use freely for educational and internal research purposes.


🙌 Credits

Designed as a lightweight demonstration of agentic orchestration patterns Inspired by real-world distributed frameworks like Netflix Conductor, Temporal, and LangGraph.

About

Agentic Mesh — a lightweight, peer-to-peer JSON-RPC framework enabling autonomous agents to discover, communicate, and delegate tasks dynamically across a distributed network.

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages