Merge batcher generic over containers #474
Conversation
Force-pushed from fc409d5 to ca9d2e7.
This change splits the default merge batcher implementation into an outer part that knows how to maintain chains of batches, and an inner part that knows how to maintain the individual batches within those chains. The benefit is that the outer part does not need to know about the contents of the containers it holds, because that is encapsulated in the inner trait's implementation.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
Force-pushed from ca9d2e7 to b07aa9e.
This generally looks good! I left some comments from our review, one of which is correctness-y, but others are nits that we can clean up as you like. One bonus ask is that perhaps we could find a name other than `Batch` to avoid clashing with pre-existing uses. We discussed `Block` or `Chunk`, neither of which is especially more insightful .. but if another name presents itself, amazing! :D
```rust
for mut buffer in merged {
    for (data, time, diff) in buffer.drain(..) {
        if upper.less_equal(&time) {
            frontier.insert(time.clone());
```
Consider `insert_ref` here to avoid a clone! :D
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
Read through, and it all seems plausible! Hard to be 100% certain, but it seems like a great path forward.
```rust
let form_chain = |this: &mut Self, final_chain: &mut Vec<Self::Chunk>, stash: &mut _| {
    if this.pending.len() == this.pending.capacity() {
        consolidate_updates(&mut this.pending);
        if this.pending.len() > this.pending.capacity() / 2 {
```
Nit, but I think this can be `>=`. More generally, I think we are looking for `this.pending.len() >= this.chunk_capacity()`, if we ever end up not maintaining exactly twice the capacity. Idk if it's worth switching over to reveal the intent, if that is the intent (I inferred it, but it could be wrong).
That makes sense, and I had the same hunch at some point. Changed it to what you suggest, because it seems to be easier to reason about. The amount of data we compact isn't affected by this because we'll merge the chains at some point anyways.
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
```rust
/// TODO
type Time;
/// TODO
fn accept(&mut self, batch: RefOrMut<C>, stash: &mut Vec<Self::Batch>) -> Self::Batch;
```
Return type should probably be an iterator over batches.
Merge batcher that's generic over input containers and internal chains, with specific implementations.
Ideas
At the moment, a merge batcher receives a stream of vectors. It consolidates the input vectors, and inserts them into its queue structure. When sealing, it extracts ready data and presents it record-by-record to the builder. It inserts future updates into its queue.
This introduces several opportunities to introduce containers:
- Sorting indirectly, by sorting a `Vec<usize>` of indices and copying into a new container in sorted order.