Skip to content

Strict parsing for core #448

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,10 @@ mod tests {
});
tokio::run(fut);
assert_eq!(called.load(Ordering::SeqCst), true);
assert!(!received.lock().unwrap().is_empty(), "Expected at least one received item.");
assert!(
!received.lock().unwrap().is_empty(),
"Expected at least one received item."
);
}

}
62 changes: 39 additions & 23 deletions core-client/transports/src/transports/duplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ struct Subscription {
}

impl Subscription {
fn new(
channel: mpsc::Sender<Result<Value, RpcError>>,
notification: String,
unsubscribe: String,
) -> Self {
fn new(channel: mpsc::Sender<Result<Value, RpcError>>, notification: String, unsubscribe: String) -> Self {
Subscription {
id: None,
notification,
Expand Down Expand Up @@ -120,7 +116,11 @@ where
RpcMessage::Call(msg) => {
let (id, request_str) = self.request_builder.call_request(&msg);

if self.pending_requests.insert(id.clone(), PendingRequest::Call(msg.sender)).is_some() {
if self
.pending_requests
.insert(id.clone(), PendingRequest::Call(msg.sender))
.is_some()
{
log::error!("reuse of request id {:?}", id);
}
request_str
Expand All @@ -132,13 +132,14 @@ where
notification,
unsubscribe,
} = msg.subscription;
let (id, request_str) = self.request_builder.subscribe_request(
subscribe,
subscribe_params,
);
let (id, request_str) = self.request_builder.subscribe_request(subscribe, subscribe_params);
log::debug!("subscribing to {}", notification);
let subscription = Subscription::new(msg.sender, notification, unsubscribe);
if self.pending_requests.insert(id.clone(), PendingRequest::Subscription(subscription)).is_some() {
if self
.pending_requests
.insert(id.clone(), PendingRequest::Subscription(subscription))
.is_some()
{
log::error!("reuse of request id {:?}", id);
}
request_str
Expand Down Expand Up @@ -166,7 +167,13 @@ where
};
log::debug!("incoming: {}", response_str);
for (id, result, method, sid) in super::parse_response(&response_str)? {
log::debug!("id: {:?} (sid: {:?}) result: {:?} method: {:?}", id, sid, result, method);
log::debug!(
"id: {:?} (sid: {:?}) result: {:?} method: {:?}",
id,
sid,
result,
method
);
self.incoming.push_back((id, result, method, sid));
}
}
Expand All @@ -184,7 +191,7 @@ where
tx.send(result)
.map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?;
continue;
},
}
// It was a subscription request,
// turn it into a proper subscription.
Some(PendingRequest::Subscription(mut subscription)) => {
Expand All @@ -193,10 +200,14 @@ where

if let Some(sid) = sid {
subscription.id = Some(sid.clone());
if self.subscriptions.insert((sid.clone(), method.clone()), subscription).is_some() {
if self
.subscriptions
.insert((sid.clone(), method.clone()), subscription)
.is_some()
{
log::warn!(
"Overwriting existing subscription under {:?} ({:?}). \
Seems that server returned the same subscription id.",
Seems that server returned the same subscription id.",
sid,
method,
);
Expand All @@ -210,12 +221,12 @@ where
);
}
continue;
},
}
// It's not a pending request nor a notification
None if sid_and_method.is_none() => {
log::warn!("Got unexpected response with id {:?} ({:?})", id, sid_and_method);
continue;
},
}
// just fall-through in case it's a notification
None => {}
};
Expand All @@ -229,20 +240,25 @@ where
if let Some(subscription) = self.subscriptions.get_mut(&sid_and_method) {
match subscription.channel.poll_ready() {
Ok(Async::Ready(())) => {
subscription.channel.try_send(result).expect("The channel is ready; qed");
subscription
.channel
.try_send(result)
.expect("The channel is ready; qed");
}
Ok(Async::NotReady) => {
let (sid, method) = sid_and_method;
self.incoming.push_front((id, result, Some(method), Some(sid)));
break;
}
Err(_) => {
let subscription = self.subscriptions
let subscription = self
.subscriptions
.remove(&sid_and_method)
.expect("Subscription was just polled; qed");
let sid = subscription.id
.expect("Every subscription that ends up in `self.subscriptions` has id already \
assigned; assignment happens during response to subscribe request.");
let sid = subscription.id.expect(
"Every subscription that ends up in `self.subscriptions` has id already \
assigned; assignment happens during response to subscribe request.",
);
let (_id, request_str) =
self.request_builder.unsubscribe_request(subscription.unsubscribe, sid);
log::debug!("outgoing: {}", request_str);
Expand All @@ -253,7 +269,7 @@ where
} else {
log::warn!("Received unexpected subscription notification: {:?}", sid_and_method);
}
},
}
None => break,
}
}
Expand Down
2 changes: 1 addition & 1 deletion core-client/transports/src/transports/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ where
/// Creates a new `LocalRpc` with default metadata.
pub fn new(handler: THandler) -> Self
where
TMetadata: Default
TMetadata: Default,
{
Self::with_metadata(handler, Default::default())
}
Expand Down
4 changes: 3 additions & 1 deletion core-client/transports/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ impl RequestBuilder {
}

