Skip to content

Commit f3bc954

Browse files
committed
Support standard IOBase methods on NativeFile
Adds support for most common file methods, adding enough to use `io.TextIOWrapper`. Added attributes/methods: - `closed` attribute - `readable`, `writable`, `seekable` methods - `read1` alias for `read` to support `TextIOWrapper` on Python 2 - No-op `flush` method Also refactored the cython internals a bit, adding default settings for `is_readable` and `is_writable`, which makes subclasses not need to set them in all places. Also renamed `is_writeable` to `is_writable` for common spelling with the standard Python method `writable`.
1 parent 5c704bc commit f3bc954

File tree

6 files changed

+136
-69
lines changed

6 files changed

+136
-69
lines changed

python/pyarrow/io-hdfs.pxi

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -413,25 +413,22 @@ cdef class HadoopFileSystem:
413413
&wr_handle))
414414

415415
out.wr_file = <shared_ptr[OutputStream]> wr_handle
416-
417-
out.is_readable = False
418-
out.is_writeable = 1
416+
out.is_writable = True
419417
else:
420418
with nogil:
421419
check_status(self.client.get()
422420
.OpenReadable(c_path, &rd_handle))
423421

424422
out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
425423
out.is_readable = True
426-
out.is_writeable = 0
427424

428425
if c_buffer_size == 0:
429426
c_buffer_size = 2 ** 16
430427

431428
out.mode = mode
432429
out.buffer_size = c_buffer_size
433430
out.parent = _HdfsFileNanny(self, out)
434-
out.is_open = True
431+
out.closed = False
435432
out.own_file = True
436433

437434
return out

python/pyarrow/io.pxi

Lines changed: 67 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@ cdef extern from "Python.h":
3939

4040

4141
cdef class NativeFile:
42-
4342
def __cinit__(self):
44-
self.is_open = False
43+
self.closed = True
4544
self.own_file = False
45+
self.is_readable = False
46+
self.is_writable = False
4647

4748
def __dealloc__(self):
48-
if self.is_open and self.own_file:
49+
if self.own_file and not self.closed:
4950
self.close()
5051

5152
def __enter__(self):
@@ -65,45 +66,63 @@ cdef class NativeFile:
6566

6667
def __get__(self):
6768
# Emulate built-in file modes
68-
if self.is_readable and self.is_writeable:
69+
if self.is_readable and self.is_writable:
6970
return 'rb+'
7071
elif self.is_readable:
7172
return 'rb'
72-
elif self.is_writeable:
73+
elif self.is_writable:
7374
return 'wb'
7475
else:
7576
raise ValueError('File object is malformed, has no mode')
7677

78+
def readable(self):
79+
self._assert_open()
80+
return self.is_readable
81+
82+
def writable(self):
83+
self._assert_open()
84+
return self.is_writable
85+
86+
def seekable(self):
87+
self._assert_open()
88+
return self.is_readable
89+
7790
def close(self):
78-
if self.is_open:
91+
if not self.closed:
7992
with nogil:
8093
if self.is_readable:
8194
check_status(self.rd_file.get().Close())
8295
else:
8396
check_status(self.wr_file.get().Close())
84-
self.is_open = False
97+
self.closed = True
98+
99+
def flush(self):
100+
"""Flush the buffer stream, if applicable.
101+
102+
No-op to match the IOBase interface."""
103+
self._assert_open()
85104

86105
cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
87106
self._assert_readable()
88107
file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
89108

90109
cdef write_handle(self, shared_ptr[OutputStream]* file):
91-
self._assert_writeable()
110+
self._assert_writable()
92111
file[0] = <shared_ptr[OutputStream]> self.wr_file
93112

94113
def _assert_open(self):
95-
if not self.is_open:
114+
if self.closed:
96115
raise ValueError("I/O operation on closed file")
97116

98117
def _assert_readable(self):
99118
self._assert_open()
100119
if not self.is_readable:
101120
raise IOError("only valid on readonly files")
102121

103-
def _assert_writeable(self):
122+
def _assert_writable(self):
104123
self._assert_open()
105-
if not self.is_writeable:
106-
raise IOError("only valid on writeable files")
124+
if not self.is_writable:
125+
raise IOError("only valid on writable files")
107126

108127
def size(self):
109128
"""
@@ -175,7 +194,7 @@ cdef class NativeFile:
175194
Write byte from any object implementing buffer protocol (bytes,
176195
bytearray, ndarray, pyarrow.Buffer)
177196
"""
178-
self._assert_writeable()
197+
self._assert_writable()
179198

