Skip to content

Commit b3e873a

Browse files
committed
Use blocking IO in tokio worker
Remove additional logging
1 parent b337d9d commit b3e873a

File tree

1 file changed

+54
-122
lines changed

1 file changed

+54
-122
lines changed

parquet/src/arrow/async_reader.rs

Lines changed: 54 additions & 122 deletions
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,6 @@ use std::ops::Range;
8282
use std::pin::Pin;
8383
use std::sync::Arc;
8484
use std::task::{Context, Poll};
85-
use std::time::{Duration, Instant};
8685

8786
use async_trait::async_trait;
8887
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
@@ -145,19 +144,20 @@ impl FileStorage {
145144
F: FnOnce(&mut File) -> Result<T> + Send + 'static,
146145
T: Send + 'static,
147146
{
148-
let mut file = self.file.take().expect("FileStorage poisoned");
149-
let (file, result) = tokio::task::spawn_blocking(move || {
150-
let result = f(&mut file);
151-
(file, result)
152-
})
153-
.await
154-
.expect("background task panicked");
155-
156-
self.file = Some(file);
157-
result
158-
159-
// let file = self.file.as_mut().unwrap();
160-
// f(file)
147+
// let mut file = self.file.take().expect("FileStorage poisoned");
148+
// let (file, result) = tokio::task::spawn_blocking(move || {
149+
// let result = f(&mut file);
150+
// (file, result)
151+
// })
152+
// .await
153+
// .expect("background task panicked");
154+
//
155+
// self.file = Some(file);
156+
// result
157+
158+
// TODO: Temporarily use blocking file IO in tokio worker
159+
let file = self.file.as_mut().unwrap();
160+
f(file)
161161
}
162162
}
163163

@@ -187,29 +187,13 @@ impl Storage for FileStorage {
187187
let result = ranges
188188
.into_iter()
189189
.map(|range| {
190-
let a = Instant::now();
191-
192190
file.seek(SeekFrom::Start(range.start as u64))?;
193191
let len = range.end - range.start;
194192

195-
let b = Instant::now();
196-
197193
let mut buffer = Vec::with_capacity(len);
198-
199-
let c = Instant::now();
200-
201194
let mut take = file.try_clone()?.take(len as u64);
202195
take.read_to_end(&mut buffer)?;
203196

204-
let d = Instant::now();
205-
206-
println!(
207-
"Seek: {}s, Allocation: {}s, Read: {}s",
208-
b.duration_since(a).as_secs_f64(),
209-
c.duration_since(b).as_secs_f64(),
210-
d.duration_since(c).as_secs_f64()
211-
);
212-
213197
Ok(ByteBufferPtr::new(buffer))
214198
})
215199
.collect();
@@ -350,11 +334,6 @@ impl<T: Storage> ParquetRecordBatchStreamBuilder<T> {
350334
batch_reader: None,
351335
batch_size: self.batch_size,
352336
schema: self.schema,
353-
last_record: Instant::now(),
354-
last_load: Instant::now(),
355-
total_read_duration: Default::default(),
356-
max_read_duration: Default::default(),
357-
min_read_duration: Default::default(),
358337
})
359338
}
360339
}
@@ -370,16 +349,6 @@ pub struct ParquetRecordBatchStream<T: Storage> {
370349
inner: Peekable<RowGroupStream<T>>,
371350

372351
batch_reader: Option<ParquetRecordBatchReader>,
373-
374-
last_record: Instant,
375-
376-
last_load: Instant,
377-
378-
total_read_duration: Duration,
379-
380-
max_read_duration: Duration,
381-
382-
min_read_duration: Duration,
383352
}
384353