/// Parse raw string into JSON values, together with the request Id
pub fn parse_response(response: &str) -> Result<Vec<(Id, Result<Value, RpcError>, Option<String>, Option<SubscriptionId>)>, RpcError> {
pub fn parse_response(
response: &str,
) -> Result<Vec<(Id, Result<Value, RpcError>, Option<String>, Option<SubscriptionId>)>, RpcError> {
serde_json::from_str::<Response>(&response)
.map_err(|e| RpcError::ParseError(e.to_string(), e.into()))
.map(|response| {
Expand Down
2 changes: 1 addition & 1 deletion core/examples/params.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use jsonrpc_core::*;
use serde::Deserialize;
use serde_derive::Deserialize;

#[derive(Deserialize)]
struct HelloParams {
Expand Down
1 change: 1 addition & 0 deletions core/src/types/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl Serialize for ErrorCode {

/// Error object as defined in Spec
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Error {
/// Code
pub code: ErrorCode,
Expand Down
1 change: 1 addition & 0 deletions core/src/types/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

/// Request Id
#[derive(Debug, PartialEq, Clone, Hash, Eq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum Id {
/// No id (notification)
Expand Down
1 change: 1 addition & 0 deletions core/src/types/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::{Error, Value};

/// Request parameters
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum Params {
/// No parameters
Expand Down
4 changes: 3 additions & 1 deletion core/src/types/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl From<Notification> for Call {

/// Represents jsonrpc request.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum Request {
/// Single request (call)
Expand Down Expand Up @@ -187,7 +188,7 @@ mod tests {

let s = r#"{"jsonrpc": "2.0", "method": "update", "params": [1,2], "id": 1}"#;
let deserialized: Result<Notification, _> = serde_json::from_str(s);
assert!(deserialized.is_err())
assert!(deserialized.is_err());
}

#[test]
Expand Down Expand Up @@ -285,6 +286,7 @@ mod tests {

let s = r#"{"id":120,"method":"my_method","params":["foo", "bar"],"extra_field":[]}"#;
let deserialized: Request = serde_json::from_str(s).unwrap();

match deserialized {
Request::Single(Call::Invalid { id: Id::Num(120) }) => {}
_ => panic!("Request wrongly deserialized: {:?}", deserialized),
Expand Down
27 changes: 27 additions & 0 deletions core/src/types/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::Result as CoreResult;

/// Successful response
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Success {
/// Protocol version
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -16,6 +17,7 @@ pub struct Success {

/// Unsuccessful response
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Failure {
/// Protocol Version
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -28,6 +30,7 @@ pub struct Failure {

/// Represents output - failure or success
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum Output {
/// Notification
Expand Down Expand Up @@ -114,6 +117,7 @@ impl From<Output> for CoreResult<Value> {

/// Synchronous response
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[serde(untagged)]
pub enum Response {
/// Single response
Expand Down Expand Up @@ -290,3 +294,26 @@ fn notification_deserialize() {
}))
);
}

#[test]
fn handle_incorrect_responses() {
use serde_json;

let dsr = r#"
{
"id": 2,
"jsonrpc": "2.0",
"result": "0x62d3776be72cc7fa62cad6fe8ed873d9bc7ca2ee576e400d987419a3f21079d5",
"error": {
"message": "VM Exception while processing transaction: revert",
"code": -32000,
"data": {}
}
}"#;

let deserialized: Result<Response, _> = serde_json::from_str(dsr);
assert!(
deserialized.is_err(),
"Expected error when deserializing invalid payload."
);
}
8 changes: 4 additions & 4 deletions pubsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ impl SubscriptionId {
core::Params::Map(map) => match map.get("subscription") {
Some(value) => Self::parse_value(value),
None => None,
}
_ => None
}
_ => None
},
_ => None,
},
_ => None,
}
}
}
Expand Down