Skip to content

Commit

Permalink
revert the original partitioned table scan logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 14, 2023
1 parent 248af2c commit 78a2b5e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
10 changes: 8 additions & 2 deletions table_engine/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::{
any::Any,
fmt,
sync::{Arc, Mutex},
sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -312,14 +312,19 @@ pub struct ScanTable {
table: TableRef,
request: ReadRequest,
stream_state: Mutex<ScanStreamState>,

// FIXME: in origin partitioned table scan need to modify the parallelism when initializing stream...
parallelism: AtomicUsize,
}

impl ScanTable {
pub fn new(table: TableRef, request: ReadRequest) -> Self {
let parallelism = AtomicUsize::new(request.opts.read_parallelism);
Self {
table,
request,
stream_state: Mutex::new(ScanStreamState::default()),
parallelism,
}
}

Expand All @@ -331,6 +336,7 @@ impl ScanTable {
return Ok(());
}
stream_state.init(read_res);
self.parallelism.store(stream_state.streams.len(), Ordering::Relaxed);

Ok(())
}
Expand All @@ -348,7 +354,7 @@ impl ExecutionPlan for ScanTable {
fn output_partitioning(&self) -> Partitioning {
// It represents how current node map the inputs to outputs.
// However, we have no inputs here, so `UnknownPartitioning` is suitable.
Partitioning::UnknownPartitioning(self.request.opts.read_parallelism)
Partitioning::UnknownPartitioning(self.parallelism.load(Ordering::Relaxed))
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Expand Down
2 changes: 1 addition & 1 deletion table_engine/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl RecordBatchStream for FromDfStream {
pub struct ScanStreamState {
inited: bool,
err: Option<table::Error>,
streams: Vec<Option<SendableRecordBatchStream>>,
pub streams: Vec<Option<SendableRecordBatchStream>>,
}

impl ScanStreamState {
Expand Down

0 comments on commit 78a2b5e

Please sign in to comment.