feat: avoid prefetching all sst streams at once #1069

Merged: 1 commit, Jul 18, 2023
4 changes: 4 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
@@ -88,6 +88,7 @@ impl Instance {
        let scan_options_for_compaction = ScanOptions {
            background_read_parallelism: 1,
            max_record_batches_in_flight: MAX_RECORD_BATCHES_IN_FLIGHT_WHEN_COMPACTION_READ,
+            num_streams_to_prefetch: ctx.config.num_streams_to_prefetch,
        };
        let compaction_runtime = ctx.runtimes.compact_runtime.clone();
        let compaction_scheduler = Arc::new(SchedulerImpl::new(
@@ -101,6 +102,7 @@ impl Instance {
        let scan_options = ScanOptions {
            background_read_parallelism: ctx.config.sst_background_read_parallelism,
            max_record_batches_in_flight: ctx.config.scan_max_record_batches_in_flight,
+            num_streams_to_prefetch: ctx.config.num_streams_to_prefetch,
        };

        let iter_options = ctx
1 change: 1 addition & 0 deletions analytic_engine/src/instance/read.rs
@@ -259,6 +259,7 @@ impl Instance {
            request_id: request.request_id,
            metrics_collector: Some(metrics_collector),
            deadline: request.opts.deadline,
+            num_streams_to_prefetch: self.scan_options.num_streams_to_prefetch,
            space_id: table_data.space_id,
            table_id: table_data.id,
            projected_schema: projected_schema.clone(),
4 changes: 4 additions & 0 deletions analytic_engine/src/lib.rs
@@ -10,6 +10,7 @@ mod instance;
mod manifest;
pub mod memtable;
mod payload;
+pub mod prefetchable_stream;
pub mod row_iter;
mod sampler;
pub mod setup;
@@ -81,6 +82,8 @@ pub struct Config {
    pub scan_max_record_batches_in_flight: usize,
    /// Sst background reading parallelism
    pub sst_background_read_parallelism: usize,
+    /// Number of streams to prefetch
+    pub num_streams_to_prefetch: usize,
    /// Max buffer size for writing sst
    pub write_sst_max_buffer_size: ReadableSize,
    /// Max retry limit After flush failed
@@ -134,6 +137,7 @@ impl Default for Config {
            preflush_write_buffer_size_ratio: 0.75,
            scan_batch_size: None,
            sst_background_read_parallelism: 8,
+            num_streams_to_prefetch: 2,
            scan_max_record_batches_in_flight: 1024,
            write_sst_max_buffer_size: ReadableSize::mb(10),
            max_retry_flush_limit: 0,
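With the option wired through `Config` and `ScanOptions` as above, it can be tuned like any other engine setting. A minimal sketch (hypothetical override value, relying only on the `Default` impl shown in this diff):

```rust
use analytic_engine::Config;

// Hypothetical tuning: keep every default but widen the prefetch window
// from the default of 2 streams to 4.
let config = Config {
    num_streams_to_prefetch: 4,
    ..Default::default()
};

// Other defaults are untouched, e.g. the background read parallelism.
assert_eq!(config.sst_background_read_parallelism, 8);
```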
172 changes: 172 additions & 0 deletions analytic_engine/src/prefetchable_stream.rs
@@ -0,0 +1,172 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// A stream that can be prefetched.

use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};

pub type BoxedStream<T> = Box<dyn Stream<Item = T> + Send + Unpin>;

#[async_trait]
pub trait PrefetchableStream: Send {
    type Item;

    /// Start the prefetch procedure in the background. In most
    /// implementations, this method should not block the caller; that is to
    /// say, the prefetching procedure should run in the background.
    async fn start_prefetch(&mut self);

    /// Fetch the next record batch.
    ///
    /// If `None` is returned, all the following batches will be `None`.
    async fn fetch_next(&mut self) -> Option<Self::Item>;
}

pub trait PrefetchableStreamExt: PrefetchableStream {
    fn into_boxed_stream(mut self) -> BoxedStream<Self::Item>
    where
        Self: 'static + Sized,
        Self::Item: Send,
    {
        let stream = stream! {
            while let Some(v) = self.fetch_next().await {
                yield v;
            }
        };

        // FIXME: Will this conversion to a stream introduce overhead?
        Box::new(Box::pin(stream))
    }

    fn filter_map<F, O>(self, f: F) -> FilterMap<Self, F>
    where
        F: FnMut(Self::Item) -> Option<O>,
        Self: Sized,
    {
        FilterMap { stream: self, f }
    }

    fn map<F, O>(self, f: F) -> Map<Self, F>
    where
        F: FnMut(Self::Item) -> O,
        Self: Sized,
    {
        Map { stream: self, f }
    }
}

impl<T: ?Sized> PrefetchableStreamExt for T where T: PrefetchableStream {}

#[async_trait]
impl<T> PrefetchableStream for Box<dyn PrefetchableStream<Item = T>> {
    type Item = T;

    async fn start_prefetch(&mut self) {
        (**self).start_prefetch().await;
    }

    async fn fetch_next(&mut self) -> Option<T> {
        (**self).fetch_next().await
    }
}

/// The implementation of the `filter_map` operator on the PrefetchableStream.
pub struct FilterMap<St, F> {
    stream: St,
    f: F,
}

#[async_trait]
impl<St, F, O> PrefetchableStream for FilterMap<St, F>
where
    St: PrefetchableStream,
    F: FnMut(St::Item) -> Option<O> + Send,
    O: Send,
{
    type Item = O;

    async fn start_prefetch(&mut self) {
        self.stream.start_prefetch().await;
    }

    async fn fetch_next(&mut self) -> Option<O> {
        loop {
            match self.stream.fetch_next().await {
                Some(v) => {
                    let filtered_batch = (self.f)(v);
                    if filtered_batch.is_some() {
                        return filtered_batch;
                    }
                    // If the filtered batch is none, just continue to fetch and
                    // filter until the underlying stream is exhausted.
                }
                None => return None,
            }
        }
    }
}

/// The implementation of the `map` operator on the PrefetchableStream.
pub struct Map<St, F> {
    stream: St,
    f: F,
}

#[async_trait]
impl<St, F, O> PrefetchableStream for Map<St, F>
where
    St: PrefetchableStream,
    F: FnMut(St::Item) -> O + Send,
    O: Send,
{
    type Item = O;

    async fn start_prefetch(&mut self) {
        self.stream.start_prefetch().await;
    }

    async fn fetch_next(&mut self) -> Option<O> {
        self.stream.fetch_next().await.map(|v| (self.f)(v))
    }
}

/// A noop prefetcher.
///
/// A wrapper around an underlying stream without any prefetch logic.
pub struct NoopPrefetcher<T>(pub BoxedStream<T>);

#[async_trait]
impl<T> PrefetchableStream for NoopPrefetcher<T> {
    type Item = T;

    async fn start_prefetch(&mut self) {
        // It's just a noop operation.
    }

    async fn fetch_next(&mut self) -> Option<T> {
        self.0.next().await
    }
}

#[cfg(test)]
mod tests {
    use futures::stream;

    use super::*;

    #[tokio::test]
    async fn test_trait_object_prefetchable_stream() {
        let numbers = vec![1, 2, 3];
        let stream = stream::iter(numbers.clone());
        let stream = NoopPrefetcher(Box::new(stream));
        let mut stream: Box<dyn PrefetchableStream<Item = i32>> = Box::new(stream);

        let mut fetched_numbers = Vec::with_capacity(numbers.len());
        while let Some(v) = stream.fetch_next().await {
            fetched_numbers.push(v);
        }

        assert_eq!(numbers, fetched_numbers);
    }
}
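To make the new module concrete, here is a small usage sketch built only from the items added above (`NoopPrefetcher`, the `map` combinator, `start_prefetch`, and `fetch_next`); the stream contents are made up for illustration:

```rust
use analytic_engine::prefetchable_stream::{
    NoopPrefetcher, PrefetchableStream, PrefetchableStreamExt,
};
use futures::stream;

#[tokio::main]
async fn main() {
    // Wrap a plain in-memory stream; `NoopPrefetcher` adds no prefetch logic.
    let inner = stream::iter(vec![1, 2, 3]);
    let mut prefetchable = NoopPrefetcher(Box::new(inner)).map(|v| v * 10);

    // A no-op here, but a real implementation would kick off background reads.
    prefetchable.start_prefetch().await;

    while let Some(v) = prefetchable.fetch_next().await {
        println!("{v}"); // prints 10, 20, 30
    }
}
```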
34 changes: 28 additions & 6 deletions analytic_engine/src/row_iter/chain.rs
@@ -10,7 +10,6 @@ use common_types::{
    projected_schema::ProjectedSchema, record_batch::RecordBatchWithKey, request_id::RequestId,
    schema::RecordSchemaWithKey,
};
-use futures::StreamExt;
use generic_error::GenericError;
use log::debug;
use macros::define_result;
@@ -20,7 +19,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop};

use crate::{
    row_iter::{
-        record_batch_stream, record_batch_stream::SequencedRecordBatchStream,
+        record_batch_stream, record_batch_stream::BoxedPrefetchableRecordBatchStream,
        RecordBatchWithKeyIterator,
    },
    space::SpaceId,
@@ -61,6 +60,7 @@ pub struct ChainConfig<'a> {
    pub projected_schema: ProjectedSchema,
    /// Predicate of the query.
    pub predicate: PredicateRef,
+    pub num_streams_to_prefetch: usize,

    pub sst_read_options: SstReadOptions,
    /// Sst factory
@@ -172,8 +172,10 @@ impl<'a> Builder<'a> {
            request_id: self.config.request_id,
            schema: self.config.projected_schema.to_record_schema_with_key(),
            streams,
+            num_streams_to_prefetch: self.config.num_streams_to_prefetch,
            ssts: self.ssts,
            next_stream_idx: 0,
+            next_prefetch_stream_idx: 0,
            inited_at: None,
            created_at: Instant::now(),
            metrics: Metrics::new(
@@ -248,17 +250,19 @@ pub struct ChainIterator {
    table_id: TableId,
    request_id: RequestId,
    schema: RecordSchemaWithKey,
-    streams: Vec<SequencedRecordBatchStream>,
+    streams: Vec<BoxedPrefetchableRecordBatchStream>,
+    num_streams_to_prefetch: usize,
    /// ssts are kept here to avoid them from being purged.
    #[allow(dead_code)]
    ssts: Vec<Vec<FileHandle>>,
    /// The range of the index is [0, streams.len()] and the iterator is
    /// exhausted if it reaches `streams.len()`.
    next_stream_idx: usize,
+    next_prefetch_stream_idx: usize,

    inited_at: Option<Instant>,
    created_at: Instant,
-    // metrics for the iterator.
+    /// metrics for the iterator.
    metrics: Metrics,
}

@@ -273,6 +277,18 @@ impl ChainIterator {
            self.space_id, self.table_id, self.request_id, self.streams.len(), self.schema
        );
    }
+
+    /// Prefetch the streams that may be needed by upcoming reads.
+    async fn maybe_prefetch(&mut self) {
+        while self.next_prefetch_stream_idx < self.next_stream_idx + self.num_streams_to_prefetch
+            && self.next_prefetch_stream_idx < self.streams.len()
+        {
+            self.streams[self.next_prefetch_stream_idx]
+                .start_prefetch()
+                .await;
+            self.next_prefetch_stream_idx += 1;
+        }
+    }
}

impl Drop for ChainIterator {
@@ -294,11 +310,12 @@ impl RecordBatchWithKeyIterator for ChainIterator {

    async fn next_batch(&mut self) -> Result<Option<RecordBatchWithKey>> {
        self.init_if_necessary();
+        self.maybe_prefetch().await;

        while self.next_stream_idx < self.streams.len() {
            let read_stream = &mut self.streams[self.next_stream_idx];
            let sequenced_record_batch = read_stream
-                .next()
+                .fetch_next()
                .await
                .transpose()
                .context(PollNextRecordBatch)?;
@@ -313,7 +330,10 @@
                    }
                }
                // Fetch next stream only if the current sequenced_record_batch is None.
-                None => self.next_stream_idx += 1,
+                None => {
+                    self.next_stream_idx += 1;
+                    self.maybe_prefetch().await;
+                }
            }
        }

@@ -357,8 +377,10 @@ mod tests {
            request_id: RequestId::next_id(),
            schema: schema.to_record_schema_with_key(),
            streams,
+            num_streams_to_prefetch: 2,
            ssts: Vec::new(),
            next_stream_idx: 0,
+            next_prefetch_stream_idx: 0,
            inited_at: None,
            created_at: Instant::now(),
            metrics: Metrics::new(0, 0, None),
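In effect, `ChainIterator` now keeps a sliding window of prefetched streams: `maybe_prefetch` starts prefetching only the streams whose index lies in `[next_stream_idx, next_stream_idx + num_streams_to_prefetch)`, rather than all sst streams at once. A standalone sketch of that window logic (simplified names, not the actual `ChainIterator`):

```rust
use analytic_engine::prefetchable_stream::PrefetchableStream;

/// Start prefetch for at most `window` streams ahead of the read cursor.
/// `next_prefetch` tracks how far prefetching has already advanced, so each
/// stream's `start_prefetch` is called exactly once.
async fn prefetch_window<T>(
    streams: &mut [Box<dyn PrefetchableStream<Item = T>>],
    next_read: usize,
    next_prefetch: &mut usize,
    window: usize,
) {
    while *next_prefetch < next_read + window && *next_prefetch < streams.len() {
        streams[*next_prefetch].start_prefetch().await;
        *next_prefetch += 1;
    }
}
```

Calling this once in `next_batch` and again whenever the read cursor advances preserves the invariant that at most `window` streams beyond the current one hold prefetch buffers, which bounds the memory consumed by in-flight reads.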