diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index f5813f82c7..a2929ff3aa 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -17,7 +17,7 @@ use std::{ any::Any, fmt, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}, time::{Duration, Instant}, }; @@ -312,14 +312,19 @@ pub struct ScanTable { table: TableRef, request: ReadRequest, stream_state: Mutex, + + // FIXME: in origin partitioned table scan need to modify the parallelism when initializing stream... + parallelism: AtomicUsize, } impl ScanTable { pub fn new(table: TableRef, request: ReadRequest) -> Self { + let parallelism = AtomicUsize::new(request.opts.read_parallelism); Self { table, request, stream_state: Mutex::new(ScanStreamState::default()), + parallelism, } } @@ -331,6 +336,7 @@ impl ScanTable { return Ok(()); } stream_state.init(read_res); + self.parallelism.store(stream_state.streams.len(), Ordering::Relaxed); Ok(()) } @@ -348,7 +354,7 @@ impl ExecutionPlan for ScanTable { fn output_partitioning(&self) -> Partitioning { // It represents how current node map the inputs to outputs. // However, we have no inputs here, so `UnknownPartitioning` is suitable. - Partitioning::UnknownPartitioning(self.request.opts.read_parallelism) + Partitioning::UnknownPartitioning(self.parallelism.load(Ordering::Relaxed)) } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { diff --git a/table_engine/src/stream.rs b/table_engine/src/stream.rs index ccfa4e026d..6420fcaf1a 100644 --- a/table_engine/src/stream.rs +++ b/table_engine/src/stream.rs @@ -140,7 +140,7 @@ impl RecordBatchStream for FromDfStream { pub struct ScanStreamState { inited: bool, err: Option, - streams: Vec>, + pub streams: Vec>, } impl ScanStreamState {