Skip to content

Commit

Permalink
Merge branch 'main' into t/h-3426-implement-folding-for-data-type-con…
Browse files Browse the repository at this point in the history
…straints
  • Loading branch information
TimDiekmann authored Oct 17, 2024
2 parents d49eb1e + 958822e commit a65f484
Show file tree
Hide file tree
Showing 79 changed files with 1,652 additions and 449 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- uses: ./.github/actions/warm-up-repo

- name: Create Release Pull Request or Publish to npm
uses: changesets/action@3de3850952bec538fde60aac71731376e57b9b57 # v1.4.8
uses: changesets/action@c8bada60c408975afd1a20b3db81d6eee6789308 # v1.4.9
with:
publish: yarn changeset publish
env:
Expand Down
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ proptest = { version = "=1.5.0", default-features = false, features = ["alloc"]
rand = { version = "=0.8.5", default-features = false }
refinery = { version = "=0.8.14", default-features = false }
rustc_version = { version = "=0.4.1", default-features = false }
scc = { version = "=2.2.1", default-features = false }
scc = { version = "=2.2.2", default-features = false }
sentry = { version = "=0.34.0", default-features = false, features = ["backtrace", "contexts", "debug-images", "panic", "reqwest", "rustls", "tracing", "tower-http"] }
seq-macro = { version = "=0.3.5", default-features = false }
serde_plain = { version = "=1.0.2", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions libs/@local/harpc/codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ futures-util = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true, public = true }
pin-project-lite = { workspace = true, optional = true }
memchr = { workspace = true, optional = true }
thiserror = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
57 changes: 36 additions & 21 deletions libs/@local/harpc/codec/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,26 @@ use core::{
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use error_stack::Report;
use error_stack::{Report, ResultExt};
use futures_core::Stream;
use futures_util::stream::StreamExt;
use harpc_types::error_code::ErrorCode;
use serde::{de::DeserializeOwned, ser::Error as _};
use serde::de::DeserializeOwned;

use crate::{
decode::{Decoder, ErrorDecoder},
encode::{Encoder, ErrorEncoder},
error::{EncodedError, ErrorBuffer, NetworkError},
};

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, thiserror::Error)]
pub enum JsonError {
#[error("unable to encode JSON value")]
Encode,
#[error("unable to decode JSON value")]
Decode,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct JsonCodec;

Expand All @@ -27,7 +35,7 @@ impl JsonCodec {

impl Encoder for JsonCodec {
type Buf = Bytes;
type Error = serde_json::Error;
type Error = Report<JsonError>;

fn encode<T>(
self,
Expand All @@ -40,17 +48,19 @@ impl Encoder for JsonCodec {
let buf = BytesMut::new();
let mut writer = buf.writer();

serde_json::to_writer(&mut writer, &item).map(|()| {
let mut buf = writer.into_inner();
buf.put_u8(Self::SEPARATOR);
buf.freeze()
})
serde_json::to_writer(&mut writer, &item)
.map(|()| {
let mut buf = writer.into_inner();
buf.put_u8(Self::SEPARATOR);
buf.freeze()
})
.change_context(JsonError::Encode)
})
}
}

impl Decoder for JsonCodec {
type Error = serde_json::Error;
type Error = Report<JsonError>;

fn decode<T, B, E>(
self,
Expand Down Expand Up @@ -104,13 +114,13 @@ where
B: Buf,
T: DeserializeOwned,
{
type Item = Result<T, serde_json::Error>;
type Item = Result<T, Report<JsonError>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// check if we have a full message in the buffer, in theory multiple messages could be in
// the buffer at the same time, this guards against that.
if let Some(value) = Self::poll_item(self.as_mut(), 0) {
return Poll::Ready(Some(value));
return Poll::Ready(Some(value.change_context(JsonError::Decode)));
}

loop {
Expand All @@ -123,7 +133,7 @@ where
// the underlying stream has already returned `None`, we now only flush the
// remaining buffer.
if let Some(value) = Self::poll_item(self.as_mut(), 0) {
return Poll::Ready(Some(value));
return Poll::Ready(Some(value.change_context(JsonError::Decode)));
}

return Poll::Ready(None);
Expand All @@ -135,7 +145,7 @@ where

// potentially we still have items in the buffer, try to decode them.
if let Some(value) = Self::poll_item(self.as_mut(), 0) {
return Poll::Ready(Some(value));
return Poll::Ready(Some(value.change_context(JsonError::Decode)));
}

return Poll::Ready(None);
Expand All @@ -148,16 +158,21 @@ where
offset
}
// TODO: we lose quite a bit of information here, any way to retrieve it?
// The problem is that we don't know if the underlying error is a report, **or** if
// it is a plain error.
// in **theory** we could do: `impl Into<Report<C>> + Debug + Display`, but then we
// don't know what `C` should be.
Err(_error) => {
return Poll::Ready(Some(Err(serde_json::Error::custom(
"underlying stream returned an error",
))));
// return Poll::Ready(Some(Err(serde_json::Error::custom(
// "underlying stream returned an error",
// ))));
return Poll::Ready(Some(Err(Report::new(JsonError::Decode))));
}
};

// look if we found a separator between the offset and the end of the buffer
if let Some(value) = Self::poll_item(self.as_mut(), offset) {
return Poll::Ready(Some(value));
return Poll::Ready(Some(value.change_context(JsonError::Decode)));
}

// if not we continue to the next iteration
Expand All @@ -166,7 +181,7 @@ where
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct JsonError<T> {
struct JsonErrorRepr<T> {
message: String,
details: T,
}
Expand All @@ -181,7 +196,7 @@ impl ErrorEncoder for JsonCodec {
let buffer = ErrorBuffer::error();
let mut writer = buffer.writer();

if let Err(error) = serde_json::to_writer(&mut writer, &JsonError {
if let Err(error) = serde_json::to_writer(&mut writer, &JsonErrorRepr {
message: error.to_string(),
details: error,
}) {
Expand Down Expand Up @@ -235,7 +250,7 @@ impl ErrorDecoder for JsonCodec {
let bytes: BytesMut = bytes.collect().await;
let bytes = bytes.freeze();

serde_json::from_slice::<JsonError<E>>(&bytes).map(|error| error.details)
serde_json::from_slice::<JsonErrorRepr<E>>(&bytes).map(|error| error.details)
}

async fn decode_report<C>(
Expand Down Expand Up @@ -446,7 +461,7 @@ mod tests {
))));
let mut decoder = JsonCodec.decode::<serde_json::Value, _, _>(input);

decoder
let _report = decoder
.next()
.await
.expect("should have a value")
Expand Down
2 changes: 1 addition & 1 deletion libs/@local/harpc/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ scc = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tachyonix = { workspace = true }
thiserror = { workspace = true }
tokio-stream = { workspace = true, features = ["time"] }
tokio-stream = { workspace = true, features = ["time", "sync"] }
tokio-util = { workspace = true, features = ["codec", "compat", "rt", "tracing"] }
tracing = { workspace = true }

Expand Down
76 changes: 72 additions & 4 deletions libs/@local/harpc/net/src/session/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use core::{
task::{Context, Poll, ready},
};

use error_stack::{Result, ResultExt};
use error_stack::{Report, ResultExt};
use futures::{Stream, stream::FusedStream};
use harpc_codec::encode::ErrorEncoder;
use libp2p::Multiaddr;
Expand All @@ -33,6 +33,22 @@ pub enum SessionEvent {
SessionDropped { id: SessionId },
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, thiserror::Error)]
pub enum SessionEventError {
/// The receiving stream lagged too far behind. Attempting to receive again will
/// return the oldest message still retained by the underlying broadcast channel.
///
/// Includes the number of skipped messages.
#[error("The receiving stream lagged to far behind, and {amount} messages were dropped.")]
Lagged { amount: u64 },
}

impl From<!> for SessionEventError {
fn from(never: !) -> Self {
never
}
}

pub struct ListenStream {
inner: mpsc::Receiver<Transaction>,

Expand Down Expand Up @@ -77,6 +93,53 @@ impl FusedStream for ListenStream {
}
}

pin_project_lite::pin_project! {
// Wrapper around a broadcast, allowing for a more controlled API, and our own error, making the underlying broadcast
// channel an implementation detail.
pub struct EventStream {
#[pin]
inner: tokio_stream::wrappers::BroadcastStream<SessionEvent>,

is_finished: bool,
}
}

impl Stream for EventStream {
type Item = Result<SessionEvent, SessionEventError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

if *this.is_finished {
return Poll::Ready(None);
}

// we're purposefully not implementing `From<BroadcastStreamRecvError>` as that would
// require us to mark `tokio_stream` as a public dependency, something we want to avoid with
// this specifically.
match ready!(this.inner.poll_next(cx)) {
Some(Ok(event)) => Poll::Ready(Some(Ok(event))),
Some(Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(amount))) => {
Poll::Ready(Some(Err(SessionEventError::Lagged { amount })))
}
None => {
*this.is_finished = true;

Poll::Ready(None)
}
}
}
}

// there is no chance this stream will ever be picked-up again, because receivers are only created
// from this one sender, and only expose a stream API, and will be alive as long as the task is
// alive, once all senders are dropped, it indicates that the task has completely shutdown.
impl FusedStream for EventStream {
fn is_terminated(&self) -> bool {
self.is_finished
}
}

/// Session Layer
///
/// The session layer is responsible for accepting incoming connections, and splitting them up into
Expand Down Expand Up @@ -119,8 +182,13 @@ where
}

#[must_use]
pub fn events(&self) -> broadcast::Receiver<SessionEvent> {
self.events.subscribe()
pub fn events(&self) -> EventStream {
let receiver = self.events.subscribe();

EventStream {
inner: receiver.into(),
is_finished: false,
}
}

#[must_use]
Expand All @@ -133,7 +201,7 @@ where
/// # Errors
///
/// Returns an error if the transport layer fails to listen on the given address.
pub async fn listen(self, address: Multiaddr) -> Result<ListenStream, SessionError> {
pub async fn listen(self, address: Multiaddr) -> Result<ListenStream, Report<SessionError>> {
self.transport
.listen_on(address)
.await
Expand Down
5 changes: 3 additions & 2 deletions libs/@local/harpc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ harpc-service = { workspace = true, public = true }

# Public third-party dependencies
frunk_core = { version = "0.4.3", public = true }
tower = { workspace = true, public = true }
tower = { workspace = true, public = true, features = ["make"] }

# Private workspace dependencies
harpc-net = { workspace = true }
Expand All @@ -31,8 +31,9 @@ tokio = { workspace = true, features = ["macros"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
harpc-codec = { workspace = true }
derive_more = { version = "1.0.0", features = ["display"] }
derive_more = { version = "1.0.0", features = ["display", "error"] }
serde = { workspace = true, features = ["derive"] }
bytes.workspace = true

[lints]
workspace = true
Expand Down
Loading

0 comments on commit a65f484

Please sign in to comment.