Commit 3c0c4c0

fix comments and namings.

Rachelint committed Nov 29, 2023
1 parent 8e33b8d commit 3c0c4c0
Showing 13 changed files with 33 additions and 40 deletions.
8 changes: 4 additions & 4 deletions analytic_engine/src/instance/read.rs

@@ -48,7 +48,7 @@ use crate::{
         chain::{ChainConfig, ChainIterator},
         dedup::DedupIterator,
         merge::{MergeBuilder, MergeConfig, MergeIterator},
-        IterOptions, RecordBatchWithKeyIterator,
+        FetchingRecordBatchIterator, IterOptions,
     },
     table::{
         data::TableData,
@@ -168,7 +168,7 @@ impl Instance {
     fn build_partitioned_streams(
         &self,
         request: &ReadRequest,
-        partitioned_iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
+        partitioned_iters: Vec<impl FetchingRecordBatchIterator + 'static>,
     ) -> Result<PartitionedStreams> {
         let read_parallelism = request.opts.read_parallelism;

@@ -365,7 +365,7 @@ struct StreamStateOnMultiIters<I> {
     projected_schema: ProjectedSchema,
 }

-impl<I: RecordBatchWithKeyIterator + 'static> StreamStateOnMultiIters<I> {
+impl<I: FetchingRecordBatchIterator + 'static> StreamStateOnMultiIters<I> {
     fn is_exhausted(&self) -> bool {
         self.curr_iter_idx >= self.iters.len()
     }
@@ -397,7 +397,7 @@ impl<I: RecordBatchWithKeyIterator + 'static> StreamStateOnMultiIters<I> {
 }

 fn iters_to_stream(
-    iters: Vec<impl RecordBatchWithKeyIterator + 'static>,
+    iters: Vec<impl FetchingRecordBatchIterator + 'static>,
     projected_schema: ProjectedSchema,
 ) -> SendableRecordBatchStream {
     let mut state = StreamStateOnMultiIters {
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/reorder_memtable.rs

@@ -70,7 +70,7 @@ pub enum Error {
 define_result!(Error);

 pub type DfResult<T> = std::result::Result<T, DataFusionError>;
-type SendableRecordBatchWithkeyStream =
+type SendableFetchingRecordBatchStream =
     Pin<Box<dyn Stream<Item = Result<FetchingRecordBatch>> + Send>>;

 impl From<DataFusionError> for Error {
@@ -253,7 +253,7 @@ impl Reorder {

     // TODO: In theory we can construct a physical plan directly, here we choose
     // logical because it has a convenient builder API for use.
-    pub async fn into_stream(self) -> Result<SendableRecordBatchWithkeyStream> {
+    pub async fn into_stream(self) -> Result<SendableFetchingRecordBatchStream> {
         // 1. Init datafusion context
         let runtime = Arc::new(RuntimeEnv::default());
         let state = SessionState::with_config_rt(SessionConfig::new(), runtime);
4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/chain.rs

@@ -37,7 +37,7 @@ use crate::{
         record_batch_stream::{
             self, BoxedPrefetchableRecordBatchStream, MemtableStreamContext, SstStreamContext,
         },
-        RecordBatchWithKeyIterator,
+        FetchingRecordBatchIterator,
     },
     space::SpaceId,
     sst::{
@@ -377,7 +377,7 @@ impl Drop for ChainIterator {
 }

 #[async_trait]
-impl RecordBatchWithKeyIterator for ChainIterator {
+impl FetchingRecordBatchIterator for ChainIterator {
     type Error = Error;

     fn schema(&self) -> &RecordSchemaWithKey {
6 changes: 3 additions & 3 deletions analytic_engine/src/row_iter/dedup.rs

@@ -26,7 +26,7 @@ use logger::{info, trace};
 use macros::define_result;
 use snafu::{ResultExt, Snafu};

-use crate::row_iter::{IterOptions, RecordBatchWithKeyIterator};
+use crate::row_iter::{FetchingRecordBatchIterator, IterOptions};

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -67,7 +67,7 @@ pub struct DedupIterator<I> {
     total_selected_rows: usize,
 }

-impl<I: RecordBatchWithKeyIterator> DedupIterator<I> {
+impl<I: FetchingRecordBatchIterator> DedupIterator<I> {
     pub fn new(request_id: RequestId, iter: I, iter_options: IterOptions) -> Self {
         let schema_with_key = iter.schema();
         let primary_key_indexes = schema_with_key.primary_key_idx().to_vec();
@@ -173,7 +173,7 @@ impl<I: RecordBatchWithKeyIterator> DedupIterator<I> {
 }

 #[async_trait]
-impl<I: RecordBatchWithKeyIterator> RecordBatchWithKeyIterator for DedupIterator<I> {
+impl<I: FetchingRecordBatchIterator> FetchingRecordBatchIterator for DedupIterator<I> {
     type Error = Error;

     fn schema(&self) -> &RecordSchemaWithKey {
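
The dedup logic itself is untouched by this rename. For orientation, the idea behind DedupIterator is collapsing consecutive rows that share a primary key in sorted input. A minimal self-contained sketch in plain Rust, with the engine's types abstracted away; which duplicate wins is a policy detail, and this version keeps the later one:

// Illustrative only, not engine code. `key_indexes` plays the role of
// `schema_with_key.primary_key_idx()` from the diff above.
fn dedup_sorted(rows: Vec<Vec<i64>>, key_indexes: &[usize]) -> Vec<Vec<i64>> {
    let mut out: Vec<Vec<i64>> = Vec::with_capacity(rows.len());
    for row in rows {
        let same_key = out
            .last()
            .map_or(false, |prev| key_indexes.iter().all(|&i| prev[i] == row[i]));
        if same_key {
            // Input is sorted, so duplicates are adjacent; keep the later row.
            *out.last_mut().unwrap() = row;
        } else {
            out.push(row);
        }
    }
    out
}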
4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/merge.rs

@@ -45,7 +45,7 @@ use crate::{
             self, BoxedPrefetchableRecordBatchStream, MemtableStreamContext, SequencedRecordBatch,
             SstStreamContext,
         },
-        IterOptions, RecordBatchWithKeyIterator,
+        FetchingRecordBatchIterator, IterOptions,
     },
     space::SpaceId,
     sst::{
@@ -900,7 +900,7 @@ impl MergeIterator {
 }

 #[async_trait]
-impl RecordBatchWithKeyIterator for MergeIterator {
+impl FetchingRecordBatchIterator for MergeIterator {
     type Error = Error;

     fn schema(&self) -> &RecordSchemaWithKey {
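
MergeIterator is likewise only renamed. The technique underneath is a k-way merge: keep the head of each sorted input in a min-heap, pop the smallest, and refill from the input it came from. A self-contained sketch over plain integers, with the engine's types, async plumbing, and error handling omitted:

use std::{cmp::Reverse, collections::BinaryHeap};

// Illustrative k-way merge of sorted inputs via a min-heap.
fn merge_sorted(inputs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with (head value, source index, offset) per input.
    for (src, input) in inputs.iter().enumerate() {
        if let Some(&head) = input.first() {
            heap.push(Reverse((head, src, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, src, offset))) = heap.pop() {
        out.push(value);
        // Refill from the input that produced the popped value.
        if let Some(&next) = inputs[src].get(offset + 1) {
            heap.push(Reverse((next, src, offset + 1)));
        }
    }
    out
}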
4 changes: 2 additions & 2 deletions analytic_engine/src/row_iter/mod.rs

@@ -38,7 +38,7 @@ pub struct IterOptions {
 /// The `schema()` should be the same as the RecordBatch from `read()`.
 /// The reader is exhausted if the `read()` returns the `Ok(None)`.
 #[async_trait]
-pub trait RecordBatchWithKeyIterator: Send {
+pub trait FetchingRecordBatchIterator: Send {
     type Error: std::error::Error + Send + Sync + 'static;

     fn schema(&self) -> &RecordSchemaWithKey;
@@ -47,7 +47,7 @@ pub trait RecordBatchWithKeyIterator: Send {
         -> std::result::Result<Option<FetchingRecordBatch>, Self::Error>;
 }

-pub fn record_batch_with_key_iter_to_stream<I: RecordBatchWithKeyIterator + Unpin + 'static>(
+pub fn record_batch_with_key_iter_to_stream<I: FetchingRecordBatchIterator + Unpin + 'static>(
     mut iter: I,
 ) -> RecordBatchStream {
     let record_batch_stream = try_stream! {
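
With the trait renamed, a consumer looks like the following hypothetical helper (not part of this commit); per the doc comments above, `Ok(None)` signals exhaustion:

// Hypothetical helper: drain any FetchingRecordBatchIterator, counting rows.
async fn count_rows<I: FetchingRecordBatchIterator>(mut iter: I) -> Result<usize, I::Error> {
    let mut total = 0;
    while let Some(batch) = iter.next_batch().await? {
        total += batch.num_rows();
    }
    Ok(total)
}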
2 changes: 1 addition & 1 deletion analytic_engine/src/row_iter/record_batch_stream.rs

@@ -130,7 +130,7 @@ pub enum Error {

 define_result!(Error);

-// TODO(yingwen): Can we move sequence to RecordBatchWithKey and remove this
+// TODO(yingwen): Can we move sequence to FetchingRecordBatch and remove this
 // struct? But what is the sequence after merge?
 #[derive(Debug)]
 pub struct SequencedRecordBatch {
6 changes: 3 additions & 3 deletions analytic_engine/src/row_iter/tests.rs

@@ -25,7 +25,7 @@ use common_types::{
 use macros::define_result;
 use snafu::Snafu;

-use crate::row_iter::RecordBatchWithKeyIterator;
+use crate::row_iter::FetchingRecordBatchIterator;

 #[derive(Debug, Snafu)]
 pub enum Error {}
@@ -49,7 +49,7 @@ impl VectorIterator {
 }

 #[async_trait]
-impl RecordBatchWithKeyIterator for VectorIterator {
+impl FetchingRecordBatchIterator for VectorIterator {
     type Error = Error;

     fn schema(&self) -> &RecordSchemaWithKey {
@@ -105,7 +105,7 @@ pub fn build_fetching_record_batch_with_key(schema: Schema, rows: Vec<Row>) -> F
     builder.build().unwrap()
 }

-pub async fn check_iterator<T: RecordBatchWithKeyIterator>(iter: &mut T, expected_rows: Vec<Row>) {
+pub async fn check_iterator<T: FetchingRecordBatchIterator>(iter: &mut T, expected_rows: Vec<Row>) {
     let mut visited_rows = 0;
     while let Some(batch) = iter.next_batch().await.unwrap() {
         for row_idx in 0..batch.num_rows() {
12 changes: 6 additions & 6 deletions analytic_engine/src/sst/parquet/async_reader.rs

@@ -73,7 +73,7 @@ use crate::{

 const PRUNE_ROW_GROUPS_METRICS_COLLECTOR_NAME: &str = "prune_row_groups";
 type SendableRecordBatchStream = Pin<Box<dyn Stream<Item = Result<ArrowRecordBatch>> + Send>>;
-type RecordBatchWithKeyStream = Box<dyn Stream<Item = Result<FetchingRecordBatch>> + Send + Unpin>;
+type FetchingRecordBatchStream = Box<dyn Stream<Item = Result<FetchingRecordBatch>> + Send + Unpin>;

 pub struct Reader<'a> {
     /// The path where the data is persisted.
@@ -147,7 +147,7 @@ impl<'a> Reader<'a> {
     async fn maybe_read_parallelly(
         &mut self,
         read_parallelism: usize,
-    ) -> Result<Vec<RecordBatchWithKeyStream>> {
+    ) -> Result<Vec<FetchingRecordBatchStream>> {
         assert!(read_parallelism > 0);

         self.init_if_necessary().await?;
@@ -156,13 +156,13 @@ impl<'a> Reader<'a> {
             return Ok(Vec::new());
         }

-        let row_projector = self.record_fetching_ctx.take().unwrap();
+        let record_fetching_ctx = self.record_fetching_ctx.take().unwrap();
         let streams: Vec<_> = streams
             .into_iter()
             .map(|stream| {
                 Box::new(RecordBatchProjector::new(
                     stream,
-                    row_projector.clone(),
+                    record_fetching_ctx.clone(),
                     self.metrics.metrics_collector.clone(),
                 )) as _
             })
@@ -240,7 +240,7 @@ impl<'a> Reader<'a> {
         assert!(self.meta_data.is_some());

         let meta_data = self.meta_data.as_ref().unwrap();
-        let row_projector = self.record_fetching_ctx.as_ref().unwrap();
+        let record_fetching_ctx = self.record_fetching_ctx.as_ref().unwrap();
         let arrow_schema = meta_data.custom().schema.to_arrow_schema_ref();
         // Get target row groups.
         let target_row_groups = {
@@ -296,7 +296,7 @@ impl<'a> Reader<'a> {
         let parquet_metadata = meta_data.parquet();
         let proj_mask = ProjectionMask::leaves(
             meta_data.parquet().file_metadata().schema_descr(),
-            row_projector.existed_source_projection().iter().copied(),
+            record_fetching_ctx.existed_source_projection().iter().copied(),
         );
         debug!(
             "Reader fetch record batches, parallelism suggest:{}, real:{}, chunk_size:{}, project:{:?}",
2 changes: 1 addition & 1 deletion benchmarks/src/merge_memtable_bench.rs

@@ -24,7 +24,7 @@ use analytic_engine::{
     row_iter::{
         dedup::DedupIterator,
         merge::{MergeBuilder, MergeConfig},
-        IterOptions, RecordBatchWithKeyIterator,
+        FetchingRecordBatchIterator, IterOptions,
     },
     space::SpaceId,
     sst::{
2 changes: 1 addition & 1 deletion benchmarks/src/merge_sst_bench.rs

@@ -22,7 +22,7 @@ use analytic_engine::{
         chain::ChainConfig,
         dedup::DedupIterator,
         merge::{MergeBuilder, MergeConfig},
-        IterOptions, RecordBatchWithKeyIterator,
+        FetchingRecordBatchIterator, IterOptions,
     },
     space::SpaceId,
     sst::{
15 changes: 4 additions & 11 deletions common_types/src/record_batch.rs

@@ -456,18 +456,11 @@ impl FetchingRecordBatch {
         Row::from_datums(datums)
     }

-    /// Project the [RecordBatchWithKey] into a [RecordBatch] according to
+    /// Project the [FetchingRecordBatch] into a [RecordBatch] according to
     /// [ProjectedSchema].
-    ///
-    /// REQUIRE: The schema_with_key of the [RecordBatchWithKey] is the same as
-    /// the schema_with_key of [ProjectedSchema].
+    // TODO: how do we ensure `ProjectedSchema` passed here is same as the source
+    // `ProjectedSchema` of `RecordSchema` here?
     pub fn try_project(mut self, projected_schema: &ProjectedSchema) -> Result<RecordBatch> {
-        // FIXME
-        // debug_assert_eq!(
-        //     &self.schema,
-        //     projected_schema.as_record_schema_with_key()
-        // );
-
         // Get the schema after projection.
         let record_schema = projected_schema.to_record_schema();
         let mut column_blocks = Vec::with_capacity(record_schema.num_columns());
@@ -717,7 +710,7 @@ impl FetchingRecordBatchBuilder {
         }
     }

-    /// Build [RecordBatchWithKey] and reset the builder.
+    /// Build [FetchingRecordBatch] and reset the builder.
     pub fn build(&mut self) -> Result<FetchingRecordBatch> {
         let column_blocks: Vec<_> = self
             .builders
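
The doc fix above also drops the old REQUIRE note and the commented-out debug_assert, so the schema-match precondition now lives only in the TODO. A sketch of the call-site shape for try_project, with hypothetical variable names:

// Hypothetical call site: project a fetched batch down to the schema the
// read request asked for.
let record_batch = fetching_batch.try_project(&projected_schema)?;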
4 changes: 2 additions & 2 deletions common_types/src/row/mod.rs

@@ -560,8 +560,8 @@ pub trait RowView {
     fn column_by_idx(&self, column_idx: usize) -> Datum;
 }

-// TODO(yingwen): Add a method to get row view on RecordBatchWithKey.
-/// A row view on the [RecordBatchWithKey].
+// TODO(yingwen): Add a method to get row view on FetchingRecordBatch.
+/// A row view on the [FetchingRecordBatch].
 ///
 /// `row_idx < record_batch.num_rows()` is ensured.
 #[derive(Debug)]
