Skip to content

A comprehensive, modular JSON-RPC 2.0 implementation for Rust with multiple transport layers and extra features.

License

Notifications You must be signed in to change notification settings

ashforge-rs/ash-rpc

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

90 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

ASH-RPC

ash-rpc on crates.io ash-rpc docs codecov License

A modular, production-ready JSON-RPC 2.0 implementation for Rust with security, observability, and multiple transport layers.

Overview

ash-rpc provides a complete JSON-RPC 2.0 ecosystem with enterprise-grade features for building distributed systems.

Features

Core JSON-RPC 2.0

  • Full JSON-RPC 2.0 specification support (requests, responses, notifications, batch operations)
  • Multiple transport layers: TCP, TCP streaming, TLS-encrypted connections
  • Built-in security: rate limiting, connection limits, request size controls, timeout management
  • Structured audit logging with correlation IDs for distributed tracing
  • Authentication and authorization hooks with connection-level context
  • Error sanitization to prevent sensitive data leakage
  • Stateful handlers with shared application state
  • Streaming and subscription support for real-time events
  • Graceful shutdown with connection draining
  • Type-safe builders for requests, responses, and configurations

Contrib Features (Optional)

  • HTTP transport with Axum web framework integration
  • Health check endpoints for service monitoring
  • Trait-based structured logging with tracing backend
  • Prometheus metrics (request counters, duration histograms, error tracking)
  • OpenTelemetry distributed tracing with Jaeger integration
  • Unified observability API combining logging, metrics, and tracing
  • Tower middleware integration for HTTP services

Installation

# Core features
cargo add ash-rpc --features tcp,stateful,streaming,shutdown

# With contrib features
cargo add ash-rpc --features axum,healthcheck,observability

Available Features:

  • Core: tcp, tcp-stream, tcp-stream-tls, stateful, streaming, shutdown, audit-logging
  • Contrib: axum, healthcheck, tower, logging, prometheus, opentelemetry, observability

Quick Start

Basic Method Handler

use ash_rpc::*;

struct CalculatorMethod;

#[async_trait::async_trait]
impl JsonRPCMethod for CalculatorMethod {
    fn method_name(&self) -> &'static str {
        "calculate"
    }

    async fn call(&self, params: Option<serde_json::Value>, id: Option<RequestId>) -> Response {
        let result = params
            .and_then(|p| p.get("expression"))
            .and_then(|e| e.as_str())
            .map(|expr| format!("Result: {}", expr))
            .unwrap_or_else(|| "Invalid expression".to_string());

        rpc_success!(result, id)
    }
}

TCP Server with Security

use ash_rpc_core::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let security_config = SecurityConfigBuilder::new()
        .max_connections(1000)
        .max_request_size(1024 * 1024)
        .request_timeout(std::time::Duration::from_secs(30))
        .build();

    let registry = MethodRegistry::new(register_methods![CalculatorMethod]);
    let processor = MessageProcessor::new(registry);

    let server = TcpStreamServerBuilder::new("127.0.0.1:8080")
        .processor(processor)
        .security_config(security_config)
        .build()?;

    server.run().await?;
    Ok(())
}

HTTP Server with Axum

use ash_rpc_core::*;
use ash_rpc_contrib::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = MethodRegistry::new(register_methods![
        CalculatorMethod,
        HealthCheckMethod
    ]);

    let processor = MessageProcessor::new(registry);

    let app = axum::Router::new()
        .route("/rpc", axum::routing::post(rpc_handler))
        .with_state(processor);

    axum::Server::bind(&"127.0.0.1:3000".parse()?)
        .serve(app.into_make_service())
        .await?;

    Ok(())
}

Observability Integration

use ash_rpc_contrib::observable_setup;

let observability = observable_setup! {
    service_name: "calculator-service",
    metrics_prefix: "calculator",
    otlp_endpoint: "http://jaeger:4317",
};

let processor = MessageProcessor::new(registry);
let observable_processor = ObservableProcessor::builder(processor)
    .with_logger(observability.logger())
    .with_metrics(observability.metrics())
    .with_tracing(observability.tracer())
    .build();

Authentication and Authorization

use ash_rpc_core::*;

struct TokenAuth {
    valid_tokens: Vec<String>,
}

impl auth::AuthPolicy for TokenAuth {
    fn can_access(
        &self,
        method: &str,
        params: Option<&serde_json::Value>,
        ctx: &auth::ConnectionContext,
    ) -> bool {
        params
            .and_then(|p| p.get("token"))
            .and_then(|t| t.as_str())
            .map(|t| self.valid_tokens.contains(&t.to_string()))
            .unwrap_or(false)
    }
}

let registry = MethodRegistry::new(register_methods![CalculatorMethod])
    .with_auth(TokenAuth {
        valid_tokens: vec!["secret_token".to_string()]
    });

Streaming and Subscriptions

use ash_rpc_core::*;
use tokio::sync::mpsc;

struct PriceStreamHandler;

#[async_trait::async_trait]
impl StreamHandler for PriceStreamHandler {
    fn subscription_method(&self) -> &'static str {
        "subscribe_prices"
    }

    async fn start_stream(
        &self,
        stream_id: StreamId,
        params: Option<serde_json::Value>,
        sender: mpsc::UnboundedSender<StreamEvent>,
    ) -> Result<(), Error> {
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                let event = StreamEvent::new(
                    stream_id.clone(),
                    "price_update",
                    serde_json::json!({"symbol": "BTC", "price": 50000.0}),
                );
                if sender.send(event).is_err() {
                    break;
                }
            }
        });
        Ok(())
    }
}

Examples

View the examples/ directory for full implementations of servers, clients, authentication, TLS, streaming, and observability setups.

Documentation

License

Licensed under the Apache License, Version 2.0. See LICENSE for details.

Project Conventions

This project follows Conventional Commits for commit messages.

About

A comprehensive, modular JSON-RPC 2.0 implementation for Rust with multiple transport layers and extra features.

Topics

Resources

License

Security policy

Stars

Watchers

Forks

Contributors 2

  •  
  •