Skip to content

Commit

Permalink
fix: continue reading ipc on data error (alloy-rs#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored and ben186 committed Jul 27, 2024
1 parent af801c9 commit 248b42a
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ http-body-util = "0.1"
tokio = "1"
tokio-util = "0.7"
tokio-stream = "0.1"
tokio-test = "0.4"
tower = { version = "0.4", features = ["util"] }

# tracing
Expand Down
3 changes: 3 additions & 0 deletions crates/transport-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ interprocess = { version = "1.2.1", features = ["tokio", "tokio_support"] }
serde = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }

[dev-dependencies]
tokio-test.workspace = true

[features]
default = []
mock = ["dep:serde", "dep:tempfile"]
71 changes: 69 additions & 2 deletions crates/transport-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ impl IpcBackend {
}
}

/// Default capacity for the IPC buffer.
const CAPACITY: usize = 4096;

/// A stream of JSON-RPC items, read from an [`AsyncRead`] stream.
#[derive(Debug)]
#[pin_project::pin_project]
Expand All @@ -127,7 +130,7 @@ where
T: AsyncRead,
{
fn new(reader: T) -> Self {
Self { reader: reader.compat(), buf: BytesMut::with_capacity(4096), drained: true }
Self { reader: reader.compat(), buf: BytesMut::with_capacity(CAPACITY), drained: true }
}
}

Expand Down Expand Up @@ -170,7 +173,21 @@ where
return Ready(Some(response));
}
Some(Err(err)) => {
if err.is_eof() {
if err.is_data() {
trace!(
buffer = %String::from_utf8_lossy(this.buf.as_ref()),
"IPC buffer contains invalid JSON data",
);

if this.buf.len() > CAPACITY {
// buffer is full, we can't decode any more
error!("IPC response too large to decode");
return Ready(None);
}

// this happens if the deserializer is unable to decode a partial object
*this.drained = true;
} else if err.is_eof() {
trace!("partial object in IPC buffer");
// nothing decoded
*this.drained = true;
Expand Down Expand Up @@ -213,3 +230,53 @@ where
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::future::poll_fn;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[tokio::test]
async fn test_partial_stream() {
let mock = tokio_test::io::Builder::new()
// partial object
.read(b"{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\"")
// trigger pending read
.wait(std::time::Duration::from_millis(1))
// complete object
.read(r#", "params": {"subscription": "0xcd0c3e8af590364c09d0fa6a1210faf5", "result": {"difficulty": "0xd9263f42a87", "uncles": []}} }"#.as_bytes())
.build();

let mut reader = ReadJsonStream::new(mock.compat());
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Ready(())
})
.await;
let _obj = reader.next().await.unwrap();
}

#[tokio::test]
async fn test_large_invalid() {
let mock = tokio_test::io::Builder::new()
// partial object
.read(b"{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\"")
// trigger pending read
.wait(std::time::Duration::from_millis(1))
// complete object
.read(vec![b"a"[0]; CAPACITY].as_ref())
.build();

let mut reader = ReadJsonStream::new(mock.compat());
poll_fn(|cx| {
let res = reader.poll_next_unpin(cx);
assert!(res.is_pending());
Ready(())
})
.await;
let obj = reader.next().await;
assert!(obj.is_none());
}
}

0 comments on commit 248b42a

Please sign in to comment.