Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): Handle current position of file objects #17543

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::{Read, Seek};
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
Expand Down Expand Up @@ -150,6 +150,8 @@ impl<R: MmapBytesReader + 'static> ParquetReader<R> {
let metadata = self.get_metadata()?.clone();
let schema = self.schema()?;

// XXX: Can a parquet file starts at an offset?
self.reader.seek(SeekFrom::Start(0))?;
let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into();
BatchedParquetReader::new(
row_group_fetcher,
Expand Down
9 changes: 7 additions & 2 deletions crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
reader: &'a mut R,
) -> PolarsResult<ReaderBytes<'a>> {
// we have a file so we can mmap
if let Some(file) = reader.to_file() {
let mmap = unsafe { memmap::Mmap::map(file)? };
// only seekable files are mmap-able
if let Some((file, offset)) = reader
.stream_position()
.ok()
.and_then(|offset| Some((reader.to_file()?, offset)))
{
let mmap = unsafe { memmap::MmapOptions::new().offset(offset).map(file)? };

// somehow bck thinks borrows alias
// this is sound as file was already bound to 'a
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/io/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def read_avro(
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.
columns
Columns to select. Accepts a list of column indices (starting at zero) or a list
of column names.
Expand Down
4 changes: 4 additions & 0 deletions py-polars/polars/io/csv/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def read_csv(
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance). If `fsspec` is installed, it will be used
to open remote files.
For file-like objects,
stream position may not be updated accordingly after reading.
has_header
Indicate if the first row of the dataset is a header or not. If set to False,
column names will be autogenerated in the following format: `column_x`, with
Expand Down Expand Up @@ -646,6 +648,8 @@ def read_csv_batched(
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance). If `fsspec` is installed, it will be used
to open remote files.
For file-like objects,
stream position may not be updated accordingly after reading.
has_header
Indicate if the first row of the dataset is a header or not. If set to False,
column names will be autogenerated in the following format: `column_x`, with
Expand Down
6 changes: 6 additions & 0 deletions py-polars/polars/io/ipc/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def read_ipc(
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance). If `fsspec` is installed, it will be used
to open remote files.
For file-like objects,
stream position may not be updated accordingly after reading.
columns
Columns to select. Accepts a list of column indices (starting at zero) or a list
of column names.
Expand Down Expand Up @@ -199,6 +201,8 @@ def read_ipc_stream(
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance). If `fsspec` is installed, it will be used
to open remote files.
For file-like objects,
stream position may not be updated accordingly after reading.
columns
Columns to select. Accepts a list of column indices (starting at zero) or a list
of column names.
Expand Down Expand Up @@ -287,6 +291,8 @@ def read_ipc_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, DataTyp
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.

Returns
-------
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/io/json/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def read_json(
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.
schema : Sequence of str, (str,DataType) pairs, or a {str:DataType,} dict
The DataFrame schema may be declared in several ways:

Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/io/ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def read_ndjson(
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.
schema : Sequence of str, (str,DataType) pairs, or a {str:DataType,} dict
The DataFrame schema may be declared in several ways:

Expand Down
4 changes: 4 additions & 0 deletions py-polars/polars/io/parquet/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def read_parquet(
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance). If the path is a directory, files in that
directory will all be read.
For file-like objects,
stream position may not be updated accordingly after reading.
columns
Columns to select. Accepts a list of column indices (starting at zero) or a list
of column names.
Expand Down Expand Up @@ -273,6 +275,8 @@ def read_parquet_schema(source: str | Path | IO[bytes] | bytes) -> dict[str, Dat
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.

Returns
-------
Expand Down
4 changes: 4 additions & 0 deletions py-polars/polars/io/spreadsheet/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def read_excel(
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.
sheet_id
Sheet number(s) to convert (set `0` to load all sheets as DataFrames) and
return a `{sheetname:frame,}` dict. (Defaults to `1` if neither this nor
Expand Down Expand Up @@ -383,6 +385,8 @@ def read_ods(
Path to a file or a file-like object (by "file-like object" we refer to objects
that have a `read()` method, such as a file handler like the builtin `open`
function, or a `BytesIO` instance).
For file-like objects,
stream position may not be updated accordingly after reading.
sheet_id
Sheet number(s) to convert, starting from 1 (set `0` to load *all* worksheets
as DataFrames) and return a `{sheetname:frame,}` dict. (Defaults to `1` if
Expand Down
32 changes: 19 additions & 13 deletions py-polars/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,21 @@ fn get_either_file_and_path(
let encoding = encoding.extract::<Cow<str>>()?;
Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
};
let flush_file = |py_f: &Bound<PyAny>| -> PyResult<()> {
py_f.getattr("flush")?.call0()?;
Ok(())
};
#[cfg(target_family = "unix")]
if let Some(fd) = ((py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
|| (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
&& is_utf8_encoding(&py_f)?))
&& (!write || flush_file(&py_f).is_ok()))
if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
|| (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
|| py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
|| (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
&& is_utf8_encoding(&py_f)?))
&& if write {
// invalidate read buffer
py_f.call_method0("flush").is_ok()
} else {
// flush write buffer
py_f.call_method1("seek", (0, 1)).is_ok()
})
.then(|| {
py_f.getattr("fileno")
.and_then(|fileno| fileno.call0())
Expand Down Expand Up @@ -261,8 +263,12 @@ fn get_either_file_and_path(
)
.into());
}
// XXX: we have to clear buffer here.
// Is there a better solution?
if write {
flush_file(&py_f)?;
py_f.call_method0("flush")?;
} else {
py_f.call_method1("seek", (0, 1))?;
}
py_f.getattr("buffer")?
} else {
Expand Down
8 changes: 5 additions & 3 deletions py-polars/tests/unit/io/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,10 +2240,12 @@ def test_write_csv_raise_on_non_utf8_17328(


@pytest.mark.write_disk()
def test_write_csv_appending_17328(tmp_path: Path) -> None:
def test_write_csv_appending_17543(tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)
df = pl.DataFrame({"col": ["value"]})
with (tmp_path / "append.csv").open("w") as f:
f.write("# test\n")
pl.DataFrame({"col": ["value"]}).write_csv(f)
df.write_csv(f)
with (tmp_path / "append.csv").open("r") as f:
assert f.read() == "# test\ncol\nvalue\n"
assert f.readline() == "# test\n"
assert pl.read_csv(f).equals(df)