385354
impl<T: Storage> std::fmt::Debug for ParquetRecordBatchStream<T> {
@@ -401,7 +370,10 @@ impl<T: Storage> ParquetRecordBatchStream<T> {
401370
impl<T: Storage> Stream for ParquetRecordBatchStream<T> {
402371
type Item = Result<RecordBatch>;
403372

404-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
373+
fn poll_next(
374+
mut self: Pin<&mut Self>,
375+
cx: &mut Context<'_>,
376+
) -> Poll<Option<Self::Item>> {
405377
loop {
406378
if self.error {
407379
return Poll::Pending;
@@ -412,95 +384,52 @@ impl<T: Storage> Stream for ParquetRecordBatchStream<T> {
412384

413385
// Fetch records from batch reader if any available
414386
if let Some(batch_reader) = self.batch_reader.as_mut() {
415-
let t = Instant::now();
416-
let next = batch_reader.next();
417-
418-
let read_duration = t.elapsed();
419-
420-
self.min_read_duration = self.min_read_duration.min(read_duration);
421-
self.max_read_duration = self.max_read_duration.max(read_duration);
422-
self.total_read_duration = self.total_read_duration + read_duration;
423-
424-
let stall = t.duration_since(self.last_record);
425-
426-
if stall > Duration::from_millis(1) {
427-
println!("outer stall for {:.4}s", stall.as_secs_f64());
428-
}
429-
430-
// TODO: Temporary
431-
self.last_record = Instant::now();
432-
match next {
387+
match batch_reader.next() {
433388
Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))),
434389
Some(Err(e)) => {
435390
self.error = true;
436-
return Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
391+
return Poll::Ready(Some(Err(ParquetError::ArrowError(
392+
e.to_string(),
393+
))));
437394
}
438395
None => {
439396
self.batch_reader = None;
440-
println!(
441-
"Dropped read in: {}",
442-
self.last_record.elapsed().as_secs_f64()
443-
);
444397
}
445398
}
446399
}
447400

448-
println!(
449-
"Inner Pending: {}, {:.4}s",
450-
inner_pending,
451-
self.last_record.elapsed().as_secs_f64()
452-
);
453-
454401
// Batch reader is exhausted, need to wait for inner
455402
match inner_pending {
456403
true => return Poll::Pending,
457-
false => {
458-
let t = Instant::now();
459-
println!(
460-
"inner stall for {:.4}s, last load: {:.4}s, total read: {:.4}s, min read: {:.4}s, max read: {:.4}s",
461-
t.duration_since(self.last_record).as_secs_f64(),
462-
t.duration_since(self.last_load).as_secs_f64(),
463-
self.total_read_duration.as_secs_f64(),
464-
self.min_read_duration.as_secs_f64(),
465-
self.max_read_duration.as_secs_f64(),
466-
);
467-
self.last_record = t;
468-
self.last_load = t;
469-
470-
self.total_read_duration = Duration::from_secs(0);
471-
self.min_read_duration = Duration::from_secs(0);
472-
self.max_read_duration = Duration::from_secs(0);
473-
474-
match self.inner.poll_next_unpin(cx) {
475-
Poll::Ready(Some(Ok(row_group))) => {
476-
let start = Instant::now();
477-
478-
let inner = self.inner.get_ref();
479-
480-
let parquet_schema = inner.metadata.file_metadata().schema_descr_ptr();
481-
482-
let array_reader = build_array_reader(
483-
parquet_schema,
484-
self.schema.clone(),
485-
inner.columns.iter().cloned(),
486-
Box::new(row_group),
487-
)?;
488-
489-
self.batch_reader = Some(
490-
ParquetRecordBatchReader::try_new(self.batch_size, array_reader)
491-
.expect("reader"),
492-
);
493-
494-
println!("Build reader in {:.4}s", start.elapsed().as_secs_f64());
495-
}
496-
Poll::Ready(Some(Err(e))) => {
497-
self.error = true;
498-
return Poll::Ready(Some(Err(e)));
499-
}
500-
Poll::Ready(None) => return Poll::Ready(None),
501-
Poll::Pending => unreachable!("contents peeked"),
404+
false => match self.inner.poll_next_unpin(cx) {
405+
Poll::Ready(Some(Ok(row_group))) => {
406+
let inner = self.inner.get_ref();
407+
408+
let parquet_schema =
409+
inner.metadata.file_metadata().schema_descr_ptr();
410+
411+
let array_reader = build_array_reader(
412+
parquet_schema,
413+
self.schema.clone(),
414+
inner.columns.iter().cloned(),
415+
Box::new(row_group),
416+
)?;
417+
418+
self.batch_reader = Some(
419+
ParquetRecordBatchReader::try_new(
420+
self.batch_size,
421+
array_reader,
422+
)
423+
.expect("reader"),
424+
);
502425
}
503-
}
426+
Poll::Ready(Some(Err(e))) => {
427+
self.error = true;
428+
return Poll::Ready(Some(Err(e)));
429+
}
430+
Poll::Ready(None) => return Poll::Ready(None),
431+
Poll::Pending => unreachable!("contents peeked"),
432+
},
504433
}
505434
}
506435
}
@@ -526,7 +455,10 @@ enum RowGroupStreamState<T> {
526455
impl<T: Storage> Stream for RowGroupStream<T> {
527456
type Item = Result<InMemoryRowGroup>;
528457

529-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
458+
fn poll_next(
459+
mut self: Pin<&mut Self>,
460+
cx: &mut Context<'_>,
461+
) -> Poll<Option<Self::Item>> {
530462
loop {
531463
match &mut self.state {
532464
RowGroupStreamState::Init(storage) => {

0 commit comments

Comments
 (0)