Skip to content

WIP: feat: constraint params to valid JSONRPC params #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tokio-jsonrpc"
version = "0.7.3"
version = "0.8.0"
authors = [
"Michal 'vorner' Vaner <vorner@vorner.cz>",
"Ignacio Corderi <icorderi@msn.com>"]
Expand Down Expand Up @@ -30,3 +30,8 @@ slog = "~2.0.0-3"
[dev-dependencies]
slog-term = "~2.0.0-3"
slog-async = "~2.0.0-3"
tokio-jsonrpc-derive = { path = "tokio-jsonrpc-derive" }

[workspace]
members = ["tokio-jsonrpc-derive"]

14 changes: 8 additions & 6 deletions examples/time_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#[macro_use]
extern crate tokio_jsonrpc;
#[macro_use]
extern crate tokio_jsonrpc_derive;
extern crate tokio_core;
extern crate tokio_io;
#[macro_use]
Expand All @@ -35,10 +37,10 @@ use serde_json::Value;
use slog::{Drain, Logger};
use slog_term::{FullFormat, PlainSyncDecorator};

use tokio_jsonrpc::{Endpoint, LineCodec, RpcError, Server, ServerCtl};
use tokio_jsonrpc::{Endpoint, LineCodec, Params, RpcError, Server, ServerCtl};

/// A helper struct to deserialize the parameters
#[derive(Deserialize)]
#[derive(Deserialize, Params)]
struct SubscribeParams {
secs: u64,
#[serde(default)]
Expand Down Expand Up @@ -67,7 +69,7 @@ impl Server for TimeServer {
/// Just a formality, we don't need this one
type NotificationResult = Result<(), ()>;
/// The actual implementation of the RPC methods
fn rpc(&self, ctl: &ServerCtl, method: &str, params: &Option<Value>)
fn rpc(&self, ctl: &ServerCtl, method: &str, params: &Option<Params>)
-> Option<Self::RpcCallResult> {
match method {
// Return the number of seconds since epoch (eg. unix timestamp)
Expand All @@ -79,20 +81,20 @@ impl Server for TimeServer {
"subscribe" => {
debug!(self.1, "Subscribing");
// Some parsing and bailing out on errors
let (s_params,) = jsonrpc_params!(params, "s_params" => SubscribeParams);
let params: SubscribeParams = parse_params!(params.clone());
// We need to have a client to be able to send notifications
let client = ctl.client();
let handle = self.0.clone();
let logger = self.1.clone();
// Get a stream that „ticks“
let result = Interval::new(Duration::new(s_params.secs, s_params.nsecs), &self.0)
let result = Interval::new(Duration::new(params.secs, params.nsecs), &self.0)
.or_else(|e| Err(RpcError::server_error(Some(format!("Interval: {}", e)))))
.map(move |interval| {
let logger_cloned = logger.clone();
// And send the notification on each tick (and pass the client through)
let notified = interval.fold(client, move |client, _| {
debug!(logger_cloned, "Tick");
client.notify("time".to_owned(), Some(json!([now()])))
client.notify("time".to_owned(), params!([now()]))
})
// So it can be spawned, spawn needs ().
.map(|_| ())
Expand Down
10 changes: 6 additions & 4 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use serde_json::{Value, to_value};
use slog::{Discard, Logger};
use tokio_core::reactor::{Handle, Timeout};

use message::{Broken, Message, Notification, Parsed, Request, Response, RpcError};
use message::{Broken, Message, Notification, Params, Parsed, Request, Response, RpcError};
use server::{Empty as EmptyServer, Server};

/// Thing that terminates the connection once dropped.
Expand Down Expand Up @@ -398,7 +398,8 @@ impl Client {
/// once the message is sent. It yields the Client back (it is blocked for the time of sending)
/// and another future that resolves once the answer is received (or once a timeout happens, in
/// which case the result is None).
pub fn call(self, method: String, params: Option<Value>, timeout: Option<Duration>) -> RpcSent {
pub fn call(self, method: String, params: Option<Params>, timeout: Option<Duration>)
-> RpcSent {
// We have to deconstruct self now, because the sender's send takes ownership for it for a
// while. We construct it back once the message is passed on.
let data = self.data;
Expand Down Expand Up @@ -469,7 +470,7 @@ impl Client {
///
/// It creates a notification message and sends it. It returs a future that resolves once the
/// message is sent and yields the client back for further use.
pub fn notify(self, method: String, params: Option<Value>) -> Notified {
pub fn notify(self, method: String, params: Option<Params>) -> Notified {
let data = self.data;
trace!(data.logger, "Sending notification {}", method);
let future = self.sender
Expand Down Expand Up @@ -503,6 +504,7 @@ impl Client {
/// ```rust,no_run
/// # extern crate tokio_core;
/// # extern crate tokio_io;
/// # #[macro_use]
/// # extern crate tokio_jsonrpc;
/// # extern crate futures;
/// # #[macro_use]
Expand All @@ -528,7 +530,7 @@ impl Client {
/// .start(&handle);
/// // Call a method with some parameters and a 10 seconds timeout
/// client.call("request".to_owned(),
/// Some(json!(["param1", "param2"])),
/// params!(["param1", "param2"]),
/// Some(Duration::new(10, 0)))
/// .and_then(|(_client, future_result)| future_result)
/// .map(|response| {
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
//! # use tokio_core::reactor::Core;
//! # use tokio_core::net::TcpListener;
//! # use tokio_io::AsyncRead;
//! # use tokio_jsonrpc::{LineCodec, Server, ServerCtl, RpcError, Endpoint};
//! # use tokio_jsonrpc::{LineCodec, Server, ServerCtl, Params, RpcError, Endpoint};
//! # use futures::{Future, Stream};
//! # use serde_json::Value;
//! #
Expand All @@ -81,7 +81,7 @@
//! fn rpc(&self,
//! ctl: &ServerCtl,
//! method: &str,
//! _params: &Option<Value>)
//! _params: &Option<Params>)
//! -> Option<Self::RpcCallResult> {
//! match method {
//! // Accept a hello message and finish the greeting
Expand Down Expand Up @@ -127,19 +127,20 @@ extern crate slog;

pub mod codec;
pub mod endpoint;
#[macro_use]
pub mod message;
pub mod server;

/// This contains some reexports so macros can find them.
///
/// It isn't for the direct use of the library consumer.
pub mod macro_exports {
pub use serde_json::{Value, from_value};
pub use serde_json::{Map, Value, from_value, to_value};
pub use std::option::Option;
pub use std::result::Result;
}

pub use codec::{Boundary as BoundaryCodec, Line as LineCodec};
pub use endpoint::{Client, Endpoint, ServerCtl};
pub use message::{Message, Parsed, RpcError};
pub use message::{Message, Params, Parsed, RpcError};
pub use server::Server;
Loading