Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2796,6 +2796,7 @@ async fn finalizes_after_resuming_from_db() {
);
}

#[allow(clippy::large_stack_frames)]
#[tokio::test]
async fn revert_minority_fork_on_resume() {
let validator_count = 16;
Expand Down
1 change: 1 addition & 0 deletions common/eth2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ slashing_protection = { workspace = true }
mediatype = "0.19.13"
pretty_reqwest_error = { workspace = true }
derivative = { workspace = true }
reqwest-eventsource = "0.5.0"

[dev-dependencies]
tokio = { workspace = true }
Expand Down
43 changes: 33 additions & 10 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use reqwest::{
Body, IntoUrl, RequestBuilder, Response,
};
pub use reqwest::{StatusCode, Url};
use reqwest_eventsource::{Event, EventSource};
pub use sensitive_url::{SensitiveError, SensitiveUrl};
use serde::{de::DeserializeOwned, Serialize};
use ssz::Encode;
Expand All @@ -52,6 +53,8 @@ pub const SSZ_CONTENT_TYPE_HEADER: &str = "application/octet-stream";
pub enum Error {
/// The `reqwest` client raised an error.
HttpClient(PrettyReqwestError),
/// The `reqwest_eventsource` client raised an error.
SseClient(reqwest_eventsource::Error),
/// The server returned an error message where the body was able to be parsed.
ServerMessage(ErrorMessage),
/// The server returned an error message with an array of errors.
Expand Down Expand Up @@ -93,6 +96,13 @@ impl Error {
pub fn status(&self) -> Option<StatusCode> {
match self {
Error::HttpClient(error) => error.inner().status(),
Error::SseClient(error) => {
if let reqwest_eventsource::Error::InvalidStatusCode(status, _) = error {
Some(*status)
} else {
None
}
}
Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(),
Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(),
Error::StatusCode(status) => Some(*status),
Expand Down Expand Up @@ -2592,16 +2602,29 @@ impl BeaconNodeHttpClient {
.join(",");
path.query_pairs_mut().append_pair("topics", &topic_string);

Ok(self
.client
.get(path)
.send()
.await?
.bytes_stream()
.map(|next| match next {
Ok(bytes) => EventKind::from_sse_bytes(bytes.as_ref()),
Err(e) => Err(Error::HttpClient(e.into())),
}))
let mut es = EventSource::get(path);
// If we don't await `Event::Open` here, then the consumer
// will not get any Message events until they start awaiting the stream.
// This is a way to register the stream with the sse server before
// message events start getting emitted.
while let Some(event) = es.next().await {
match event {
Ok(Event::Open) => break,
Err(err) => return Err(Error::SseClient(err)),
// This should never happen as we are guaranteed to get the
// Open event before any message starts coming through.
Ok(Event::Message(_)) => continue,
}
}
Ok(Box::pin(es.filter_map(|event| async move {
match event {
Ok(Event::Open) => None,
Ok(Event::Message(message)) => {
Some(EventKind::from_sse_bytes(&message.event, &message.data))
}
Err(err) => Some(Err(Error::SseClient(err))),
}
})))
}

/// `POST validator/duties/sync/{epoch}`
Expand Down
21 changes: 2 additions & 19 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json::Value;
use ssz::{Decode, DecodeError};
use ssz_derive::{Decode, Encode};
use std::fmt::{self, Display};
use std::str::{from_utf8, FromStr};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use types::beacon_block_body::KzgCommitments;
Expand Down Expand Up @@ -1153,24 +1153,7 @@ impl<E: EthSpec> EventKind<E> {
}
}

pub fn from_sse_bytes(message: &[u8]) -> Result<Self, ServerError> {
let s = from_utf8(message)
.map_err(|e| ServerError::InvalidServerSentEvent(format!("{:?}", e)))?;

let mut split = s.split('\n');
let event = split
.next()
.ok_or_else(|| {
ServerError::InvalidServerSentEvent("Could not parse event tag".to_string())
})?
.trim_start_matches("event:");
let data = split
.next()
.ok_or_else(|| {
ServerError::InvalidServerSentEvent("Could not parse data tag".to_string())
})?
.trim_start_matches("data:");

pub fn from_sse_bytes(event: &str, data: &str) -> Result<Self, ServerError> {
match event {
"attestation" => Ok(EventKind::Attestation(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Attestation: {:?}", e)),
Expand Down