Skip to content

Commit

Permalink
chore: refactor implement stream for batch of streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Mar 23, 2023
1 parent da03cb3 commit d46717c
Showing 1 changed file with 85 additions and 54 deletions.
139 changes: 85 additions & 54 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ use std::{
};

use common_types::{
projected_schema::ProjectedSchema, record_batch::RecordBatch, schema::RecordSchema,
projected_schema::ProjectedSchema,
record_batch::{RecordBatch, RecordBatchWithKey},
schema::RecordSchema,
time::TimeRange,
};
use common_util::{define_result, error::BoxError, runtime::Runtime};
use common_util::{define_result, error::BoxError};
use futures::stream::Stream;
use log::{debug, error, trace};
use log::debug;
use snafu::{ResultExt, Snafu};
use table_engine::{
stream::{
self, ErrWithSource, PartitionedStreams, RecordBatchStream, SendableRecordBatchStream,
},
table::ReadRequest,
};
use tokio::sync::mpsc::{self, Receiver};
use trace_metric::Metric;

use crate::{
Expand Down Expand Up @@ -66,7 +67,6 @@ pub enum Error {

define_result!(Error);

const RECORD_BATCH_READ_BUF_SIZE: usize = 1000;
const MERGE_SORT_METRIC_NAME: &str = "do_merge_sort";
const ITER_NUM_METRIC_NAME: &str = "iter_num";
const MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX: &str = "merge_iter";
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Instance {

let mut streams = Vec::with_capacity(read_parallelism);
for iters in splitted_iters {
let stream = iters_to_stream(iters, self.read_runtime(), &request.projected_schema);
let stream = iters_to_stream(iters, request.projected_schema.clone());
streams.push(stream);
}

Expand Down Expand Up @@ -334,70 +334,101 @@ impl Instance {
}
}

// TODO(xikai): this is a hack way to implement SendableRecordBatchStream for
// MergeIterator.
fn iters_to_stream<T>(
collection: T,
runtime: &Runtime,
schema: &ProjectedSchema,
) -> SendableRecordBatchStream
where
T: IntoIterator + Send + 'static,
T::Item: RecordBatchWithKeyIterator,
T::IntoIter: Send,
{
let (tx, rx) = mpsc::channel(RECORD_BATCH_READ_BUF_SIZE);
let projected_schema = schema.clone();

runtime.spawn(async move {
for mut iter in collection {
while let Some(record_batch) = iter.next_batch().await.transpose() {
let record_batch = record_batch.box_err().context(ErrWithSource {
msg: "read record batch",
});

// Apply the projection to RecordBatchWithKey and gets the final RecordBatch.
let record_batch = record_batch.and_then(|batch_with_key| {
// TODO(yingwen): Try to use projector to do this, which precompute row
// indexes to project.
batch_with_key
.try_project(&projected_schema)
.box_err()
.context(ErrWithSource {
msg: "project record batch",
})
});

trace!("send next record batch:{:?}", record_batch);
if tx.send(record_batch).await.is_err() {
error!("Failed to send record batch from the merge iterator");
break;
}
/// Cursor state used to read a list of iterators one after another.
///
/// It is the seed state handed to `futures::stream::unfold` in
/// `iters_to_stream`.
struct StreamStateOnMultiIters<I> {
/// The iterators to read from, visited in index order.
iters: Vec<I>,
/// Index into `iters` of the iterator currently being read. Once it is
/// `>= iters.len()` the state is considered exhausted.
curr_iter_idx: usize,
/// Schema passed to `try_project` for every fetched batch.
projected_schema: ProjectedSchema,
}

impl<I: RecordBatchWithKeyIterator + 'static> StreamStateOnMultiIters<I> {
    /// Returns `true` once the cursor has moved past the last iterator.
    fn is_exhausted(&self) -> bool {
        self.iters.len() <= self.curr_iter_idx
    }

    /// Moves the cursor to the next iterator.
    fn advance(&mut self) {
        self.curr_iter_idx += 1;
    }

    /// Returns the iterator under the cursor.
    ///
    /// # Panics
    ///
    /// Panics if the cursor is already past the last iterator, so callers
    /// must check [`Self::is_exhausted`] first.
    fn curr_iter_mut(&mut self) -> &mut I {
        let idx = self.curr_iter_idx;
        &mut self.iters[idx]
    }

    /// Fetches the next batch, moving on to the following iterator whenever
    /// the current one reports that it has no more batches.
    ///
    /// Returns `None` when every iterator has been drained. An error from the
    /// current iterator is returned as `Some(Err(_))` and does not move the
    /// cursor.
    async fn fetch_next_batch(
        &mut self,
    ) -> Option<std::result::Result<RecordBatchWithKey, I::Error>> {
        while !self.is_exhausted() {
            let fetched = self.curr_iter_mut().next_batch().await.transpose();
            if fetched.is_some() {
                return fetched;
            }

            // The current iterator yielded nothing more; try the next one.
            self.advance();
        }

        None
    }
}

/// Builds a [`SendableRecordBatchStream`] that reads `iters` one after
/// another and projects every fetched batch with `projected_schema`.
///
/// Each item of the stream is either the projected record batch or an error
/// wrapped in `ErrWithSource` ("read record batch" when fetching fails,
/// "project record batch" when the projection fails).
fn iters_to_stream(
iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
projected_schema: ProjectedSchema,
) -> SendableRecordBatchStream {
// The state keeps its own copy of the schema; the original is still needed
// below to derive the record schema of the returned stream.
let state = StreamStateOnMultiIters {
projected_schema: projected_schema.clone(),
iters,
curr_iter_idx: 0,
};

// Every step fetches one batch from the state; a `None` from
// `fetch_next_batch` maps to `None` here, which terminates the stream.
let record_batch_stream = futures::stream::unfold(state, |mut state| async move {
let projected_schema = state.projected_schema.clone();
state
.fetch_next_batch()
.await
.map(|record_batch| {
record_batch
.box_err()
.context(ErrWithSource {
msg: "read record batch",
})
.and_then(|batch_with_key| {
// TODO(yingwen): Try to use a projector to do this, which pre-computes
// the row indexes to project.
batch_with_key
.try_project(&projected_schema)
.box_err()
.context(ErrWithSource {
msg: "project record batch",
})
})
})
// Hand the state back so that the next step continues from the same cursor.
.map(|record_batch| (record_batch, state))
});

// NOTE(review): the next four lines reference `schema` and `rx`, neither of
// which is defined in this function; they look like leftovers of the
// previous channel-based implementation -- confirm and drop.
Box::pin(ChannelledRecordBatchStream {
schema: schema.to_record_schema(),
rx,
})
let record_schema = projected_schema.to_record_schema();
// NOTE(review): the stream is pinned and boxed twice; presumably the inner
// `Pin<Box<_>>` is what satisfies the `Unpin` bound declared on
// `inner_stream` -- confirm whether a single box would be enough.
let stream_with_schema = RecordBatchStreamWithSchema {
schema: record_schema,
inner_stream: Box::pin(Box::pin(record_batch_stream)),
};
Box::pin(stream_with_schema)
}

pub struct ChannelledRecordBatchStream {
/// A stream of record batches bundled with the [`RecordSchema`] that is
/// reported through `RecordBatchStream::schema`.
pub struct RecordBatchStreamWithSchema {
/// Schema returned by `schema()`.
schema: RecordSchema,
// NOTE(review): `rx` appears to belong to the previous channel-based
// version of this struct; nothing in `iters_to_stream` initializes it --
// confirm and drop.
rx: Receiver<stream::Result<RecordBatch>>,
/// The wrapped stream that `poll_next` delegates to.
inner_stream: Pin<Box<dyn Stream<Item = stream::Result<RecordBatch>> + Send + Unpin>>,
}

impl Stream for ChannelledRecordBatchStream {
impl Stream for RecordBatchStreamWithSchema {
type Item = stream::Result<RecordBatch>;

/// Polls the wrapped `inner_stream` for its next item.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// NOTE(review): the `rx` polling line below looks like a leftover of the
// previous channel-based implementation; only the delegation to
// `inner_stream` matches the rest of this type -- confirm and drop.
Pin::new(&mut this.rx).poll_recv(cx)
this.inner_stream.as_mut().poll_next(cx)
}
}

impl RecordBatchStream for ChannelledRecordBatchStream {
impl RecordBatchStream for RecordBatchStreamWithSchema {
fn schema(&self) -> &RecordSchema {
&self.schema
}
Expand Down

0 comments on commit d46717c

Please sign in to comment.