From b7d13d7f86ba706269851e30f9f09e1665fc17c1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 19 Oct 2022 11:42:13 +0200 Subject: [PATCH] Implement WebRTC messages framing (#2896) cc https://github.com/libp2p/specs/pull/412 cc https://github.com/paritytech/smoldot/issues/1712 This PR finishes implementing the WebRTC spec by adding the last remaining item: the messages framing. Implementing this messages framing while minimizing the number of data copies is rather challenging. Instead of going for the complicated solution, I went for the easier solution of having an intermediate read buffer where data is first copied. Going for the simple solution decreases the chances of bugs and increases the ease of debugging, so it's preferable at the moment. In the future, once WebRTC is fully working, we can rewrite this messages framing code in a more optimized way. Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- bin/light-base/src/json_rpc_service.rs | 2 +- bin/light-base/src/network_service.rs | 2 +- bin/light-base/src/network_service/tasks.rs | 91 ++-- bin/light-base/src/sync_service.rs | 2 +- bin/wasm-node/CHANGELOG.md | 4 + bin/wasm-node/javascript/src/index-browser.ts | 14 +- src/libp2p/collection/multi_stream.rs | 155 +++++- .../connection/established/multi_stream.rs | 457 ++++++++++++++---- src/network/protocol/storage_call_proof.rs | 2 +- src/network/service.rs | 2 +- src/util/protobuf.rs | 13 +- 11 files changed, 585 insertions(+), 159 deletions(-) diff --git a/bin/light-base/src/json_rpc_service.rs b/bin/light-base/src/json_rpc_service.rs index 37abeaa2e6..ece6b1ef6a 100644 --- a/bin/light-base/src/json_rpc_service.rs +++ b/bin/light-base/src/json_rpc_service.rs @@ -1289,7 +1289,7 @@ impl Background { async fn storage_query( &self, - keys: impl Iterator> + Clone, + keys: impl Iterator + Clone> + Clone, hash: &[u8; 32], total_attempts: u32, timeout_per_request: Duration, diff --git a/bin/light-base/src/network_service.rs 
b/bin/light-base/src/network_service.rs index 237e0f1790..406f8533af 100644 --- a/bin/light-base/src/network_service.rs +++ b/bin/light-base/src/network_service.rs @@ -585,7 +585,7 @@ impl NetworkService { self: Arc, chain_index: usize, target: PeerId, // TODO: takes by value because of futures longevity issue - config: protocol::StorageProofRequestConfig>>, + config: protocol::StorageProofRequestConfig + Clone>>, timeout: Duration, ) -> Result { let rx = { diff --git a/bin/light-base/src/network_service/tasks.rs b/bin/light-base/src/network_service/tasks.rs index 2e5c1a4509..2f5f96a89a 100644 --- a/bin/light-base/src/network_service/tasks.rs +++ b/bin/light-base/src/network_service/tasks.rs @@ -382,12 +382,7 @@ async fn multi_stream_connection_task( // from this slice the data to send. Consequently, the write buffer is held locally. This is // suboptimal compared to writing to a write buffer provided by the platform, but it is easier // to implement it this way. - let mut write_buffer = vec![0; 4096]; - - // When reading/writing substreams, the substream can ask to be woken up after a certain time. - // This variable stores the earliest time when we should be waking up. - // TODO: this is wrong; this code assumes that substreams will be found in `ready_substreams` while it is not the case now; however it seems more appropriate to modify `ready_substreams` rather than accomodate this limitation here - let mut wake_up_after = None; + let mut write_buffer = vec![0; 16384]; // TODO: the write buffer must not exceed 16kiB due to the libp2p WebRTC spec; this should ideally be enforced through the connection task API loop { // Start opening new outbound substreams, if needed. @@ -424,56 +419,52 @@ async fn multi_stream_connection_task( let now = TPlat::now(); - // Clear `wake_up_after` if necessary, otherwise it will always stay at a constant value. 
- // TODO: nit: can use `Option::is_some_and` after it's stable; https://github.com/rust-lang/rust/issues/93050 - if wake_up_after - .as_ref() - .map(|time| *time <= now) - .unwrap_or(false) - { - wake_up_after = None; - } + // When reading/writing substreams, the substream can ask to be woken up after a certain + // time. This variable stores the earliest time when we should be waking up. + let mut wake_up_after = None; // Perform a read-write on all substreams. // TODO: trying to read/write every single substream every single time is suboptimal, but making this not suboptimal is very complicated for substream_id in open_substreams.iter().map(|(id, _)| id).collect::>() { - let substream = &mut open_substreams[substream_id]; - - let mut read_write = ReadWrite { - now: now.clone(), - incoming_buffer: TPlat::read_buffer(substream), - outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None - read_bytes: 0, - written_bytes: 0, - wake_up_after: None, - }; - - let kill_substream = - connection_task.substream_read_write(&substream_id, &mut read_write); - - // Because the `read_write` object borrows the stream, we need to drop it before we - // can modify the connection. Before dropping the `read_write`, clone some important - // information from it. 
- let read_bytes = read_write.read_bytes; - let written_bytes = read_write.written_bytes; - match (&mut wake_up_after, &read_write.wake_up_after) { - (_, None) => {} - (val @ None, Some(t)) => *val = Some(t.clone()), - (Some(curr), Some(upd)) if *upd < *curr => *curr = upd.clone(), - (Some(_), Some(_)) => {} - } - drop(read_write); + loop { + let substream = &mut open_substreams[substream_id]; + + let mut read_write = ReadWrite { + now: now.clone(), + incoming_buffer: TPlat::read_buffer(substream), + outgoing_buffer: Some((&mut write_buffer, &mut [])), // TODO: this should be None if a previous read_write() produced None + read_bytes: 0, + written_bytes: 0, + wake_up_after, + }; + + let kill_substream = + connection_task.substream_read_write(&substream_id, &mut read_write); + + // Because the `read_write` object borrows the stream, we need to drop it before we + // can modify the connection. Before dropping the `read_write`, clone some important + // information from it. + let read_bytes = read_write.read_bytes; + let written_bytes = read_write.written_bytes; + wake_up_after = read_write.wake_up_after.take(); + drop(read_write); + + // Now update the connection. + if written_bytes != 0 { + TPlat::send(substream, &write_buffer[..written_bytes]); + } + TPlat::advance_read_cursor(substream, read_bytes); - // Now update the connection. - if written_bytes != 0 { - TPlat::send(substream, &write_buffer[..written_bytes]); - } - TPlat::advance_read_cursor(substream, read_bytes); + // If the `connection_task` requires this substream to be killed, we drop the `Stream` + // object. + if kill_substream { + open_substreams.remove(substream_id); + break; + } - // If the `connection_task` requires this substream to be killed, we drop the `Stream` - // object. 
- if kill_substream { - open_substreams.remove(substream_id); + if read_bytes == 0 && written_bytes == 0 { + break; + } } } diff --git a/bin/light-base/src/sync_service.rs b/bin/light-base/src/sync_service.rs index fdcb2ac96c..17d430fb85 100644 --- a/bin/light-base/src/sync_service.rs +++ b/bin/light-base/src/sync_service.rs @@ -398,7 +398,7 @@ impl SyncService { block_number: u64, block_hash: &[u8; 32], storage_trie_root: &[u8; 32], - requested_keys: impl Iterator> + Clone, + requested_keys: impl Iterator + Clone> + Clone, total_attempts: u32, timeout_per_request: Duration, _max_parallel: NonZeroU32, diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index ccc3ca3322..a1ae9cd09c 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +### Changed + +- The WebRTC protocol implementation is now up to date with the specification. While the specification hasn't been finalized yet and could still evolve, the current version is believed to be likely to be final. ([#2896](https://github.com/paritytech/smoldot/pull/2896)) + ### Fixed - Fix timeout not being checked when opening a notifications substream. ([#2323](https://github.com/paritytech/smoldot/pull/2323)) diff --git a/bin/wasm-node/javascript/src/index-browser.ts b/bin/wasm-node/javascript/src/index-browser.ts index 6b83a0dab7..461d99d393 100644 --- a/bin/wasm-node/javascript/src/index-browser.ts +++ b/bin/wasm-node/javascript/src/index-browser.ts @@ -256,7 +256,7 @@ export function start(options?: ClientOptions): Client { "v=0" + "\n" + // Identifies the creator of the SDP document. We are allowed to use dummy values // (`-` and `0.0.0.0`) to remain anonymous, which we do. Note that "IN" means - // "Internet". (RFC8866) + // "Internet" (and not "input"). (RFC8866) "o=- 0 0 IN IP" + ipVersion + " " + targetIp + "\n" + // Name for the session. We are allowed to pass a dummy `-`. 
(RFC8866) "s=-" + "\n" + @@ -264,7 +264,7 @@ export function start(options?: ClientOptions): Client { // expires. (RFC8866) "t=0 0" + "\n" + // A lite implementation is only appropriate for devices that will - // *always* be connected to the public Internet and have a public + // always be connected to the public Internet and have a public // IP address at which it can receive packets from any // correspondent. ICE will not function when a lite implementation // is placed behind a NAT (RFC8445). @@ -273,12 +273,12 @@ export function start(options?: ClientOptions): Client { // The protocol in this line (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be // the same as the one in the offer. We know that this is true because we tweak the // offer to match the protocol. - // The `` component must always be `pc-datachannel` for WebRTC. + // The `` component must always be `webrtc-datachannel` for WebRTC. // The rest of the SDP payload adds attributes to this specific media stream. // RFCs: 8839, 8866, 8841 "m=application " + targetPort + " " + "UDP/DTLS/SCTP webrtc-datachannel" + "\n" + // Indicates the IP address of the remote. - // Note that "IN" means "Internet". + // Note that "IN" means "Internet" (and not "input"). "c=IN IP" + ipVersion + " " + targetIp + "\n" + // Media ID - uniquely identifies this media stream (RFC9143). "a=mid:0" + "\n" + @@ -287,6 +287,7 @@ export function start(options?: ClientOptions): Client { // ICE username and password, which are used for establishing and // maintaining the ICE connection. (RFC8839) // MUST match ones used by the answerer (server). + // These values are set according to the libp2p WebRTC specification. 
"a=ice-ufrag:" + remoteCertMultibase + "\n" + "a=ice-pwd:" + remoteCertMultibase + "\n" + // Fingerprint of the certificate that the server will use during the TLS @@ -303,8 +304,9 @@ export function start(options?: ClientOptions): Client { // (UDP or TCP) "a=sctp-port:5000" + "\n" + // The maximum SCTP user message size (in bytes) (RFC8841) - "a=max-message-size:100000" + "\n" + - // A transport address for a candidate that can be used for connectivity checks (RFC8839). + "a=max-message-size:16384" + "\n" + // TODO: should this be part of the spec? + // A transport address for a candidate that can be used for connectivity + // checks (RFC8839). "a=candidate:1 1 UDP 1 " + targetIp + " " + targetPort + " typ host" + "\n"; await pc!.setRemoteDescription({ type: "answer", sdp: remoteSdp }); diff --git a/src/libp2p/collection/multi_stream.rs b/src/libp2p/collection/multi_stream.rs index 23ef41392d..48d09cf8b1 100644 --- a/src/libp2p/collection/multi_stream.rs +++ b/src/libp2p/collection/multi_stream.rs @@ -15,6 +15,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::util::{self, protobuf}; + use super::{ super::{ connection::{established, noise}, @@ -25,8 +27,9 @@ use super::{ NotificationsOutErr, OverlayNetwork, PeerId, ShutdownCause, SubstreamId, }; -use alloc::{collections::VecDeque, string::ToString as _, sync::Arc, vec::Vec}; +use alloc::{collections::VecDeque, string::ToString as _, sync::Arc, vec, vec::Vec}; use core::{ + cmp, hash::Hash, ops::{Add, Sub}, time::Duration, @@ -45,6 +48,12 @@ enum MultiStreamConnectionTaskInner { /// Noise handshake in progress. Always `Some`, except to be temporarily extracted. handshake: Option, + /// All incoming data for the handshake substream is first transferred to this buffer. 
+ // TODO: this is very suboptimal code, instead the parsing should be done in a streaming way + handshake_read_buffer: Vec, + + handshake_read_buffer_partial_read: usize, + /// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`], /// that have been opened. For each substream, contains a boolean indicating whether the /// substream is outbound (`true`) or inbound (`false`). @@ -167,10 +176,12 @@ where connection: MultiStreamConnectionTaskInner::Handshake { handshake: Some(noise::HandshakeInProgress::new(noise::Config { key: &noise_key, - is_initiator: true, // TODO: is_initiator? + is_initiator: false, // TODO: is_initiator? prologue: &noise_prologue, })), opened_substream: None, + handshake_read_buffer: Vec::new(), + handshake_read_buffer_partial_read: 0, extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher( 0, Default::default(), @@ -730,9 +741,11 @@ where } MultiStreamConnectionTaskInner::Handshake { opened_substream: Some(opened_substream), + handshake_read_buffer, .. } if opened_substream == substream_id => { // TODO: the handshake has failed, kill the connection? + handshake_read_buffer.clear(); } MultiStreamConnectionTaskInner::Handshake { extra_open_substreams, @@ -768,6 +781,8 @@ where MultiStreamConnectionTaskInner::Handshake { handshake, opened_substream, + handshake_read_buffer, + handshake_read_buffer_partial_read, established, extra_open_substreams, } if opened_substream @@ -775,12 +790,144 @@ where .map_or(false, |s| s == substream_id) => { // TODO: check the handshake timeout - match handshake.take().unwrap().read_write(read_write) { + + // The Noise data is not directly the data of the substream. Instead, everything + // is wrapped within a Protobuf frame. For this reason, we first transfer the data + // to a buffer. + // + // According to the libp2p WebRTC spec, a frame and its length prefix must not be + // larger than 16kiB, meaning that the read buffer never has to exceed this size. 
+ // TODO: this is very suboptimal; improve + if let Some(incoming_buffer) = read_write.incoming_buffer { + // TODO: reset the substream if `remote_writing_side_closed` + let max_to_transfer = + cmp::min(incoming_buffer.len(), 16384 - handshake_read_buffer.len()); + handshake_read_buffer.extend_from_slice(&incoming_buffer[..max_to_transfer]); + debug_assert!(handshake_read_buffer.len() <= 16384); + read_write.advance_read(max_to_transfer); + } + + // Try to parse the content of `handshake_read_buffer`. + // If the content of `handshake_read_buffer` is an incomplete frame, the flags + // will be `None` and the message will be `&[]`. + let (protobuf_frame_size, flags, message_within_frame) = { + let mut parser = nom::combinator::complete::<_, _, nom::error::Error<&[u8]>, _>( + nom::combinator::map_parser( + nom::multi::length_data(crate::util::leb128::nom_leb128_usize), + protobuf::message_decode! { + #[optional] flags = 1 => protobuf::enum_tag_decode, + #[optional] message = 2 => protobuf::bytes_tag_decode, + }, + ), + ); + + match nom::Finish::finish(parser(&handshake_read_buffer)) { + Ok((rest, framed_message)) => { + let protobuf_frame_size = handshake_read_buffer.len() - rest.len(); + ( + protobuf_frame_size, + framed_message.flags, + framed_message.message.unwrap_or(&[][..]), + ) + } + Err(err) if err.code == nom::error::ErrorKind::Eof => { + // TODO: reset the substream if incoming_buffer is full, as it means that the frame is too large, and remove the debug_assert below + debug_assert!(handshake_read_buffer.len() < 16384); + (0, None, &[][..]) + } + Err(_) => { + // Message decoding error. + // TODO: no, handshake failed + return true; + } + } + }; + + // We allocate a buffer where the Noise state machine will temporarily write out + // its data. The size of the buffer is capped in order to prevent the substream + // from generating data that wouldn't fit in a single protobuf frame. 
+ let mut intermediary_write_buffer = + vec![ + 0; + cmp::min(read_write.outgoing_buffer_available(), 16384).saturating_sub(10) + ]; // TODO: this -10 calculation is hacky because we need to account for the variable length prefixes everywhere + + let mut sub_read_write = ReadWrite { + now: read_write.now.clone(), + incoming_buffer: Some( + &message_within_frame[*handshake_read_buffer_partial_read..], + ), + outgoing_buffer: Some((&mut intermediary_write_buffer, &mut [])), + read_bytes: 0, + written_bytes: 0, + wake_up_after: None, + }; + + let handshake_outcome = handshake.take().unwrap().read_write(&mut sub_read_write); + *handshake_read_buffer_partial_read += sub_read_write.read_bytes; + if let Some(wake_up_after) = &sub_read_write.wake_up_after { + read_write.wake_up_after(wake_up_after) + } + + // Send out the message that the Noise handshake has written + // into `intermediary_write_buffer`. + if sub_read_write.written_bytes != 0 { + let written_bytes = sub_read_write.written_bytes; + drop(sub_read_write); + + debug_assert!(written_bytes <= intermediary_write_buffer.len()); + + let protobuf_frame = + protobuf::bytes_tag_encode(2, &intermediary_write_buffer[..written_bytes]); + let protobuf_frame_len = protobuf_frame.clone().fold(0, |mut l, b| { + l += AsRef::<[u8]>::as_ref(&b).len(); + l + }); + + // The spec mentions that a frame plus its length prefix shouldn't exceed + // 16kiB. This is normally ensured by forbidding the substream from writing + // more data than would fit in 16kiB. 
+ debug_assert!(protobuf_frame_len <= 16384); + debug_assert!( + util::leb128::encode_usize(protobuf_frame_len).count() + protobuf_frame_len + <= 16384 + ); + for byte in util::leb128::encode_usize(protobuf_frame_len) { + read_write.write_out(&[byte]); + } + for buffer in protobuf_frame { + read_write.write_out(AsRef::<[u8]>::as_ref(&buffer)); + } + } + + if protobuf_frame_size != 0 + && message_within_frame.len() <= *handshake_read_buffer_partial_read + { + // If the substream state machine has processed all the data within + // `read_buffer`, process the flags of the current protobuf frame and + // discard that protobuf frame so that at the next iteration we pick + // up the rest. + + // Discard the data. + *handshake_read_buffer_partial_read = 0; + *handshake_read_buffer = handshake_read_buffer + .split_at(protobuf_frame_size) + .1 + .to_vec(); + + // Process the flags. + // TODO: ignore FIN and treat any other flag as error + if flags.map_or(false, |f| f != 0) { + todo!() + } + } + + match handshake_outcome { Ok(noise::NoiseHandshake::InProgress(handshake_update)) => { *handshake = Some(handshake_update); false } - Err(_err) => todo!(), // TODO: /!\ + Err(_err) => todo!("{:?}", _err), // TODO: /!\ Ok(noise::NoiseHandshake::Success { cipher: _, remote_peer_id, diff --git a/src/libp2p/connection/established/multi_stream.rs b/src/libp2p/connection/established/multi_stream.rs index 1ce51812c2..25fc41cfe0 100644 --- a/src/libp2p/connection/established/multi_stream.rs +++ b/src/libp2p/connection/established/multi_stream.rs @@ -21,11 +21,11 @@ use super::{ super::super::read_write::ReadWrite, substream, Config, ConfigNotifications, ConfigRequestResponse, ConfigRequestResponseIn, Event, SubstreamId, SubstreamIdInner, }; -use crate::util; +use crate::util::{self, protobuf}; -use alloc::{collections::VecDeque, string::String, vec::Vec}; +use alloc::{collections::VecDeque, string::String, vec, vec::Vec}; use core::{ - fmt, + cmp, fmt, hash::Hash, iter, ops::{Add, Sub}, @@ 
-44,11 +44,8 @@ pub struct MultiStream { /// to notifications and requests, and "in substreams", used for API purposes when it comes to /// raw data sent/received on a substream. When the user for example resets an "in substream", /// the "out substream" remains valid. - in_substreams: hashbrown::HashMap< - TSubId, - (substream::Substream, u32), - util::SipHasherBuild, - >, + in_substreams: + hashbrown::HashMap, util::SipHasherBuild>, out_in_substreams_map: hashbrown::HashMap, @@ -59,7 +56,7 @@ pub struct MultiStream { /// Every time an outgoing substream is opened, an item is pulled from this list. /// /// Does not include the ping substream. - desired_out_substreams: VecDeque<(substream::Substream, u32)>, + desired_out_substreams: VecDeque>, /// Substream used for outgoing pings. /// @@ -92,6 +89,23 @@ pub struct MultiStream { ping_timeout: Duration, } +struct Substream { + id: u32, + /// Underlying state machine for the substream. Always `Some` while the substream is alive, + /// and `None` if it has been reset. + inner: Option>, + /// All incoming data is first transferred to this buffer. + // TODO: this is very suboptimal code, instead the parsing should be done in a streaming way + read_buffer: Vec, + /// The buffer within `read_buffer` might contain a full Protobuf frame, but not all of the + /// data within that frame was processed by the underlying substream. + /// Contains the number of bytes of the message in `read_buffer` that the substream state + /// machine has already processed. + read_buffer_partial_read: usize, + remote_writing_side_closed: bool, + local_writing_side_closed: bool, +} + const MAX_PENDING_EVENTS: usize = 4; impl MultiStream @@ -181,7 +195,7 @@ where /// Panics if there already exists a substream with an identical identifier. 
/// pub fn add_substream(&mut self, id: TSubId, outbound: bool) { - let (substream, out_substream_id) = if !outbound { + let substream = if !outbound { let out_substream_id = self.next_out_substream_id; self.next_out_substream_id += 1; @@ -194,20 +208,28 @@ where .chain(iter::once(self.ping_protocol.clone())) .collect::>(); - ( - substream::Substream::ingoing(supported_protocols), - out_substream_id, - ) + Substream { + id: out_substream_id, + inner: Some(substream::Substream::ingoing(supported_protocols)), + read_buffer: Vec::new(), + read_buffer_partial_read: 0, + local_writing_side_closed: false, + remote_writing_side_closed: false, + } } else if self.ping_substream.is_none() { let out_substream_id = self.next_out_substream_id; self.next_out_substream_id += 1; self.ping_substream = Some(id.clone()); - ( - substream::Substream::ping_out(self.ping_protocol.clone()), - out_substream_id, - ) + Substream { + id: out_substream_id, + inner: Some(substream::Substream::ping_out(self.ping_protocol.clone())), + read_buffer: Vec::new(), + read_buffer_partial_read: 0, + local_writing_side_closed: false, + remote_writing_side_closed: false, + } } else if let Some(desired) = self.desired_out_substreams.pop_front() { desired } else { @@ -215,7 +237,10 @@ where todo!() }; - let previous_value = self.in_substreams.insert(id, (substream, out_substream_id)); + let _prev_val = self.out_in_substreams_map.insert(substream.id, id.clone()); + debug_assert!(_prev_val.is_none()); + + let previous_value = self.in_substreams.insert(id, substream); if previous_value.is_some() { // There is already a substream with that identifier. This is forbidden by the API of // this function. @@ -232,15 +257,17 @@ where /// Panics if there is no substream with that identifier. 
/// pub fn reset_substream(&mut self, substream_id: &TSubId) { - let (substream, out_substream_id) = self.in_substreams.remove(substream_id).unwrap(); + let substream = self.in_substreams.remove(substream_id).unwrap(); + let _was_in = self.out_in_substreams_map.remove(&substream.id); + debug_assert!(_was_in.is_some()); if Some(substream_id) == self.ping_substream.as_ref() { self.ping_substream = None; } - let maybe_event = substream.reset(); + let maybe_event = substream.inner.unwrap().reset(); if let Some(event) = maybe_event { - self.on_substream_event(out_substream_id, event); + Self::on_substream_event(&mut self.pending_events, substream.id, event); } } @@ -257,89 +284,317 @@ where /// /// Panics if there is no substream with that identifier. /// + // TODO: clarify docs to explain that in the case of WebRTC the reading and writing sides never close, and substream can only ever reset pub fn substream_read_write( &mut self, substream_id: &TSubId, read_write: &'_ mut ReadWrite<'_, TNow>, ) -> bool { + let mut substream = self.in_substreams.get_mut(substream_id).unwrap(); + + // Reading/writing the ping substream is used to queue new outgoing pings. + if Some(substream_id) == self.ping_substream.as_ref() { + if read_write.now >= self.next_ping { + let payload = self + .ping_payload_randomness + .sample(rand::distributions::Standard); + substream + .inner + .as_mut() + .unwrap() + .queue_ping(&payload, read_write.now.clone() + self.ping_timeout); + self.next_ping = read_write.now.clone() + self.ping_interval; + } + + read_write.wake_up_after(&self.next_ping); + } + + // TODO: make it explicit in the API that this is indeed the WebRTC protocol, as almost everything below is WebRTC-specific + loop { // Don't process any more data before events are pulled. 
if self.pending_events.len() >= MAX_PENDING_EVENTS { return false; } - // TODO: not great to remove then insert back the substream - let (substream_id, (mut substream, out_substream_id)) = - self.in_substreams.remove_entry(substream_id).unwrap(); - - // Reading/writing the ping substream is used to queue new outgoing pings. - if Some(&substream_id) == self.ping_substream.as_ref() { - if read_write.now >= self.next_ping { - let payload = self - .ping_payload_randomness - .sample(rand::distributions::Standard); - substream.queue_ping(&payload, read_write.now.clone() + self.ping_timeout); - self.next_ping = read_write.now.clone() + self.ping_interval; - } + // In the situation where there's not enough space in the outgoing buffer to write an + // outgoing Protobuf frame, we just return immediately. + // This is necessary because calling `substream.read_write` can generate a write + // close message. + // TODO: this is error-prone, as we have no guarantee that the outgoing buffer will ever be > 6 bytes, for example in principle the API user could decide to use only a write buffer of 2 bytes, although that would be a very stupid thing to do + if read_write.outgoing_buffer_available() < 6 { + return false; + } - read_write.wake_up_after(&self.next_ping); + // If this flag is still `false` at the end of the loop, we break out of it. + let mut continue_looping = false; + + // The incoming data is not directly the data of the substream. Instead, everything + // is wrapped within a Protobuf frame. For this reason, we first transfer the data to + // a buffer. + // + // According to the libp2p WebRTC spec, a frame and its length prefix must not be + // larger than 16kiB, meaning that the read buffer never has to exceed this size. 
+ // TODO: this is very suboptimal; improve + if let Some(incoming_buffer) = read_write.incoming_buffer { + // TODO: reset the substream if `remote_writing_side_closed` + let max_to_transfer = + cmp::min(incoming_buffer.len(), 16384 - substream.read_buffer.len()); + substream + .read_buffer + .extend_from_slice(&incoming_buffer[..max_to_transfer]); + debug_assert!(substream.read_buffer.len() <= 16384); + if max_to_transfer != incoming_buffer.len() { + continue_looping = true; + } + read_write.advance_read(max_to_transfer); } - let (mut substream_update, event) = substream.read_write(read_write); + // Try to parse the content of `self.read_buffer`. + // If the content of `self.read_buffer` is an incomplete frame, the flags will be + // `None` and the message will be `&[]`. + let (protobuf_frame_size, flags, message_within_frame) = { + let mut parser = nom::combinator::complete::<_, _, nom::error::Error<&[u8]>, _>( + nom::combinator::map_parser( + nom::multi::length_data(crate::util::leb128::nom_leb128_usize), + protobuf::message_decode! { + #[optional] flags = 1 => protobuf::enum_tag_decode, + #[optional] message = 2 => protobuf::bytes_tag_decode, + }, + ), + ); + + match nom::Finish::finish(parser(&substream.read_buffer)) { + Ok((rest, framed_message)) => { + let protobuf_frame_size = substream.read_buffer.len() - rest.len(); + ( + protobuf_frame_size, + framed_message.flags, + framed_message.message.unwrap_or(&[][..]), + ) + } + Err(err) if err.code == nom::error::ErrorKind::Eof => { + // TODO: reset the substream if incoming_buffer is full, as it means that the frame is too large, and remove the debug_assert below + debug_assert!(substream.read_buffer.len() < 16384); + (0, None, &[][..]) + } + Err(_) => { + // Message decoding error. 
+ // TODO: no, must ask the state machine to reset + return true; + } + } + }; + + let event = if protobuf_frame_size != 0 + && message_within_frame.len() <= substream.read_buffer_partial_read + { + // If the substream state machine has already processed all the data within + // `read_buffer`, process the flags of the current protobuf frame, discard that + // protobuf frame, and loop again. + continue_looping = true; + + // Discard the data. + substream.read_buffer_partial_read = 0; + substream.read_buffer = substream + .read_buffer + .split_at(protobuf_frame_size) + .1 + .to_vec(); + + // Process the flags. + // Note that the `STOP_SENDING` flag is ignored. + + // If the remote has sent a `FIN` or `RESET_STREAM` flag, mark the remote writing + // side as closed. + if flags.map_or(false, |f| f == 0 || f == 2) { + substream.remote_writing_side_closed = true; + } + + // If the remote has sent a `RESET_STREAM` flag, also reset the substream. + if flags.map_or(false, |f| f == 2) { + substream.inner.take().unwrap().reset() + } else { + None + } + } else { + // We allocate a buffer where the substream state machine will temporarily write + // out its data. The size of the buffer is capped in order to prevent the substream + // from generating data that wouldn't fit in a single protobuf frame. 
+ let mut intermediary_write_buffer = + vec![ + 0; + cmp::min(read_write.outgoing_buffer_available(), 16384).saturating_sub(10) + ]; // TODO: this -10 calculation is hacky because we need to account for the variable length prefixes everywhere + + let mut sub_read_write = ReadWrite { + now: read_write.now.clone(), + incoming_buffer: if substream.remote_writing_side_closed { + None + } else { + Some(&message_within_frame[substream.read_buffer_partial_read..]) + }, + outgoing_buffer: if substream.local_writing_side_closed { + None + } else { + Some((&mut intermediary_write_buffer, &mut [])) + }, + read_bytes: 0, + written_bytes: 0, + wake_up_after: None, + }; + + let (substream_update, event) = substream + .inner + .take() + .unwrap() + .read_write(&mut sub_read_write); + + substream.inner = substream_update; + substream.read_buffer_partial_read += sub_read_write.read_bytes; + if let Some(wake_up_after) = &sub_read_write.wake_up_after { + read_write.wake_up_after(wake_up_after) + } + + // Continue looping as the substream might have more data to read or write. + if sub_read_write.read_bytes != 0 || sub_read_write.written_bytes != 0 { + continue_looping = true; + } + + // Determine whether we should send a message on that substream with a specific + // flag. + let flag_to_write_out = if substream.inner.is_none() + && (!substream.remote_writing_side_closed + || sub_read_write.outgoing_buffer.is_some()) + { + // Send a `RESET_STREAM` if the state machine has reset while a side was still + // open. + Some(2) + } else if !substream.local_writing_side_closed + && sub_read_write.outgoing_buffer.is_none() + { + // Send a `FIN` if the state machine has closed the writing side while it + // wasn't closed before. + substream.local_writing_side_closed = true; + Some(0) + } else { + None + }; + + // Send out message. 
+ if flag_to_write_out.is_some() || sub_read_write.written_bytes != 0 { + let written_bytes = sub_read_write.written_bytes; + drop(sub_read_write); + + debug_assert!(written_bytes <= intermediary_write_buffer.len()); + + let protobuf_frame = { + let flag_out = flag_to_write_out + .into_iter() + .flat_map(|f| protobuf::enum_tag_encode(1, f)); + let message_out = if written_bytes != 0 { + Some(&intermediary_write_buffer[..written_bytes]) + } else { + None + } + .into_iter() + .flat_map(|m| protobuf::bytes_tag_encode(2, m)); + flag_out + .map(either::Left) + .chain(message_out.map(either::Right)) + }; + + let protobuf_frame_len = protobuf_frame.clone().fold(0, |mut l, b| { + l += AsRef::<[u8]>::as_ref(&b).len(); + l + }); + + // The spec mentions that a frame plus its length prefix shouldn't exceed + // 16kiB. This is normally ensured by forbidding the substream from writing + // more data than would fit in 16kiB. + debug_assert!(protobuf_frame_len <= 16384); + debug_assert!( + util::leb128::encode_usize(protobuf_frame_len).count() + protobuf_frame_len + <= 16384 + ); + for byte in util::leb128::encode_usize(protobuf_frame_len) { + read_write.write_out(&[byte]); + } + for buffer in protobuf_frame { + read_write.write_out(AsRef::<[u8]>::as_ref(&buffer)); + } + + // We continue looping because the substream might have more data to send. 
+ continue_looping = true; + } + + event + }; + + match event { + None => {} + + Some(substream::Event::InboundNegotiated(protocol)) => { + continue_looping = true; - match (event, substream_update.as_mut()) { - (None, _) => {} - (Some(substream::Event::InboundNegotiated(protocol)), Some(substream)) => { if protocol == self.ping_protocol { - substream.set_inbound_ty(substream::InboundTy::Ping); + substream + .inner + .as_mut() + .unwrap() + .set_inbound_ty(substream::InboundTy::Ping); } else if let Some(protocol_index) = self .request_protocols .iter() .position(|p| p.name == protocol) { - substream.set_inbound_ty(substream::InboundTy::Request { - protocol_index, - request_max_size: if let ConfigRequestResponseIn::Payload { max_size } = - self.request_protocols[protocol_index].inbound_config - { - Some(max_size) - } else { - None + substream.inner.as_mut().unwrap().set_inbound_ty( + substream::InboundTy::Request { + protocol_index, + request_max_size: if let ConfigRequestResponseIn::Payload { + max_size, + } = + self.request_protocols[protocol_index].inbound_config + { + Some(max_size) + } else { + None + }, }, - }); + ); } else if let Some(protocol_index) = self .notifications_protocols .iter() .position(|p| p.name == protocol) { - substream.set_inbound_ty(substream::InboundTy::Notifications { - protocol_index, - max_handshake_size: self.notifications_protocols[protocol_index] - .max_handshake_size, - }); + substream.inner.as_mut().unwrap().set_inbound_ty( + substream::InboundTy::Notifications { + protocol_index, + max_handshake_size: self.notifications_protocols[protocol_index] + .max_handshake_size, + }, + ); } else { unreachable!(); } + } - // TODO: a bit crappy to put this insert here; DRY - self.in_substreams - .insert(substream_id, (substream_update.unwrap(), out_substream_id)); - continue; + Some(other) => { + continue_looping = true; + Self::on_substream_event(&mut self.pending_events, substream.id, other) } - (Some(substream::Event::InboundNegotiated(_)), 
None) => {} - (Some(other), _) => self.on_substream_event(out_substream_id, other), } - break if let Some(substream_update) = substream_update { - self.in_substreams - .insert(substream_id, (substream_update, out_substream_id)); - false - } else { - if Some(&substream_id) == self.ping_substream.as_ref() { + if substream.inner.is_none() { + if Some(substream_id) == self.ping_substream.as_ref() { self.ping_substream = None; } - true - }; + self.out_in_substreams_map.remove(&substream.id); + self.in_substreams.remove(&substream_id); + break true; + } else if !continue_looping { + break false; + } } } @@ -350,8 +605,12 @@ where /// Intentionally panics on [`substream::Event::InboundNegotiated`]. Please handle this /// variant separately. /// - fn on_substream_event(&mut self, substream_id: u32, event: substream::Event) { - self.pending_events.push_back(match event { + fn on_substream_event( + pending_events: &mut VecDeque>, + substream_id: u32, + event: substream::Event, + ) { + pending_events.push_back(match event { substream::Event::InboundNegotiated(_) => panic!(), substream::Event::InboundError(error) => Event::InboundError(error), substream::Event::RequestIn { @@ -450,8 +709,9 @@ where let substream_id = self.next_out_substream_id; self.next_out_substream_id += 1; - self.desired_out_substreams.push_back(( - substream::Substream::request_out( + self.desired_out_substreams.push_back(Substream { + id: substream_id, + inner: Some(substream::Substream::request_out( self.request_protocols[protocol_index].name.clone(), // TODO: clone :-/ timeout, if has_length_prefix { @@ -461,9 +721,12 @@ where }, self.request_protocols[protocol_index].max_response_size, user_data, - ), - substream_id, - )); + )), + read_buffer: Vec::new(), + read_buffer_partial_read: 0, + local_writing_side_closed: false, + remote_writing_side_closed: false, + }); // TODO: ? do this? 
substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size @@ -487,7 +750,9 @@ where self.in_substreams .get_mut(inner_substream_id) .unwrap() - .0 + .inner + .as_mut() + .unwrap() .notifications_substream_user_data_mut() } @@ -522,16 +787,20 @@ where let substream_id = self.next_out_substream_id; self.next_out_substream_id += 1; - self.desired_out_substreams.push_back(( - substream::Substream::notifications_out( + self.desired_out_substreams.push_back(Substream { + id: substream_id, + inner: Some(substream::Substream::notifications_out( timeout, self.notifications_protocols[protocol_index].name.clone(), // TODO: clone :-/, handshake, max_handshake_size, user_data, - ), - substream_id, - )); + )), + read_buffer: Vec::new(), + read_buffer_partial_read: 0, + local_writing_side_closed: false, + remote_writing_side_closed: false, + }); SubstreamId(SubstreamIdInner::MultiStream(substream_id)) } @@ -561,7 +830,9 @@ where self.in_substreams .get_mut(inner_substream_id) .unwrap() - .0 + .inner + .as_mut() + .unwrap() .accept_in_notifications_substream(handshake, max_notification_size, user_data); } @@ -584,7 +855,9 @@ where self.in_substreams .get_mut(inner_substream_id) .unwrap() - .0 + .inner + .as_mut() + .unwrap() .reject_in_notifications_substream(); } @@ -620,7 +893,9 @@ where self.in_substreams .get_mut(inner_substream_id) .unwrap() - .0 + .inner + .as_mut() + .unwrap() .write_notification_unbounded(notification); } @@ -644,7 +919,9 @@ where self.in_substreams .get(inner_substream_id) .unwrap() - .0 + .inner + .as_ref() + .unwrap() .notification_substream_queued_bytes() } @@ -671,7 +948,9 @@ where self.in_substreams .get_mut(inner_substream_id) .unwrap() - .0 + .inner + .as_mut() + .unwrap() .close_notifications_substream(); } @@ -693,7 +972,9 @@ where self.in_substreams .get_mut(inner_substream_id) .ok_or(substream::RespondInRequestError::SubstreamClosed)? 
- .0 + .inner + .as_mut() + .unwrap() .respond_in_request(response) } } diff --git a/src/network/protocol/storage_call_proof.rs b/src/network/protocol/storage_call_proof.rs index 8a6621c5f3..bc7199b4f9 100644 --- a/src/network/protocol/storage_call_proof.rs +++ b/src/network/protocol/storage_call_proof.rs @@ -33,7 +33,7 @@ pub struct StorageProofRequestConfig { /// Builds the bytes corresponding to a storage proof request. pub fn build_storage_proof_request<'a>( - config: StorageProofRequestConfig + 'a> + 'a>, + config: StorageProofRequestConfig + Clone + 'a> + 'a>, ) -> impl Iterator + 'a> + 'a { protobuf::message_tag_encode( 2, diff --git a/src/network/service.rs b/src/network/service.rs index 0d1202221a..ba6235625d 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -735,7 +735,7 @@ where now: TNow, target: &PeerId, chain_index: usize, - config: protocol::StorageProofRequestConfig>>, + config: protocol::StorageProofRequestConfig + Clone>>, timeout: Duration, ) -> OutRequestId { let request_data = diff --git a/src/util/protobuf.rs b/src/util/protobuf.rs index 34e0046d14..470a3763e2 100644 --- a/src/util/protobuf.rs +++ b/src/util/protobuf.rs @@ -66,8 +66,8 @@ pub(crate) fn message_tag_encode<'a>( pub(crate) fn bytes_tag_encode<'a>( field: u64, - data: impl AsRef<[u8]> + 'a, -) -> impl Iterator + 'a> + 'a { + data: impl AsRef<[u8]> + Clone + 'a, +) -> impl Iterator + Clone + 'a> + Clone + 'a { // Protobuf only allows 2 GiB of data. 
debug_assert!(data.as_ref().len() <= 2 * 1024 * 1024 * 1024); delimited_tag_encode(field, data) @@ -75,8 +75,9 @@ pub(crate) fn bytes_tag_encode<'a>( pub(crate) fn string_tag_encode<'a>( field: u64, - data: impl AsRef + 'a, -) -> impl Iterator + 'a> + 'a { + data: impl AsRef + Clone + 'a, +) -> impl Iterator + Clone + 'a> + Clone + 'a { + #[derive(Clone)] struct Wrapper(T); impl> AsRef<[u8]> for Wrapper { fn as_ref(&self) -> &[u8] { @@ -97,8 +98,8 @@ pub(crate) fn varint_zigzag_tag_encode(field: u64, value: u64) -> impl Iterator< pub(crate) fn delimited_tag_encode<'a>( field: u64, - data: impl AsRef<[u8]> + 'a, -) -> impl Iterator + 'a> + 'a { + data: impl AsRef<[u8]> + Clone + 'a, +) -> impl Iterator + Clone + 'a> + Clone + 'a { tag_encode(field, 2) .chain(leb128::encode_usize(data.as_ref().len())) .map(|v| either::Right([v]))