180199
if isinstance(data, six.string_types):
181200
data = tobytes(data)
@@ -224,6 +243,12 @@ cdef class NativeFile:
224243

225244
return PyObject_to_object(obj)
226245

246+
def read1(self, nbytes=None):
247+
"""Read and return up to n bytes.
248+
249+
Alias for read, needed to match the IOBase interface."""
250+
return self.read(nbytes=None)
251+
227252
def read_buffer(self, nbytes=None):
228253
cdef:
229254
int64_t c_nbytes
@@ -333,7 +358,7 @@ cdef class NativeFile:
333358
Pipe file-like object to file
334359
"""
335360
write_queue = Queue(50)
336-
self._assert_writeable()
361+
self._assert_writable()
337362

338363
buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
339364

@@ -390,16 +415,14 @@ cdef class PythonFile(NativeFile):
390415

391416
if mode.startswith('w'):
392417
self.wr_file.reset(new PyOutputStream(handle))
393-
self.is_readable = 0
394-
self.is_writeable = 1
418+
self.is_writable = True
395419
elif mode.startswith('r'):
396420
self.rd_file.reset(new PyReadableFile(handle))
397-
self.is_readable = 1
398-
self.is_writeable = 0
421+
self.is_readable = True
399422
else:
400423
raise ValueError('Invalid file mode: {0}'.format(mode))
401424

402-
self.is_open = True
425+
self.closed = False
403426

404427

405428
cdef class MemoryMappedFile(NativeFile):
@@ -409,11 +432,6 @@ cdef class MemoryMappedFile(NativeFile):
409432
cdef:
410433
object path
411434

412-
def __cinit__(self):
413-
self.is_open = False
414-
self.is_readable = 0
415-
self.is_writeable = 0
416-
417435
@staticmethod
418436
def create(path, size):
419437
cdef:
@@ -426,11 +444,11 @@ cdef class MemoryMappedFile(NativeFile):
426444

427445
cdef MemoryMappedFile result = MemoryMappedFile()
428446
result.path = path
429-
result.is_readable = 1
430-
result.is_writeable = 1
447+
result.is_readable = True
448+
result.is_writable = True
431449
result.wr_file = <shared_ptr[OutputStream]> handle
432450
result.rd_file = <shared_ptr[RandomAccessFile]> handle
433-
result.is_open = True
451+
result.closed = False
434452

435453
return result
436454

@@ -444,14 +462,14 @@ cdef class MemoryMappedFile(NativeFile):
444462

445463
if mode in ('r', 'rb'):
446464
c_mode = FileMode_READ
447-
self.is_readable = 1
465+
self.is_readable = True
448466
elif mode in ('w', 'wb'):
449467
c_mode = FileMode_WRITE
450-
self.is_writeable = 1
468+
self.is_writable = True
451469
elif mode in ('r+', 'r+b', 'rb+'):
452470
c_mode = FileMode_READWRITE
453-
self.is_readable = 1
454-
self.is_writeable = 1
471+
self.is_readable = True
472+
self.is_writable = True
455473
else:
456474
raise ValueError('Invalid file mode: {0}'.format(mode))
457475

@@ -460,7 +478,7 @@ cdef class MemoryMappedFile(NativeFile):
460478

461479
self.wr_file = <shared_ptr[OutputStream]> handle
462480
self.rd_file = <shared_ptr[RandomAccessFile]> handle
463-
self.is_open = True
481+
self.closed = False
464482

465483

466484
def memory_map(path, mode='r'):
@@ -484,7 +502,7 @@ def memory_map(path, mode='r'):
484502
def create_memory_map(path, size):
485503
"""
486504
Create memory map at indicated path of the given size, return open
487-
writeable file object
505+
writable file object
488506
489507
Parameters
490508
----------
@@ -513,42 +531,39 @@ cdef class OSFile(NativeFile):
513531
shared_ptr[Readable] handle
514532
c_string c_path = encode_file_path(path)
515533

516-
self.is_readable = self.is_writeable = 0
517-
518534
if mode in ('r', 'rb'):
519535
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
520536
elif mode in ('w', 'wb'):
521-
self._open_writeable(c_path)
537+
self._open_writable(c_path)
522538
else:
523539
raise ValueError('Invalid file mode: {0}'.format(mode))
524540

525-
self.is_open = True
541+
self.closed = False
526542

