-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transport
trait definition and in-memory implementation
#363
Conversation
Some hooks to make it compile but nothing really works until I have in memory transport implementation
I really need to change `TransportCommand` definition, so interim commit is required
They require `Transport` adoption
Also fix a reordering bug inside the `Gateway` - you must wait until `Gateway` subscribes to the given query before returning
but it doesn't. Abort simply puts task into detached state and does not cause drop.
@@ -9,7 +9,7 @@ async fn main() -> Result<(), Error> { | |||
let mut config = TestWorldConfig::default(); | |||
config.gateway_config.send_buffer_config.items_in_batch = 1; | |||
config.gateway_config.send_buffer_config.batch_count = 1000; | |||
let world = TestWorld::new_with(config); | |||
let world = TestWorld::new_with(config).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lots of files changed just because TestWorld
constructor is async
now. It must be async
because Transport
is capable of serving multiple queries running in parallel (i.e. there must be an event loop somewhere) and Gateway
needs to subscribe and wait until Transport
acknowledges the request to route query-specific commands to that gateway
|
||
let control_handle = tokio::spawn(async move { | ||
const INTERVAL: Duration = Duration::from_secs(3); | ||
|
||
let mut receive_buf = ReceiveBuffer::default(); | ||
let mut send_buf = SendBuffer::new(config.send_buffer_config); | ||
|
||
let sleep = ::tokio::time::sleep(INTERVAL); | ||
let mut pending_sends = FuturesUnordered::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send
may block if flow control is in place. this allows event loop to continue serving other traffic if send is blocked. There will be at most one in-flight send
/// resolve this identifier into something (Uri, encryption keys, etc) must consult configuration | ||
#[derive(Debug, Clone, Eq, PartialEq, Hash)] | ||
pub struct HelperIdentity { | ||
id: u8, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as we discussed, helper identity needs to be an opaque identifier. Maybe u8 is not the right choice for it, once we shape out helper config, we may decide to change it. but something lightweight is needed (this struct must be cloned often)
/// if `roles_to_helpers` does not have all 3 roles | ||
pub async fn send(&self, message_chunks: MessageChunks) -> Result<(), Error> { | ||
let (channel, payload) = message_chunks; | ||
let destination = self.roles.identity(channel.role); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the key responsibility of this struct: resolve helper identifiers (something that transport understand) to roles (infra) before sending them up
|
||
/// Network interface for components that require communication. | ||
#[async_trait] | ||
pub trait Network: Sync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
everything in this file was moved from original network
and is deprecated now
/// make it a no-op. | ||
#[cfg(not(all(test, feature = "shuttle")))] | ||
#[pin_project::pin_project] | ||
pub struct Timer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized that shuttle tests are broken again after I added tokio sleep because it requires Tokio runtime. So I had to add a wrapper to make it work in both environments
@@ -29,20 +29,11 @@ pub mod peer { | |||
#[derive(Clone, Debug)] | |||
#[cfg_attr(feature = "enable-serde", derive(serde::Deserialize))] | |||
pub struct Config { | |||
#[cfg_attr(feature = "enable-serde", serde(deserialize_with = "uri_from_str"))] | |||
#[cfg_attr(feature = "enable-serde", serde(with = "crate::uri"))] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
URI struct serialization is needed in more than one place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
loop { | ||
::tokio::select! { | ||
Some(command) = rx.recv() => { | ||
match command { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we get this deep, it might pay to have a command dispatch function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very likely yes, I would probably postpone that change till I implement query management commands - would have a more clear picture how it would look like
/// Starts listening to the incoming messages in a separate task. Can only be called once | ||
/// and only when it is in the `Idle` state. | ||
pub fn listen(&mut self) { | ||
let State::Idle(mut rx, peers) = std::mem::replace(&mut self.state, State::Preparing) else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a fan of having this be impossible; that is, you have a type that manages preparation with a consuming function that does the preparation and returns the final, functional entity.
That pattern tends to bubble upwards, but I would be inclined to let it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree, it is me being lazy again
|
||
impl Switch { | ||
pub fn new(id: HelperIdentity) -> Self { | ||
let (tx, rx) = mpsc::channel(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some amount of buffering here is likely to be very useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good, and i like the changes you made to my code
#[must_use] | ||
pub fn role(&self, id: &HelperIdentity) -> Role { | ||
for (idx, item) in self.helper_roles.iter().enumerate() { | ||
if item == id { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe you could use .find
here? easier to read than early return.
alternatively, would it be worth it to flip the map (HelperIdentity
-> Role
) and store that for usage instead of a short loop every time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I can make .find
work because it returns the value but I need an index here. itertools
can do it but I am still hesitant to bring it :(
wrt to reverse - this lookup is basically free, the whole array fits inside L1 entry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, that's fine. minor detail
#[cfg(any(test, feature = "test-fixture"))] | ||
impl From<tokio::sync::mpsc::error::SendError<TransportCommand>> for Error { | ||
fn from(value: tokio::sync::mpsc::error::SendError<TransportCommand>) -> Self { | ||
Self::SendFailed { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this is easier than what i had
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if it is going to be good enough for long term, I'd like to see the destination in the error message. but good enough for now I think
/// Query/step data received from a helper peer. | ||
/// TODO: this is really bad for performance, once we have channel per step all the way | ||
/// from gateway to network, this definition should be (QueryId, Step, Stream<Item = Vec<u8>>) instead | ||
StepData(QueryId, Step, Vec<u8>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so how will QPL respond to commands?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking that we would need something at the TransportCommand
layer, but not sure. I decided to keep it out of scope to make this review easier
src/test_fixture/transport/mod.rs
Outdated
// during runtime shutdown. Other schedulers (ahem shuttle) may not do that and what | ||
// happens is 3 switch tasks remain blocked awaiting messages from each other. In this | ||
// case a deadlock is detected. Hence this code just tries to explicitly close the switch | ||
// but because async drop is not a thing yet, we must hot loop here to drive it to completion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I switched to use weak references and it looks much better now - I don't need this hack anymore
I think the semantic is correct - once test world goes out of scope, it invalidates all transport references
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @thurstonsand and @martinthomson for prompt review!
#[must_use] | ||
pub fn role(&self, id: &HelperIdentity) -> Role { | ||
for (idx, item) in self.helper_roles.iter().enumerate() { | ||
if item == id { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I can make .find
work because it returns the value but I need an index here. itertools
can do it but I am still hesitant to bring it :(
wrt to reverse - this lookup is basically free, the whole array fits inside L1 entry.
#[cfg(any(test, feature = "test-fixture"))] | ||
impl From<tokio::sync::mpsc::error::SendError<TransportCommand>> for Error { | ||
fn from(value: tokio::sync::mpsc::error::SendError<TransportCommand>) -> Self { | ||
Self::SendFailed { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if it is going to be good enough for long term, I'd like to see the destination in the error message. but good enough for now I think
/// Query/step data received from a helper peer. | ||
/// TODO: this is really bad for performance, once we have channel per step all the way | ||
/// from gateway to network, this definition should be (QueryId, Step, Stream<Item = Vec<u8>>) instead | ||
StepData(QueryId, Step, Vec<u8>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking that we would need something at the TransportCommand
layer, but not sure. I decided to keep it out of scope to make this review easier
src/test_fixture/transport/mod.rs
Outdated
// during runtime shutdown. Other schedulers (ahem shuttle) may not do that and what | ||
// happens is 3 switch tasks remain blocked awaiting messages from each other. In this | ||
// case a deadlock is detected. Hence this code just tries to explicitly close the switch | ||
// but because async drop is not a thing yet, we must hot loop here to drive it to completion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I switched to use weak references and it looks much better now - I don't need this hack anymore
/// Starts listening to the incoming messages in a separate task. Can only be called once | ||
/// and only when it is in the `Idle` state. | ||
pub fn listen(&mut self) { | ||
let State::Idle(mut rx, peers) = std::mem::replace(&mut self.state, State::Preparing) else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree, it is me being lazy again
loop { | ||
::tokio::select! { | ||
Some(command) = rx.recv() => { | ||
match command { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very likely yes, I would probably postpone that change till I implement query management commands - would have a more clear picture how it would look like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah way easier to understand
As we discussed in #352, we need another abstraction on the network layer that is capable of sending messages and delivering events. This change brings the trait defition to main branch along with in-memory implementation for it (both @thurstonsand and I were working on it).
TestWorld
have been migrated to use transport layer too - hence the size of this change. The reason why I want to merge it to main is because @martinthomson, @andyleiserson are working on the same codebase at the moment and merging it later will be super painful.We can focus this review around
Transport
trait definition,InMemoryTransport
andNetwork
structs. Other changes are pretty mechanical.