Transport trait definition and in-memory implementation #363

Merged: 27 commits into private-attribution:main, Dec 23, 2022

Conversation

akoshelev (Collaborator):

As we discussed in #352, we need another abstraction on the network layer that is capable of sending messages and delivering events. This change brings the trait definition to the main branch along with an in-memory implementation for it (both @thurstonsand and I were working on it). TestWorld has been migrated to use the transport layer too - hence the size of this change. The reason I want to merge it to main now is that @martinthomson and @andyleiserson are working on the same codebase at the moment, and merging it later will be super painful.

We can focus this review on the Transport trait definition and the InMemoryTransport and Network structs. Other changes are pretty mechanical.

@@ -9,7 +9,7 @@ async fn main() -> Result<(), Error> {
     let mut config = TestWorldConfig::default();
     config.gateway_config.send_buffer_config.items_in_batch = 1;
     config.gateway_config.send_buffer_config.batch_count = 1000;
-    let world = TestWorld::new_with(config);
+    let world = TestWorld::new_with(config).await;

akoshelev (Collaborator, Author):

Lots of files changed just because the TestWorld constructor is async now. It must be async because Transport is capable of serving multiple queries running in parallel (i.e. there must be an event loop somewhere), and Gateway needs to subscribe and wait until Transport acknowledges the request to route query-specific commands to that gateway.
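A minimal, self-contained sketch of that shape (all names here are invented for illustration, not the PR's API): the constructor sends a subscription request and awaits the acknowledgement from the transport's event loop, so it cannot stay synchronous.

use tokio::sync::{mpsc, oneshot};

struct Subscribe {
    ack: oneshot::Sender<()>,
}

struct World {
    to_transport: mpsc::Sender<Subscribe>,
}

async fn new_world(to_transport: mpsc::Sender<Subscribe>) -> World {
    let (ack, acked) = oneshot::channel();
    to_transport.send(Subscribe { ack }).await.unwrap();
    // block construction until the event loop confirms it will route
    // query-specific commands to this subscriber
    acked.await.unwrap();
    World { to_transport }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    // stand-in for the transport event loop
    tokio::spawn(async move {
        while let Some(Subscribe { ack }) = rx.recv().await {
            ack.send(()).ok();
        }
    });
    let _world = new_world(tx).await;
}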


let control_handle = tokio::spawn(async move {
    const INTERVAL: Duration = Duration::from_secs(3);

    let mut receive_buf = ReceiveBuffer::default();
    let mut send_buf = SendBuffer::new(config.send_buffer_config);

    let sleep = ::tokio::time::sleep(INTERVAL);
    let mut pending_sends = FuturesUnordered::new();

akoshelev (Collaborator, Author):

send may block if flow control is in place. This allows the event loop to continue serving other traffic while a send is blocked. There will be at most one in-flight send.
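A simplified sketch of that shape (the real loop also handles the flush timer and buffering): new work is pulled from the channel only while `pending_sends` is empty, so a blocked send never stops the loop from serving its other arms, and there is never more than one send in flight.

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(4);
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(i).await.unwrap();
        }
    });

    let mut pending_sends = FuturesUnordered::new();
    loop {
        tokio::select! {
            // only accept new work when no send is in flight
            Some(item) = rx.recv(), if pending_sends.is_empty() => {
                pending_sends.push(async move {
                    // stand-in for a send that may block on flow control
                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    item
                });
            }
            Some(sent) = pending_sends.next() => {
                println!("sent {sent}");
            }
            else => break,
        }
    }
}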

/// To resolve this identifier into something concrete (Uri, encryption keys, etc.), configuration must be consulted.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct HelperIdentity {
    id: u8,

akoshelev (Collaborator, Author):

As we discussed, helper identity needs to be an opaque identifier. Maybe u8 is not the right choice for it; once we shape out the helper config, we may decide to change it. But something lightweight is needed (this struct must be cloned often).

/// if `roles_to_helpers` does not have all 3 roles
pub async fn send(&self, message_chunks: MessageChunks) -> Result<(), Error> {
    let (channel, payload) = message_chunks;
    let destination = self.roles.identity(channel.role);

akoshelev (Collaborator, Author):

This is the key responsibility of this struct: resolving helper identities (something the transport understands) to roles (infra) before sending messages up.
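A tiny sketch of that translation with invented names (the PR's actual types differ): the infra layer addresses peers by Role, the transport addresses them by HelperIdentity, and the network struct owns the mapping between the two.

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct HelperIdentity(u8);

#[derive(Clone, Copy)]
enum Role { H1, H2, H3 }

struct RoleAssignment {
    // indexed by role: [H1, H2, H3]
    helpers: [HelperIdentity; 3],
}

impl RoleAssignment {
    fn identity(&self, role: Role) -> &HelperIdentity {
        &self.helpers[role as usize]
    }
}

fn main() {
    let roles = RoleAssignment {
        helpers: [HelperIdentity(1), HelperIdentity(2), HelperIdentity(3)],
    };
    // the destination the transport understands, resolved from the infra-level role
    assert_eq!(*roles.identity(Role::H1), HelperIdentity(1));
    assert_eq!(*roles.identity(Role::H2), HelperIdentity(2));
    assert_eq!(*roles.identity(Role::H3), HelperIdentity(3));
}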


/// Network interface for components that require communication.
#[async_trait]
pub trait Network: Sync {

akoshelev (Collaborator, Author):

Everything in this file was moved from the original network module and is deprecated now.

/// make it a no-op.
#[cfg(not(all(test, feature = "shuttle")))]
#[pin_project::pin_project]
pub struct Timer {

akoshelev (Collaborator, Author):

I realized that the shuttle tests are broken again after I added the tokio sleep, because it requires the Tokio runtime. So I had to add a wrapper to make it work in both environments.
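A hedged sketch of the wrapper idea (the PR's actual Timer pin-projects a tokio Sleep, so its shape differs): outside shuttle the timer delegates to tokio::time::sleep; under shuttle, where there is no Tokio runtime, it becomes a future that never completes, effectively making the periodic arm a no-op.

use std::time::Duration;

#[cfg(not(all(test, feature = "shuttle")))]
pub struct Timer {
    interval: Duration,
}

#[cfg(not(all(test, feature = "shuttle")))]
impl Timer {
    pub fn new(interval: Duration) -> Self {
        Self { interval }
    }
    pub async fn wait(&self) {
        tokio::time::sleep(self.interval).await;
    }
}

#[cfg(all(test, feature = "shuttle"))]
pub struct Timer;

#[cfg(all(test, feature = "shuttle"))]
impl Timer {
    pub fn new(_interval: Duration) -> Self {
        Self
    }
    pub async fn wait(&self) {
        // never fires under shuttle, so no runtime is required
        std::future::pending::<()>().await;
    }
}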

@@ -29,20 +29,11 @@ pub mod peer {
     #[derive(Clone, Debug)]
     #[cfg_attr(feature = "enable-serde", derive(serde::Deserialize))]
     pub struct Config {
-        #[cfg_attr(feature = "enable-serde", serde(deserialize_with = "uri_from_str"))]
+        #[cfg_attr(feature = "enable-serde", serde(with = "crate::uri"))]

akoshelev (Collaborator, Author):

URI struct serialization is needed in more than one place
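The module path crate::uri comes from the diff above; the body below is only a guess at what such a shared serde module can look like (serde's with attribute on a Deserialize-only struct just needs a deserialize function in the named module).

pub mod uri {
    use http::Uri;
    use serde::{de::Error, Deserialize, Deserializer};

    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Uri, D::Error> {
        // deserialize the URI from its string form and surface parse failures as serde errors
        let s = String::deserialize(deserializer)?;
        s.parse().map_err(D::Error::custom)
    }
}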

@akoshelev marked this pull request as ready for review on December 23, 2022 01:56

@martinthomson (Member) left a comment:

LGTM

src/test_fixture/transport/network.rs: resolved review thread (outdated)
loop {
    ::tokio::select! {
        Some(command) = rx.recv() => {
            match command {

martinthomson (Member):

Once we get this deep, it might pay to have a command dispatch function.


akoshelev (Collaborator, Author):

Very likely yes. I would probably postpone that change till I implement query management commands - I'd have a clearer picture of how it should look.
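For illustration, the kind of extraction being suggested might look like this (the command variants are invented, not the PR's TransportCommand):

enum SwitchCommand {
    Data(u64, Vec<u8>),
    Shutdown,
}

struct Switch;

impl Switch {
    // returns false when the event loop should stop
    async fn dispatch(&mut self, command: SwitchCommand) -> bool {
        match command {
            SwitchCommand::Data(step, payload) => {
                println!("routing {} bytes for step {step}", payload.len());
                true
            }
            SwitchCommand::Shutdown => false,
        }
    }
}

The select! arm then shrinks to something like `Some(command) = rx.recv() => if !switch.dispatch(command).await { break; }`.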

/// Starts listening to the incoming messages in a separate task. Can only be called once
/// and only when it is in the `Idle` state.
pub fn listen(&mut self) {
    let State::Idle(mut rx, peers) = std::mem::replace(&mut self.state, State::Preparing) else {

martinthomson (Member):

I'm a fan of having this be impossible; that is, you have a type that manages preparation with a consuming function that does the preparation and returns the final, functional entity.

That pattern tends to bubble upwards, but I would be inclined to let it.
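A minimal sketch of the consuming-constructor idea with invented types: preparation lives in one type and listen consumes it, so "listen called twice" or "listen in the wrong state" stops compiling instead of panicking at runtime.

use tokio::sync::mpsc;

struct IdleSwitch {
    rx: mpsc::Receiver<String>,
}

struct RunningSwitch {
    handle: tokio::task::JoinHandle<()>,
}

impl IdleSwitch {
    // consuming `self` makes re-listening a compile-time error rather than a runtime panic
    fn listen(self) -> RunningSwitch {
        let mut rx = self.rx;
        let handle = tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                println!("got {msg}");
            }
        });
        RunningSwitch { handle }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(4);
    let running = IdleSwitch { rx }.listen();
    tx.send("hello".to_string()).await.unwrap();
    drop(tx);                      // closing the channel ends the loop
    running.handle.await.unwrap(); // wait for the task to finish
}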


akoshelev (Collaborator, Author):

100% agree, it is me being lazy again


impl Switch {
    pub fn new(id: HelperIdentity) -> Self {
        let (tx, rx) = mpsc::channel(1);

martinthomson (Member):

Some amount of buffering here is likely to be very useful.
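For instance (the capacity below is illustrative, not a recommendation from this PR):

// a bound of 1 forces strict ping-pong between sender and receiver;
// a modestly larger bound amortizes wakeups without unbounded memory growth
let (tx, rx) = mpsc::channel(32);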


@thurstonsand left a comment:

Looks good, and I like the changes you made to my code.

#[must_use]
pub fn role(&self, id: &HelperIdentity) -> Role {
    for (idx, item) in self.helper_roles.iter().enumerate() {
        if item == id {

thurstonsand:

Maybe you could use .find here? Easier to read than an early return.

Alternatively, would it be worth it to flip the map (HelperIdentity -> Role) and store that for lookups, instead of running a short loop every time?


akoshelev (Collaborator, Author):

I am not sure I can make .find work because it returns the value, but I need an index here. itertools can do it, but I am still hesitant to bring it in :(

Wrt the reverse map - this lookup is basically free, the whole array fits inside a single L1 cache line.

thurstonsand:

yeah, that's fine. minor detail
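For reference, std's Iterator::position does return an index without itertools; a tiny standalone sketch (values are stand-ins, not the PR's types):

fn main() {
    // stand-ins for HelperIdentity values
    let helper_roles = [10u8, 20, 30];
    let id = 20u8;
    let idx = helper_roles.iter().position(|item| *item == id).unwrap();
    assert_eq!(idx, 1);
}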

src/helpers/network.rs: three resolved review threads (two outdated)
#[cfg(any(test, feature = "test-fixture"))]
impl From<tokio::sync::mpsc::error::SendError<TransportCommand>> for Error {
    fn from(value: tokio::sync::mpsc::error::SendError<TransportCommand>) -> Self {
        Self::SendFailed {

thurstonsand:

Yeah, this is easier than what I had.


akoshelev (Collaborator, Author):

Not sure if it is going to be good enough for the long term - I'd like to see the destination in the error message. But good enough for now, I think.

src/helpers/transport/mod.rs: resolved review thread (outdated)
/// Query/step data received from a helper peer.
/// TODO: this is really bad for performance, once we have channel per step all the way
/// from gateway to network, this definition should be (QueryId, Step, Stream<Item = Vec<u8>>) instead
StepData(QueryId, Step, Vec<u8>),

thurstonsand:

so how will QPL respond to commands?


akoshelev (Collaborator, Author):

I am thinking that we would need something at the TransportCommand layer, but not sure. I decided to keep it out of scope to make this review easier

// during runtime shutdown. Other schedulers (ahem shuttle) may not do that and what
// happens is 3 switch tasks remain blocked awaiting messages from each other. In this
// case a deadlock is detected. Hence this code just tries to explicitly close the switch
// but because async drop is not a thing yet, we must hot loop here to drive it to completion

thurstonsand:

nice...


akoshelev (Collaborator, Author):

I switched to using weak references and it looks much better now - I don't need this hack anymore.

I think the semantics are correct - once the test world goes out of scope, it invalidates all transport references.
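A small sketch of that ownership shape with invented names: the test world keeps the only strong reference, handles hold a Weak and upgrade on use, so dropping the world invalidates every outstanding handle.

use std::sync::{Arc, Weak};

struct InMemoryEndpoint; // stand-in for the real transport internals

struct TestWorldOwned {
    transport: Arc<InMemoryEndpoint>,
}

#[derive(Clone)]
struct TransportHandle {
    inner: Weak<InMemoryEndpoint>,
}

impl TransportHandle {
    fn send(&self) -> Result<(), &'static str> {
        // upgrade fails once the owning world (and its Arc) has been dropped
        let _endpoint = self.inner.upgrade().ok_or("transport has shut down")?;
        Ok(())
    }
}

fn main() {
    let world = TestWorldOwned { transport: Arc::new(InMemoryEndpoint) };
    let handle = TransportHandle { inner: Arc::downgrade(&world.transport) };
    assert!(handle.send().is_ok());
    drop(world);
    // dropping the world invalidated every outstanding handle
    assert!(handle.send().is_err());
}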

@akoshelev (Collaborator, Author) left a comment:

Thanks @thurstonsand and @martinthomson for the prompt review!

@thurstonsand left a comment:

yeah way easier to understand

src/test_fixture/transport/network.rs: resolved review thread (outdated)
@akoshelev merged commit 82f7a48 into private-attribution:main on Dec 23, 2022