527543
cdef _open_readable(self, c_string path, CMemoryPool* pool):
528544
cdef shared_ptr[ReadableFile] handle
529545

530546
with nogil:
531547
check_status(ReadableFile.Open(path, pool, &handle))
532548

533-
self.is_readable = 1
549+
self.is_readable = True
534550
self.rd_file = <shared_ptr[RandomAccessFile]> handle
535551

536-
cdef _open_writeable(self, c_string path):
552+
cdef _open_writable(self, c_string path):
537553
cdef shared_ptr[FileOutputStream] handle
538554

539555
with nogil:
540556
check_status(FileOutputStream.Open(path, &handle))
541-
self.is_writeable = 1
557+
self.is_writable = True
542558
self.wr_file = <shared_ptr[OutputStream]> handle
543559

544560

545561
cdef class FixedSizeBufferWriter(NativeFile):
546562

547563
def __cinit__(self, Buffer buffer):
548564
self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
549-
self.is_readable = 0
550-
self.is_writeable = 1
551-
self.is_open = True
565+
self.is_writable = True
566+
self.closed = False
552567

553568
def set_memcopy_threads(self, int num_threads):
554569
cdef CFixedSizeBufferWriter* writer = \
@@ -738,24 +753,22 @@ cdef class BufferOutputStream(NativeFile):
738753
self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
739754
self.wr_file.reset(new CBufferOutputStream(
740755
<shared_ptr[CResizableBuffer]> self.buffer))
741-
self.is_readable = 0
742-
self.is_writeable = 1
743-
self.is_open = True
756+
self.is_writable = True
757+
self.closed = False
744758

745759
def get_result(self):
746760
with nogil:
747761
check_status(self.wr_file.get().Close())
748-
self.is_open = False
762+
self.closed = True
749763
return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
750764

751765

752766
cdef class MockOutputStream(NativeFile):
753767

754768
def __cinit__(self):
755769
self.wr_file.reset(new CMockOutputStream())
756-
self.is_readable = 0
757-
self.is_writeable = 1
758-
self.is_open = True
770+
self.is_writable = True
771+
self.closed = False
759772

760773
def size(self):
761774
return (<CMockOutputStream*>self.wr_file.get()).GetExtentBytesWritten()
@@ -780,9 +793,8 @@ cdef class BufferReader(NativeFile):
780793
self.buffer = frombuffer(obj)
781794

782795
self.rd_file.reset(new CBufferReader(self.buffer.buffer))
783-
self.is_readable = 1
784-
self.is_writeable = 0
785-
self.is_open = True
796+
self.is_readable = True
797+
self.closed = False
786798

787799

788800
def frombuffer(object obj):
@@ -834,8 +846,8 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
834846
if isinstance(source, NativeFile):
835847
nf = source
836848

837-
if not nf.is_writeable:
838-
raise IOError('Native file is not writeable')
849+
if not nf.is_writable:
850+
raise IOError('Native file is not writable')
839851

840852
nf.write_handle(writer)
841853
else:

python/pyarrow/ipc.pxi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ def write_tensor(Tensor tensor, NativeFile dest):
429429
int32_t metadata_length
430430
int64_t body_length
431431

432-
dest._assert_writeable()
432+
dest._assert_writable()
433433

434434
with nogil:
435435
check_status(

python/pyarrow/ipc.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class RecordBatchStreamWriter(lib._RecordBatchWriter):
6565
Parameters
6666
----------
6767
sink : str, pyarrow.NativeFile, or file-like Python object
68-
Either a file path, or a writeable file object
68+
Either a file path, or a writable file object
6969
schema : pyarrow.Schema
7070
The Arrow schema for data to be written to the file
7171
"""
@@ -96,7 +96,7 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter):
9696
Parameters
9797
----------
9898
sink : str, pyarrow.NativeFile, or file-like Python object
99-
Either a file path, or a writeable file object
99+
Either a file path, or a writable file object
100100
schema : pyarrow.Schema
101101
The Arrow schema for data to be written to the file
102102
"""

python/pyarrow/lib.pxd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,8 @@ cdef class NativeFile:
333333
shared_ptr[RandomAccessFile] rd_file
334334
shared_ptr[OutputStream] wr_file
335335
bint is_readable
336-
bint is_writeable
337-
bint is_open
336+
bint is_writable
337+
readonly bint closed
338338
bint own_file
339339

340340
# By implementing these "virtual" functions (all functions in Cython

0 commit comments

Comments
 (0)