Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions parquet/src/util/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{cell::RefCell, cmp, fmt, io::*};
use std::{cmp, fmt, io::*};

use crate::file::reader::Length;

Expand Down Expand Up @@ -47,7 +47,7 @@ impl<T: Read + Seek + Length + TryClone> ParquetReader for T {}
///
/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader`
pub struct FileSource<R: ParquetReader> {
reader: RefCell<R>,
reader: R,
start: u64, // start position in a file
end: u64, // end position in a file
buf: Vec<u8>, // buffer where bytes read in advance are stored
Expand All @@ -71,7 +71,7 @@ impl<R: ParquetReader> fmt::Debug for FileSource<R> {
impl<R: ParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
let reader = RefCell::new(fd.try_clone().unwrap());
let reader = fd.try_clone().unwrap();
Self {
reader,
start,
Expand All @@ -89,9 +89,8 @@ impl<R: ParquetReader> FileSource<R> {
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
debug_assert!(self.buf_pos == self.buf_cap);
let mut reader = self.reader.borrow_mut();
reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
self.buf_cap = reader.read(&mut self.buf)?;
self.reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
Copy link
Contributor Author

@tustvold tustvold Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is technically still racy because cloned file descriptor share a seek position, but the refcell wasn't helping prevent this

self.buf_cap = self.reader.read(&mut self.buf)?;
self.buf_pos = 0;
}
Ok(&self.buf[self.buf_pos..self.buf_cap])
Expand All @@ -102,9 +101,8 @@ impl<R: ParquetReader> FileSource<R> {
self.buf_pos = 0;
self.buf_cap = 0;
// read directly into param buffer
let mut reader = self.reader.borrow_mut();
reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
let nread = reader.read(buf)?;
self.reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading
let nread = self.reader.read(buf)?;
self.start += nread as u64;
Ok(nread)
}
Expand Down