-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transport
trait definition and in-memory implementation
#363
Merged
Merged
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
3299c4d
first draft of network + transport
6459371
move ChannelId, MessageChunks
3ecabfb
better roles_to_helpers
675e839
make HelperIdentity more opaque
c08f599
Make network struct use transport
akoshelev a25d210
First look at InMemoryTransport
akoshelev 36a9d8d
InMemoryTransport is almost functional
akoshelev 8deb0ef
Everything works except role resolving
akoshelev e65f6e7
Now everything works except e2e tests
akoshelev aa5675d
Some code cleanup
akoshelev 1b438d4
Temporarily disable e2e tests
akoshelev d4d1f8b
Lints and formatting
akoshelev 3f5d719
rename some things
akoshelev fa9ffa1
Make shuttle code compile and run the tests
akoshelev 44aa3ae
Lots of changes trying to make shuttle work
akoshelev 223dd43
Mamma mia, it works now
akoshelev 572f055
Lint and formatting
akoshelev 7f76a77
Fix benchmarks
akoshelev 038f0ab
Fix concurrency tests
akoshelev ff2b5b2
Some more fixes
akoshelev 2b6d288
Remove commented code
akoshelev 49eea5b
TryInto instead of Into for identity
akoshelev df2ced1
Doc changes
akoshelev f5267f7
Use weak references instead of halt hack
akoshelev 562939d
Remove `NetworkEventData`
akoshelev 522d2e5
Simplify Switch and InMemoryNetwork
akoshelev e5bdcd1
Remove commented code
akoshelev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,29 +6,30 @@ | |
//! corresponding helper without needing to know the exact location - this is what this module | ||
//! enables MPC protocols to do. | ||
//! | ||
use crate::ff::{Field, Int}; | ||
use crate::helpers::buffers::{SendBuffer, SendBufferConfig}; | ||
use crate::helpers::{MessagePayload, MESSAGE_PAYLOAD_SIZE_BYTES}; | ||
use crate::task::JoinHandle; | ||
use crate::telemetry::labels::STEP; | ||
use crate::{ | ||
helpers::buffers::ReceiveBuffer, | ||
helpers::error::Error, | ||
helpers::network::{ChannelId, MessageEnvelope, Network}, | ||
helpers::Role, | ||
ff::{Field, Int}, | ||
helpers::{ | ||
buffers::{ReceiveBuffer, SendBuffer, SendBufferConfig}, | ||
network::ChannelId, | ||
Error, MessagePayload, Role, MESSAGE_PAYLOAD_SIZE_BYTES, | ||
}, | ||
protocol::{RecordId, Step}, | ||
task::JoinHandle, | ||
telemetry::{labels::STEP, metrics::RECORDS_SENT}, | ||
}; | ||
use ::tokio::sync::{mpsc, oneshot}; | ||
use ::tokio::time::Instant; | ||
use futures::SinkExt; | ||
use futures::StreamExt; | ||
use std::fmt::{Debug, Formatter}; | ||
use std::time::Duration; | ||
use std::{io, panic}; | ||
use tinyvec::array_vec; | ||
use tracing::Instrument; | ||
|
||
use crate::telemetry::metrics::RECORDS_SENT; | ||
use crate::helpers::buffers::PushError; | ||
use crate::helpers::network::{MessageEnvelope, Network}; | ||
use crate::helpers::time::Timer; | ||
use crate::helpers::transport::Transport; | ||
use ::tokio::sync::{mpsc, oneshot}; | ||
use futures_util::stream::FuturesUnordered; | ||
#[cfg(all(feature = "shuttle", test))] | ||
use shuttle::future as tokio; | ||
|
||
|
@@ -160,19 +161,18 @@ pub struct GatewayConfig { | |
} | ||
|
||
impl Gateway { | ||
pub fn new<N: Network>(role: Role, network: &N, config: GatewayConfig) -> Self { | ||
pub async fn new<T: Transport>(role: Role, network: Network<T>, config: GatewayConfig) -> Self { | ||
let (recv_tx, mut recv_rx) = mpsc::channel::<ReceiveRequest>(config.recv_outstanding); | ||
let (send_tx, mut send_rx) = mpsc::channel::<SendRequest>(config.send_outstanding); | ||
let mut message_stream = network.recv_stream(); | ||
let mut network_sink = network.sink(); | ||
let mut message_stream = network.recv_stream().await; | ||
|
||
let control_handle = tokio::spawn(async move { | ||
const INTERVAL: Duration = Duration::from_secs(3); | ||
|
||
let mut receive_buf = ReceiveBuffer::default(); | ||
let mut send_buf = SendBuffer::new(config.send_buffer_config); | ||
|
||
let sleep = ::tokio::time::sleep(INTERVAL); | ||
let mut pending_sends = FuturesUnordered::new(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
let sleep = Timer::new(INTERVAL); | ||
::tokio::pin!(sleep); | ||
|
||
loop { | ||
|
@@ -189,9 +189,14 @@ impl Gateway { | |
tracing::trace!("received {} bytes from {:?}", messages.len(), channel_id); | ||
receive_buf.receive_messages(&channel_id, &messages); | ||
} | ||
Some(send_req) = send_rx.recv() => { | ||
tracing::trace!("new SendRequest({:?})", send_req); | ||
send_message::<N>(&mut network_sink, &mut send_buf, send_req).await; | ||
Some((channel_id, envelope)) = send_rx.recv(), if pending_sends.is_empty() => { | ||
tracing::trace!("new SendRequest({:?})", (&channel_id, &envelope)); | ||
metrics::increment_counter!(RECORDS_SENT, STEP => channel_id.step.as_ref().to_string()); | ||
let data = send_buf.push(&channel_id, &envelope); | ||
pending_sends.push(send_message(&network, channel_id, data)); | ||
} | ||
Some(_) = &mut pending_sends.next() => { | ||
pending_sends.clear(); | ||
} | ||
_ = &mut sleep => { | ||
#[cfg(debug_assertions)] | ||
|
@@ -204,7 +209,7 @@ impl Gateway { | |
} | ||
|
||
// reset the timer on every action | ||
sleep.as_mut().reset(Instant::now() + INTERVAL); | ||
sleep.as_mut().reset(); | ||
} | ||
}.instrument(tracing::info_span!("gateway_loop", role=role.as_static_str()).or_current())); | ||
|
||
|
@@ -287,19 +292,24 @@ impl Debug for ReceiveRequest { | |
} | ||
} | ||
|
||
async fn send_message<N: Network>(sink: &mut N::Sink, buf: &mut SendBuffer, req: SendRequest) { | ||
let (channel_id, msg) = req; | ||
metrics::increment_counter!(RECORDS_SENT, STEP => channel_id.step.as_ref().to_string()); | ||
match buf.push(&channel_id, &msg) { | ||
async fn send_message<T: Transport>( | ||
network: &Network<T>, | ||
channel_id: ChannelId, | ||
data: Result<Option<Vec<u8>>, PushError>, | ||
) { | ||
// let (channel_id, msg) = req; | ||
// metrics::increment_counter!(RECORDS_SENT, STEP => channel_id.step.as_ref().to_string()); | ||
match data { | ||
Ok(Some(buf_to_send)) => { | ||
tracing::trace!("sending {} bytes to {:?}", buf_to_send.len(), &channel_id); | ||
sink.send((channel_id, buf_to_send)) | ||
network | ||
.send((channel_id, buf_to_send)) | ||
.await | ||
.expect("Failed to send data to the network"); | ||
} | ||
Ok(None) => {} | ||
Err(err) => panic!("failed to send to the {channel_id:?}: {err}"), | ||
}; | ||
} | ||
} | ||
|
||
#[cfg(debug_assertions)] | ||
|
@@ -329,7 +339,7 @@ mod tests { | |
config.gateway_config.send_buffer_config.items_in_batch = 1; // Send every record | ||
config.gateway_config.send_buffer_config.batch_count = 3; // keep 3 at a time | ||
|
||
let world = Box::leak(Box::new(TestWorld::new_with(config))); | ||
let world = Box::leak(Box::new(TestWorld::new_with(config).await)); | ||
let contexts = world.contexts::<Fp31>(); | ||
let sender_ctx = contexts[0].narrow("reordering-test"); | ||
let recv_ctx = contexts[1].narrow("reordering-test"); | ||
|
@@ -361,7 +371,7 @@ mod tests { | |
#[tokio::test] | ||
#[should_panic(expected = "Record RecordId(1) has been received twice")] | ||
async fn duplicate_message() { | ||
let world = TestWorld::new(); | ||
let world = TestWorld::new().await; | ||
let (v1, v2) = (Fp31::from(1u128), Fp31::from(2u128)); | ||
let peer = Role::H2; | ||
let record_id = 1.into(); | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lots of files changed just because
TestWorld
constructor isasync
now. It must beasync
becauseTransport
is capable of serving multiple queries running in parallel (i.e. there must be an event loop somewhere) andGateway
needs to subscribe and wait untilTransport
acknowledges the request to route query-specific commands to that gateway