A modular, production-ready JSON-RPC 2.0 implementation for Rust with security, observability, and multiple transport layers.
ash-rpc provides a complete JSON-RPC 2.0 ecosystem with enterprise-grade features for building distributed systems.
Core JSON-RPC 2.0
- Full JSON-RPC 2.0 specification support (requests, responses, notifications, batch operations)
- Multiple transport layers: TCP, TCP streaming, TLS-encrypted connections
- Built-in security: rate limiting, connection limits, request size controls, timeout management
- Structured audit logging with correlation IDs for distributed tracing
- Authentication and authorization hooks with connection-level context
- Error sanitization to prevent sensitive data leakage
- Stateful handlers with shared application state
- Streaming and subscription support for real-time events
- Graceful shutdown with connection draining
- Type-safe builders for requests, responses, and configurations
Contrib Features (Optional)
- HTTP transport with Axum web framework integration
- Health check endpoints for service monitoring
- Trait-based structured logging with tracing backend
- Prometheus metrics (request counters, duration histograms, error tracking)
- OpenTelemetry distributed tracing with Jaeger integration
- Unified observability API combining logging, metrics, and tracing
- Tower middleware integration for HTTP services
Installation
# Core features
cargo add ash-rpc --features tcp,stateful,streaming,shutdown
# With contrib features
cargo add ash-rpc --features axum,healthcheck,observability
Available Features:
- Core: tcp, tcp-stream, tcp-stream-tls, stateful, streaming, shutdown, audit-logging
- Contrib: axum, healthcheck, tower, logging, prometheus, opentelemetry, observability
use ash_rpc::*;
/// Example JSON-RPC method that echoes back the `expression` string it is given.
struct CalculatorMethod;

#[async_trait::async_trait]
impl JsonRPCMethod for CalculatorMethod {
    /// Returns the method name, `"calculate"`.
    fn method_name(&self) -> &'static str {
        "calculate"
    }

    /// Reads the string field `expression` from `params` and answers with
    /// `"Result: <expression>"`.
    ///
    /// When `params` is `None`, or `expression` is missing or not a string,
    /// the text `"Invalid expression"` is used instead. Either way the text is
    /// passed to `rpc_success!` together with `id`.
    async fn call(&self, params: Option<serde_json::Value>, id: Option<RequestId>) -> Response {
        // `as_ref()` borrows the owned `Value` so the `&Value` returned by
        // `get` can outlive the closure. Without it the closure would take
        // `p` by value and return a reference into it (rustc error E0515).
        let result = params
            .as_ref()
            .and_then(|p| p.get("expression"))
            .and_then(|e| e.as_str())
            .map(|expr| format!("Result: {}", expr))
            .unwrap_or_else(|| "Invalid expression".to_string());
        rpc_success!(result, id)
    }
}

use ash_rpc_core::*;
/// Builds a TCP streaming server for `CalculatorMethod` on 127.0.0.1:8080 and runs it.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Security limits handed to the server builder below.
    // NOTE(review): the request size is presumably in bytes (1 MiB) — confirm.
    let max_request_size = 1024 * 1024;
    let request_timeout = std::time::Duration::from_secs(30);
    let security_config = SecurityConfigBuilder::new()
        .max_connections(1000)
        .max_request_size(max_request_size)
        .request_timeout(request_timeout)
        .build();

    // Register the example method and wrap the registry in a processor.
    let processor =
        MessageProcessor::new(MethodRegistry::new(register_methods![CalculatorMethod]));

    let server = TcpStreamServerBuilder::new("127.0.0.1:8080")
        .processor(processor)
        .security_config(security_config)
        .build()?;

    server.run().await?;
    Ok(())
}

