[server + bench] Now start the batch maker, and integrate into bench #1154
@@ -4,9 +4,11 @@
use async_trait::async_trait;
use futures::channel::mpsc::{channel, Receiver};
use futures::SinkExt;
use futures::Stream;
use futures::{SinkExt, StreamExt};
use std::io;
use sui_network::network::{parse_recv_bytes, NetworkClient};
use sui_network::transport::TcpDataStream;
use sui_types::batch::UpdateItem;
use sui_types::{error::SuiError, messages::*, serialize::*};
@@ -141,7 +143,7 @@ impl AuthorityAPI for AuthorityClient {
loop {
    let next_data = tcp_stream.read_data().await.transpose();
    let data_result = parse_recv_bytes(next_data);
-   match deserialize_batch_info(data_result) {
+   match data_result.and_then(deserialize_batch_info) {
        Ok(batch_info_response_item) => {
            // send to the caller via the channel
            let _ = tx_output.send(Ok(batch_info_response_item.clone())).await;
@@ -172,3 +174,56 @@ impl AuthorityAPI for AuthorityClient {
        Ok(tr_output)
    }
}
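The one-line change in the hunk above swaps a direct call for `data_result.and_then(deserialize_batch_info)`, so deserialization only runs when the preceding read and parse actually succeeded, and the first error short-circuits through. A minimal sketch of that `Result` chaining, with made-up `parse`/`check_positive` stand-ins rather than the Sui functions:

```rust
// Sketch of the Result-chaining used above: each step runs only if the
// previous one produced Ok, and the first Err short-circuits through.
// `parse` and `check_positive` are made-up stand-ins, not Sui functions.
fn parse(s: &str) -> Result<i64, String> {
    s.trim()
        .parse::<i64>()
        .map_err(|e| format!("parse error: {e}"))
}

fn check_positive(n: i64) -> Result<i64, String> {
    if n > 0 {
        Ok(n)
    } else {
        Err("not positive".to_string())
    }
}

fn main() {
    // Ok path: both steps run.
    assert_eq!(parse("42").and_then(check_positive), Ok(42));
    // An Err from the second step propagates out.
    assert!(parse("-5").and_then(check_positive).is_err());
    // An Err from the first step skips check_positive entirely.
    assert!(parse("abc").and_then(check_positive).is_err());

    // `transpose` (used with read_data above) flips Option<Result<T, E>>
    // into Result<Option<T>, E>, so the error can be handled first.
    let next: Option<Result<i64, String>> = Some(Ok(7));
    assert_eq!(next.transpose(), Ok(Some(7)));
    println!("ok");
}
```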
impl AuthorityClient {
    /// Handle Batch information requests for this authority.
    pub async fn handle_batch_streaming_as_stream(
        &self,
        request: BatchInfoRequest,
    ) -> Result<impl Stream<Item = Result<BatchInfoResponseItem, SuiError>>, io::Error> {
        let tcp_stream = self
            .0
            .connect_for_stream(serialize_batch_request(&request))
            .await?;

        let mut error_count = 0;
        let TcpDataStream { framed_read, .. } = tcp_stream;
Comment: I have never seen this syntax before, this is useful.

Reply: Yeah, if you own something outright (not just `&` or `&mut`), you can break it up, subject to some visibility constraints, I think.
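The syntax discussed in this thread is destructuring an owned struct to move one field out. A toy sketch with a stand-in struct (not the real `TcpDataStream`):

```rust
// Destructuring an owned struct moves individual fields out, discarding
// the rest with `..`. `DataStream` is a made-up stand-in struct, not the
// real sui TcpDataStream.
struct DataStream {
    framed_read: Vec<u8>,
    framed_write: Vec<u8>,
}

fn main() {
    let stream = DataStream {
        framed_read: vec![1, 2, 3],
        framed_write: Vec::new(),
    };
    // Moves `framed_read` out of `stream`; `framed_write` is dropped here,
    // and `stream` as a whole can no longer be used (it is partially moved).
    let DataStream { framed_read, .. } = stream;
    assert_eq!(framed_read, vec![1, 2, 3]);
    println!("ok");
}
```

This only works on an owned value: through a `&` or `&mut` borrow you cannot move a field out, only bind it by reference.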
        let stream = framed_read
            .map(|item| {
                item
                    // Convert io error to SuiClient error
                    .map_err(|err| SuiError::ClientIoError {
                        error: format!("io error: {:?}", err),
                    })
                    // If no error try to deserialize
                    .and_then(|bytes| match deserialize_message(&bytes[..]) {
                        Ok(SerializedMessage::Error(error)) => Err(SuiError::ClientIoError {
                            error: format!("io error: {:?}", error),
                        }),
                        Ok(message) => Ok(message),
                        Err(_) => Err(SuiError::InvalidDecoding),
                    })
                    // If deserialized try to parse as Batch Item
                    .and_then(deserialize_batch_info)
            })
            // Establish conditions to stop taking from the stream
            .take_while(move |item| {
                let flag = match item {
                    Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
                        signed_batch.batch.next_sequence_number < request.end
                    }
                    Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, _digest)))) => {
                        *seq < request.end
                    }
                    Err(_e) => {
                        // TODO: record e
                        error_count += 1;
                        error_count < MAX_ERRORS
                    }
                };
                futures::future::ready(flag)
            });
        Ok(stream)
Comment: Ahhhh, this is how we would do this functionally. I see the `map` function lets us treat the stream like an iterator, and this is possible because the `framed_read` part of the `TcpDataStream` is iterable. I attempted to iterate on the entire stream and that doesn't work.

Reply: Yeah, the read part is a stream and the write part is a sink. They offer interesting combinators to build fancier streams and sinks.
    }
}
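For readers following the combinator discussion above: the same stateful `take_while` shape can be sketched on a plain std iterator, which makes the stop conditions (sequence cutoff plus an error budget) easy to trace. The item shapes and `MAX_ERRORS` value are stand-ins, not the Sui definitions; note that `take_while`, like the stream version, also drops the first item that fails the predicate:

```rust
// Iterator analogue of the stream pipeline above: take items while the
// sequence number is below a cutoff, tolerating errors up to a budget.
// MAX_ERRORS and the item types are stand-ins, not the Sui definitions.
const MAX_ERRORS: usize = 2;

fn take_until(
    items: Vec<Result<u64, &'static str>>,
    end: u64,
) -> Vec<Result<u64, &'static str>> {
    let mut error_count = 0usize;
    items
        .into_iter()
        .take_while(|item| match item {
            // Keep going while the sequence number is below the cutoff.
            Ok(seq) => *seq < end,
            // Tolerate errors until the budget is exhausted.
            Err(_) => {
                error_count += 1;
                error_count < MAX_ERRORS
            }
        })
        .collect()
}

fn main() {
    let raw = vec![Ok(1), Err("io"), Ok(2), Ok(3), Err("decode"), Ok(9)];
    // The second error exhausts the budget of 2, so it and everything
    // after it are dropped; Ok(9) would also fail the cutoff of 4.
    assert_eq!(take_until(raw, 4), vec![Ok(1), Err("io"), Ok(2), Ok(3)]);
    println!("ok");
}
```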
Comment: I have run this locally and confirmed that it is functioning as a stream now. 👍

We are only generating transaction load and not batch load so far. Is that right, or are we making batches somewhere, as alluded to in the title?
Reply: I see batches when I increase the number of transactions in the microbench; look for `Client received batch up to sequence ...` in the output. Maybe what makes the difference is `--num-transactions 1000000 --batch-size 100`: the default number of transactions is too low, and the batch size too high, to observe a follower batch.
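Rough arithmetic behind that suggestion, assuming a batch is cut roughly every `--batch-size` transactions (my reading of the flags, not confirmed here):

```rust
// Rough estimate: if a batch closes about every `batch_size` transactions,
// a follower can observe roughly num_transactions / batch_size full batches.
fn full_batches(num_transactions: u64, batch_size: u64) -> u64 {
    num_transactions / batch_size
}

fn main() {
    // The suggested flags yield plenty of batches to follow.
    assert_eq!(full_batches(1_000_000, 100), 10_000);
    // A small run with a large batch size may never fill a single batch.
    assert_eq!(full_batches(50, 1000), 0);
    println!("ok");
}
```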