[data ingestion] change remote reader implementation #16469
Conversation
```rust
if self.remote_fetcher_receiver.is_none() {
    self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
}
Ok(checkpoints)
```

```rust
while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
```
I'm not sure this does exactly what you expect: you may have some configured "limit", but the underlying fetcher disregards it and will always try to fetch and enqueue `batch_size * 2` checkpoints, one `batch_size` due to the buffered stream and one `batch_size` due to the size of the channel. I suppose if this is sufficiently smaller than `MAX_CHECKPOINTS_IN_PROGRESS` this could be fine, but it seems wasteful.
That's a totally different check, which gives the overall guarantee that the system doesn't have more than `MAX_CHECKPOINTS_IN_PROGRESS` tasks in progress. Here `batch_size << MAX_CHECKPOINTS_IN_PROGRESS`; to give specific numbers, it's currently 100 vs 10_000.
To elaborate: this check is required so we don't attempt to read a message from the remote fetcher actor that we are not allowed to process at the moment.
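The gating condition being discussed can be sketched as follows. This is a hypothetical, simplified model: the field names and the `exceeds_capacity` signature mirror the snippet above, but the internals and the use of `MAX_CHECKPOINTS_IN_PROGRESS = 10_000` are assumptions based on the numbers quoted in the thread:

```rust
// Assumed limit, taken from the "100 vs 10_000" comparison in the thread.
const MAX_CHECKPOINTS_IN_PROGRESS: u64 = 10_000;

struct Reader {
    current_checkpoint_number: u64,
    processed_checkpoint_number: u64,
}

impl Reader {
    // Hypothetical implementation: true if accepting work up to
    // `checkpoint_number` would exceed the global in-flight limit.
    fn exceeds_capacity(&self, checkpoint_number: u64) -> bool {
        checkpoint_number - self.processed_checkpoint_number > MAX_CHECKPOINTS_IN_PROGRESS
    }
}

fn main() {
    let r = Reader {
        current_checkpoint_number: 100,
        processed_checkpoint_number: 0,
    };
    // 100 checkpoints in flight is far below the cap, so draining continues.
    assert!(!r.exceeds_capacity(r.current_checkpoint_number + 1));
    // A jump past the cap would stop the drain loop.
    assert!(r.exceeds_capacity(10_100));
}
```

The `while !self.exceeds_capacity(...)` loop simply stops pulling from the fetcher channel once this returns true.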
```rust
Ok(Ok(checkpoint)) => checkpoints.push(checkpoint),
Ok(Err(err)) => {
    info!("remote reader transient error {:?}", err);
    self.remote_fetcher_receiver = None;
    return checkpoints;
}
Err(_) => break,
```
There seem to be two different error conditions here that probably need to be handled differently. The first, `Ok(Err(err))`, appears to be a fetch that errored. `Err(_)` covers two cases: one where the channel is empty and we just need to wait some time until the other side can fill it, and disconnected, where the channel will never have any more data.
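The two `Err(_)` cases can be seen directly in a minimal example. This sketch uses std's `mpsc` channel for illustration; the PR's channel type may differ, but async channels expose the same `Empty`/`Disconnected` distinction on a non-blocking receive:

```rust
use std::sync::mpsc::{self, TryRecvError};

fn main() {
    let (tx, rx) = mpsc::channel::<u64>();
    tx.send(1).unwrap();
    assert_eq!(rx.try_recv(), Ok(1));
    // Channel is empty, but the sender is still alive: data may come later.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    drop(tx);
    // Sender dropped: the channel can never yield more data.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
```

Matching `Err(_)` collapses these two outcomes, which is exactly the concern raised above.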
Yeah, `Ok(Err(err))` is a fetch that errored. It's ok to just log it and move on: `self.remote_fetcher_receiver` is getting canceled further down in the block and will get respawned on the next tick. The other `break` clause was written specifically for the `Empty` case (i.e. move on until the next tick so the fetcher can accumulate more tasks). So yeah, `Disconnected` is probably not handled properly, although I'm not sure how it would get triggered without an error in the corresponding task. But I will add an explicit handler.
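An explicit handler along the lines discussed could look like the sketch below. This is a hypothetical simplification, not the PR's actual code: it uses std's `mpsc` instead of an async channel, `u64` stands in for the real checkpoint type, and `eprintln!` stands in for `info!`:

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

struct Fetcher {
    remote_fetcher_receiver: Option<Receiver<Result<u64, String>>>,
}

impl Fetcher {
    fn drain(&mut self) -> Vec<u64> {
        let mut checkpoints = Vec::new();
        let mut reset = false;
        if let Some(rx) = &self.remote_fetcher_receiver {
            loop {
                match rx.try_recv() {
                    Ok(Ok(checkpoint)) => checkpoints.push(checkpoint),
                    Ok(Err(err)) => {
                        // Transient fetch error: log it, cancel the receiver
                        // so the fetcher is respawned on the next tick.
                        eprintln!("remote reader transient error {:?}", err);
                        reset = true;
                        break;
                    }
                    // Nothing buffered right now; retry on the next tick.
                    Err(TryRecvError::Empty) => break,
                    // Fetcher task is gone; reset so it gets respawned.
                    Err(TryRecvError::Disconnected) => {
                        reset = true;
                        break;
                    }
                }
            }
        }
        if reset {
            self.remote_fetcher_receiver = None;
        }
        checkpoints
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send(Ok::<u64, String>(1)).unwrap();
    tx.send(Ok(2)).unwrap();
    drop(tx); // simulate the fetcher task dying
    let mut fetcher = Fetcher { remote_fetcher_receiver: Some(rx) };
    assert_eq!(fetcher.drain(), vec![1, 2]);
    // Disconnected was observed, so the receiver was reset for respawn.
    assert!(fetcher.remote_fetcher_receiver.is_none());
}
```

The key difference from the original snippet is that `Empty` and `Disconnected` now take different paths: `Empty` simply waits for the next tick, while `Disconnected` resets the receiver so the fetcher is restarted.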
This PR makes the remote fetch implementation more similar to the indexer implementation.