use ash_rpc_core::*;
use ash_rpc_contrib::*;
/// Serves `CalculatorMethod` and `HealthCheckMethod` over HTTP: requests are
/// POSTed to `/rpc` on 127.0.0.1:3000.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let processor = MessageProcessor::new(MethodRegistry::new(register_methods![
        CalculatorMethod,
        HealthCheckMethod
    ]));

    // The processor is installed as router state for `rpc_handler`.
    let app = axum::Router::new()
        .route("/rpc", axum::routing::post(rpc_handler))
        .with_state(processor);

    // NOTE(review): `axum::Server` exists only in axum < 0.7; newer releases
    // use `axum::serve` with a `tokio::net::TcpListener`. Confirm which axum
    // version this example targets.
    let addr: std::net::SocketAddr = "127.0.0.1:3000".parse()?;
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await?;
    Ok(())
}

use ash_rpc_contrib::observable_setup;
// Configure observability for the service in one macro invocation.
let observability = observable_setup! {
    service_name: "calculator-service",
    metrics_prefix: "calculator",
    otlp_endpoint: "http://jaeger:4317",
};

// Wrap a plain processor (built from an existing `registry`) with the
// logger, metrics, and tracer handles obtained from `observability`.
let observable_processor = ObservableProcessor::builder(MessageProcessor::new(registry))
    .with_logger(observability.logger())
    .with_metrics(observability.metrics())
    .with_tracing(observability.tracer())
    .build();

use ash_rpc_core::*;
/// Example authorization policy: a call is allowed only when its params carry
/// a token that appears in `valid_tokens`.
struct TokenAuth {
    /// Tokens accepted by `can_access`.
    valid_tokens: Vec<String>,
}

impl auth::AuthPolicy for TokenAuth {
    /// Returns `true` only when `params` contains a string field `token` whose
    /// value equals one of `valid_tokens`.
    ///
    /// Returns `false` when `params` is `None`, or when `token` is missing or
    /// not a string. `method` and `ctx` are not consulted by this example.
    fn can_access(
        &self,
        method: &str,
        params: Option<&serde_json::Value>,
        ctx: &auth::ConnectionContext,
    ) -> bool {
        params
            .and_then(|p| p.get("token"))
            .and_then(|t| t.as_str())
            // Compare as `&str` so no `String` is allocated per check.
            .map(|token| self.valid_tokens.iter().any(|valid| valid.as_str() == token))
            .unwrap_or(false)
    }
}
// Register the example method, then attach the token policy via `with_auth`.
let accepted_tokens = vec![String::from("secret_token")];
let registry = MethodRegistry::new(register_methods![CalculatorMethod])
    .with_auth(TokenAuth { valid_tokens: accepted_tokens });

use ash_rpc_core::*;
use tokio::sync::mpsc;
/// Example stream handler that pushes a fixed BTC price event once per second.
struct PriceStreamHandler;

#[async_trait::async_trait]
impl StreamHandler for PriceStreamHandler {
    /// Returns the subscription method name, `"subscribe_prices"`.
    fn subscription_method(&self) -> &'static str {
        "subscribe_prices"
    }

    /// Spawns a background task that emits `price_update` events for
    /// `stream_id`, then returns `Ok(())` without waiting for the task.
    ///
    /// The task sleeps one second before each event and stops the first time
    /// `sender.send` reports an error. `params` is not used by this example.
    async fn start_stream(
        &self,
        stream_id: StreamId,
        params: Option<serde_json::Value>,
        sender: mpsc::UnboundedSender<StreamEvent>,
    ) -> Result<(), Error> {
        tokio::spawn(async move {
            let tick = std::time::Duration::from_secs(1);
            loop {
                tokio::time::sleep(tick).await;

                let update = StreamEvent::new(
                    stream_id.clone(),
                    "price_update",
                    serde_json::json!({"symbol": "BTC", "price": 50000.0}),
                );

                // A failed send ends the stream.
                if sender.send(update).is_err() {
                    break;
                }
            }
        });

        Ok(())
    }
}

View the examples/ directory for full implementations of servers, clients, authentication, TLS, streaming, and observability setups.
- API documentation: cargo doc --open
- Core package: core/README.md
- Contrib package: contrib/README.md
Licensed under the Apache License, Version 2.0. See LICENSE for details.
This project follows Conventional Commits for commit messages.