Skip to content

Commit 98d7ff0

Browse files
authored
ARROW-18272: [Python] Support filesystem parameter in ParquetFile (#14717)
Will fix [ARROW-18272](https://issues.apache.org/jira/browse/ARROW-18272) Authored-by: Miles Granger <miles59923@gmail.com> Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
1 parent cab660f commit 98d7ff0

File tree

3 files changed

+48
-6
lines changed

3 files changed

+48
-6
lines changed

python/pyarrow/fs.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def _ensure_filesystem(
135135

136136

137137
def _resolve_filesystem_and_path(
138-
path, filesystem=None, allow_legacy_filesystem=False
138+
path, filesystem=None, allow_legacy_filesystem=False, memory_map=False
139139
):
140140
"""
141141
Return filesystem/path from path which could be an URI or a plain
@@ -151,7 +151,8 @@ def _resolve_filesystem_and_path(
151151

152152
if filesystem is not None:
153153
filesystem = _ensure_filesystem(
154-
filesystem, allow_legacy_filesystem=allow_legacy_filesystem
154+
filesystem, use_mmap=memory_map,
155+
allow_legacy_filesystem=allow_legacy_filesystem
155156
)
156157
if isinstance(filesystem, LocalFileSystem):
157158
path = _stringify_path(path)
@@ -169,7 +170,8 @@ def _resolve_filesystem_and_path(
169170
# if filesystem is not given, try to automatically determine one
170171
# first check if the file exists as a local (relative) file path
171172
# if not then try to parse the path as an URI
172-
filesystem = LocalFileSystem()
173+
filesystem = LocalFileSystem(use_mmap=memory_map)
174+
173175
try:
174176
file_info = filesystem.get_file_info(path)
175177
except ValueError: # ValueError means path is likely an URI

python/pyarrow/parquet/core.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,10 @@ class ParquetFile:
257257
If not None, override the maximum total size of containers allocated
258258
when decoding Thrift structures. The default limit should be
259259
sufficient for most Parquet files.
260+
filesystem : FileSystem, default None
261+
If nothing passed, will be inferred based on path.
262+
Path will try to be found in the local on-disk filesystem otherwise
263+
it will be parsed as an URI to determine the filesystem.
260264
261265
Examples
262266
--------
@@ -304,7 +308,16 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
304308
read_dictionary=None, memory_map=False, buffer_size=0,
305309
pre_buffer=False, coerce_int96_timestamp_unit=None,
306310
decryption_properties=None, thrift_string_size_limit=None,
307-
thrift_container_size_limit=None):
311+
thrift_container_size_limit=None, filesystem=None):
312+
313+
self._close_source = getattr(source, 'closed', True)
314+
315+
filesystem, source = _resolve_filesystem_and_path(
316+
source, filesystem, memory_map)
317+
if filesystem is not None:
318+
source = filesystem.open_input_file(source)
319+
self._close_source = True # We opened it here, ensure we close it.
320+
308321
self.reader = ParquetReader()
309322
self.reader.open(
310323
source, use_memory_map=memory_map,
@@ -315,7 +328,6 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
315328
thrift_string_size_limit=thrift_string_size_limit,
316329
thrift_container_size_limit=thrift_container_size_limit,
317330
)
318-
self._close_source = getattr(source, 'closed', True)
319331
self.common_metadata = common_metadata
320332
self._nested_paths_by_prefix = self._build_nested_paths()
321333

python/pyarrow/tests/parquet/test_parquet_file.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io
1919
import os
20+
import sys
2021
from unittest import mock
2122

2223
import pytest
@@ -177,8 +178,12 @@ def test_parquet_file_pass_directory_instead_of_file(tempdir):
177178
path = tempdir / 'directory'
178179
os.mkdir(str(path))
179180

180-
with pytest.raises(IOError, match="Expected file path"):
181+
msg = f"Cannot open for reading: path '{str(path)}' is a directory"
182+
with pytest.raises(IOError) as exc:
181183
pq.ParquetFile(path)
184+
if exc.errisinstance(PermissionError) and sys.platform == 'win32':
185+
return # Windows CI can get a PermissionError here.
186+
exc.match(msg)
182187

183188

184189
def test_read_column_invalid_index():
@@ -329,3 +334,26 @@ def test_parquet_file_explicitly_closed(tempdir):
329334
p.read()
330335
assert not p.closed
331336
assert p.closed # parquet file obj reports as closed
337+
338+
339+
@pytest.mark.s3
340+
@pytest.mark.parametrize("use_uri", (True, False))
341+
def test_parquet_file_with_filesystem(tempdir, s3_example_fs, use_uri):
342+
s3_fs, s3_uri, s3_path = s3_example_fs
343+
344+
args = (s3_uri if use_uri else s3_path,)
345+
kwargs = {} if use_uri else dict(filesystem=s3_fs)
346+
347+
table = pa.table({"a": range(10)})
348+
pq.write_table(table, s3_path, filesystem=s3_fs)
349+
350+
parquet_file = pq.ParquetFile(*args, **kwargs)
351+
assert parquet_file.read() == table
352+
assert not parquet_file.closed
353+
parquet_file.close()
354+
assert parquet_file.closed
355+
356+
with pq.ParquetFile(*args, **kwargs) as f:
357+
assert f.read() == table
358+
assert not f.closed
359+
assert f.closed

0 commit comments

Comments
 (0)