Skip to content

Commit 42d8750

Browse files
committed
Make it clearer that there is only ever a single outstanding IO
1 parent 671938b commit 42d8750

1 file changed

Lines changed: 23 additions & 4 deletions

File tree

datafusion/datasource/src/file_stream/scan_state.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion_common::internal_datafusion_err;
1819
use std::collections::VecDeque;
1920
use std::task::{Context, Poll};
2021

@@ -36,6 +37,11 @@ use super::{FileStreamMetrics, OnError};
3637
/// unopened files, CPU-ready planners, pending planner I/O, ready morsels,
3738
/// the active reader, and the metrics associated with processing that work.
3839
///
40+
/// # I/O
41+
///
42+
/// To avoid challenges controlling buffering, the ScanState only ever has a
43+
/// single I/O outstanding at any time.
44+
///
3945
/// # State Transitions
4046
///
4147
/// ```text
@@ -70,7 +76,7 @@ pub(super) struct ScanState {
7076
ready_morsels: VecDeque<Box<dyn Morsel>>,
7177
/// The active reader, if any.
7278
reader: Option<BoxStream<'static, Result<RecordBatch>>>,
73-
/// The single planner currently blocked on I/O, if any.
79+
/// The currently outstanding I/O, if any.
7480
pending_planner: Option<PendingMorselPlanner>,
7581
/// Metrics for the active scan queues.
7682
metrics: FileStreamMetrics,
@@ -118,18 +124,18 @@ impl ScanState {
118124
let _processing_timer: ScopedTimerGuard<'_> =
119125
self.metrics.time_processing.timer();
120126

121-
// Try and resolve outstanding IO first
127+
// Try and resolve outstanding IO first. If it is still pending, check
128+
// the current reader or ready morsels before yielding. New planning
129+
// work must still wait for this I/O to resolve.
122130
if let Some(mut pending_planner) = self.pending_planner.take() {
123131
match pending_planner.poll_unpin(cx) {
124132
// IO is still pending
125133
Poll::Pending => {
126134
self.pending_planner = Some(pending_planner);
127-
return ScanAndReturn::Return(Poll::Pending);
128135
}
129136
// IO resolved, and the planner is ready for CPU work
130137
Poll::Ready(Ok(planner)) => {
131138
self.ready_planners.push_back(planner);
132-
return ScanAndReturn::Continue;
133139
}
134140
// IO Error
135141
Poll::Ready(Err(err)) => {
@@ -213,6 +219,13 @@ impl ScanState {
213219
return ScanAndReturn::Continue;
214220
}
215221

222+
// Do not start CPU planning or open another file while planner I/O is
223+
// still outstanding because they may need additional IO and ScanState
224+
// currently only permits a single outstanding IO
225+
if self.pending_planner.is_some() {
226+
return ScanAndReturn::Return(Poll::Pending);
227+
}
228+
216229
// No reader or morsel, so try to produce more work via CPU planning.
217230
if let Some(planner) = self.ready_planners.pop_front() {
218231
return match planner.plan() {
@@ -221,6 +234,12 @@ impl ScanState {
221234
self.ready_morsels.extend(plan.take_morsels());
222235
self.ready_planners.extend(plan.take_ready_planners());
223236
if let Some(pending_planner) = plan.take_pending_planner() {
237+
// should not have planned if we have outstanding I/O
238+
if self.pending_planner.is_some() {
239+
return ScanAndReturn::Error(internal_datafusion_err!(
240+
"Conflicting pending planner state in FileStream ScanState"
241+
));
242+
}
224243
self.pending_planner = Some(pending_planner);
225244
}
226245
ScanAndReturn::Continue

0 commit comments

Comments
 (0)