- 
                Notifications
    You must be signed in to change notification settings 
- Fork 3.9k
ARROW-2036: [Python] Support standard IOBase methods on NativeFile #1517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -39,13 +39,14 @@ cdef extern from "Python.h": | |
|  | ||
|  | ||
| cdef class NativeFile: | ||
|  | ||
| def __cinit__(self): | ||
| self.is_open = False | ||
| self.closed = True | ||
| self.own_file = False | ||
| self.is_readable = False | ||
| self.is_writable = False | ||
|  | ||
| def __dealloc__(self): | ||
| if self.is_open and self.own_file: | ||
| if self.own_file and not self.closed: | ||
| self.close() | ||
|  | ||
| def __enter__(self): | ||
|  | @@ -65,45 +66,63 @@ cdef class NativeFile: | |
|  | ||
| def __get__(self): | ||
| # Emulate built-in file modes | ||
| if self.is_readable and self.is_writeable: | ||
| if self.is_readable and self.is_writable: | ||
| return 'rb+' | ||
| elif self.is_readable: | ||
| return 'rb' | ||
| elif self.is_writeable: | ||
| elif self.is_writable: | ||
| return 'wb' | ||
| else: | ||
| raise ValueError('File object is malformed, has no mode') | ||
|  | ||
| def readable(self): | ||
| self._assert_open() | ||
| return self.is_readable | ||
|  | ||
| def writable(self): | ||
| self._assert_open() | ||
| return self.is_writable | ||
|  | ||
| def seekable(self): | ||
| self._assert_open() | ||
| return self.is_readable | ||
|          | ||
|  | ||
| def close(self): | ||
| if self.is_open: | ||
| if not self.closed: | ||
| with nogil: | ||
|          | ||
| if self.is_readable: | ||
| check_status(self.rd_file.get().Close()) | ||
| else: | ||
| check_status(self.wr_file.get().Close()) | ||
| self.is_open = False | ||
| self.closed = True | ||
|  | ||
| def flush(self): | ||
| """Flush the buffer stream, if applicable. | ||
|  | ||
| No-op to match the IOBase interface.""" | ||
| self._assert_open() | ||
|  | ||
| cdef read_handle(self, shared_ptr[RandomAccessFile]* file): | ||
| self._assert_readable() | ||
| file[0] = <shared_ptr[RandomAccessFile]> self.rd_file | ||
|  | ||
| cdef write_handle(self, shared_ptr[OutputStream]* file): | ||
| self._assert_writeable() | ||
| self._assert_writable() | ||
| file[0] = <shared_ptr[OutputStream]> self.wr_file | ||
|  | ||
| def _assert_open(self): | ||
| if not self.is_open: | ||
| if self.closed: | ||
| raise ValueError("I/O operation on closed file") | ||
|  | ||
| def _assert_readable(self): | ||
| self._assert_open() | ||
| if not self.is_readable: | ||
| raise IOError("only valid on readonly files") | ||
|  | ||
| def _assert_writeable(self): | ||
| def _assert_writable(self): | ||
| self._assert_open() | ||
| if not self.is_writeable: | ||
| raise IOError("only valid on writeable files") | ||
| if not self.is_writable: | ||
| raise IOError("only valid on writable files") | ||
|  | ||
| def size(self): | ||
| """ | ||
|  | @@ -175,7 +194,7 @@ cdef class NativeFile: | |
| Write byte from any object implementing buffer protocol (bytes, | ||
| bytearray, ndarray, pyarrow.Buffer) | ||
| """ | ||
| self._assert_writeable() | ||
| self._assert_writable() | ||
|  | ||
| if isinstance(data, six.string_types): | ||
| data = tobytes(data) | ||
|  | @@ -224,6 +243,12 @@ cdef class NativeFile: | |
|  | ||
| return PyObject_to_object(obj) | ||
|  | ||
| def read1(self, nbytes=None): | ||
| """Read and return up to n bytes. | ||
|  | ||
| Alias for read, needed to match the IOBase interface.""" | ||
| return self.read(nbytes=None) | ||
|  | ||
| def read_buffer(self, nbytes=None): | ||
| cdef: | ||
| int64_t c_nbytes | ||
|  | @@ -333,7 +358,7 @@ cdef class NativeFile: | |
| Pipe file-like object to file | ||
| """ | ||
| write_queue = Queue(50) | ||
| self._assert_writeable() | ||
| self._assert_writable() | ||
|  | ||
| buffer_size = buffer_size or DEFAULT_BUFFER_SIZE | ||
|  | ||
|  | @@ -390,16 +415,14 @@ cdef class PythonFile(NativeFile): | |
|  | ||
| if mode.startswith('w'): | ||
| self.wr_file.reset(new PyOutputStream(handle)) | ||
| self.is_readable = 0 | ||
| self.is_writeable = 1 | ||
| self.is_writable = True | ||
| elif mode.startswith('r'): | ||
| self.rd_file.reset(new PyReadableFile(handle)) | ||
| self.is_readable = 1 | ||
| self.is_writeable = 0 | ||
| self.is_readable = True | ||
| else: | ||
| raise ValueError('Invalid file mode: {0}'.format(mode)) | ||
|  | ||
| self.is_open = True | ||
| self.closed = False | ||
|  | ||
|  | ||
| cdef class MemoryMappedFile(NativeFile): | ||
|  | @@ -409,11 +432,6 @@ cdef class MemoryMappedFile(NativeFile): | |
| cdef: | ||
| object path | ||
|  | ||
| def __cinit__(self): | ||
| self.is_open = False | ||
| self.is_readable = 0 | ||
| self.is_writeable = 0 | ||
|  | ||
| @staticmethod | ||
| def create(path, size): | ||
| cdef: | ||
|  | @@ -426,11 +444,11 @@ cdef class MemoryMappedFile(NativeFile): | |
|  | ||
| cdef MemoryMappedFile result = MemoryMappedFile() | ||
| result.path = path | ||
| result.is_readable = 1 | ||
| result.is_writeable = 1 | ||
| result.is_readable = True | ||
| result.is_writable = True | ||
| result.wr_file = <shared_ptr[OutputStream]> handle | ||
| result.rd_file = <shared_ptr[RandomAccessFile]> handle | ||
| result.is_open = True | ||
| result.closed = False | ||
|  | ||
| return result | ||
|  | ||
|  | @@ -444,14 +462,14 @@ cdef class MemoryMappedFile(NativeFile): | |
|  | ||
| if mode in ('r', 'rb'): | ||
| c_mode = FileMode_READ | ||
| self.is_readable = 1 | ||
| self.is_readable = True | ||
| elif mode in ('w', 'wb'): | ||
| c_mode = FileMode_WRITE | ||
| self.is_writeable = 1 | ||
| self.is_writable = True | ||
| elif mode in ('r+', 'r+b', 'rb+'): | ||
| c_mode = FileMode_READWRITE | ||
| self.is_readable = 1 | ||
| self.is_writeable = 1 | ||
| self.is_readable = True | ||
| self.is_writable = True | ||
| else: | ||
| raise ValueError('Invalid file mode: {0}'.format(mode)) | ||
|  | ||
|  | @@ -460,7 +478,7 @@ cdef class MemoryMappedFile(NativeFile): | |
|  | ||
| self.wr_file = <shared_ptr[OutputStream]> handle | ||
| self.rd_file = <shared_ptr[RandomAccessFile]> handle | ||
| self.is_open = True | ||
| self.closed = False | ||
|  | ||
|  | ||
| def memory_map(path, mode='r'): | ||
|  | @@ -484,7 +502,7 @@ def memory_map(path, mode='r'): | |
| def create_memory_map(path, size): | ||
| """ | ||
| Create memory map at indicated path of the given size, return open | ||
| writeable file object | ||
| writable file object | ||
|  | ||
| Parameters | ||
| ---------- | ||
|  | @@ -513,42 +531,39 @@ cdef class OSFile(NativeFile): | |
| shared_ptr[Readable] handle | ||
| c_string c_path = encode_file_path(path) | ||
|  | ||
| self.is_readable = self.is_writeable = 0 | ||
|  | ||
| if mode in ('r', 'rb'): | ||
| self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool)) | ||
| elif mode in ('w', 'wb'): | ||
| self._open_writeable(c_path) | ||
| self._open_writable(c_path) | ||
| else: | ||
| raise ValueError('Invalid file mode: {0}'.format(mode)) | ||
|  | ||
| self.is_open = True | ||
| self.closed = False | ||
|  | ||
| cdef _open_readable(self, c_string path, CMemoryPool* pool): | ||
| cdef shared_ptr[ReadableFile] handle | ||
|  | ||
| with nogil: | ||
| check_status(ReadableFile.Open(path, pool, &handle)) | ||
|  | ||
| self.is_readable = 1 | ||
| self.is_readable = True | ||
| self.rd_file = <shared_ptr[RandomAccessFile]> handle | ||
|  | ||
| cdef _open_writeable(self, c_string path): | ||
| cdef _open_writable(self, c_string path): | ||
| cdef shared_ptr[FileOutputStream] handle | ||
|  | ||
| with nogil: | ||
| check_status(FileOutputStream.Open(path, &handle)) | ||
| self.is_writeable = 1 | ||
| self.is_writable = True | ||
| self.wr_file = <shared_ptr[OutputStream]> handle | ||
|  | ||
|  | ||
| cdef class FixedSizeBufferWriter(NativeFile): | ||
|  | ||
| def __cinit__(self, Buffer buffer): | ||
| self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer)) | ||
| self.is_readable = 0 | ||
| self.is_writeable = 1 | ||
| self.is_open = True | ||
| self.is_writable = True | ||
| self.closed = False | ||
|  | ||
| def set_memcopy_threads(self, int num_threads): | ||
| cdef CFixedSizeBufferWriter* writer = \ | ||
|  | @@ -738,24 +753,22 @@ cdef class BufferOutputStream(NativeFile): | |
| self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool)) | ||
| self.wr_file.reset(new CBufferOutputStream( | ||
| <shared_ptr[CResizableBuffer]> self.buffer)) | ||
| self.is_readable = 0 | ||
| self.is_writeable = 1 | ||
| self.is_open = True | ||
| self.is_writable = True | ||
| self.closed = False | ||
|  | ||
| def get_result(self): | ||
| with nogil: | ||
| check_status(self.wr_file.get().Close()) | ||
| self.is_open = False | ||
| self.closed = True | ||
| return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer) | ||
|  | ||
|  | ||
| cdef class MockOutputStream(NativeFile): | ||
|  | ||
| def __cinit__(self): | ||
| self.wr_file.reset(new CMockOutputStream()) | ||
| self.is_readable = 0 | ||
| self.is_writeable = 1 | ||
| self.is_open = True | ||
| self.is_writable = True | ||
| self.closed = False | ||
|  | ||
| def size(self): | ||
| return (<CMockOutputStream*>self.wr_file.get()).GetExtentBytesWritten() | ||
|  | @@ -780,9 +793,8 @@ cdef class BufferReader(NativeFile): | |
| self.buffer = frombuffer(obj) | ||
|  | ||
| self.rd_file.reset(new CBufferReader(self.buffer.buffer)) | ||
| self.is_readable = 1 | ||
| self.is_writeable = 0 | ||
| self.is_open = True | ||
| self.is_readable = True | ||
| self.closed = False | ||
|  | ||
|  | ||
| def frombuffer(object obj): | ||
|  | @@ -834,8 +846,8 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer): | |
| if isinstance(source, NativeFile): | ||
| nf = source | ||
|  | ||
| if not nf.is_writeable: | ||
| raise IOError('Native file is not writeable') | ||
| if not nf.is_writable: | ||
| raise IOError('Native file is not writable') | ||
|  | ||
| nf.write_handle(writer) | ||
| else: | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should
own_filebe true for most of these? I stuck with the existing behavior, but this seems odd to me. For exampleMemoryMappedFileopens the file itself, why isn't it responsible for closing it on__dealloc__? Are we relying on a C++ destructor to handle that?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this arose due to some wacky GC issues with HDFS file handles, I'd have to look back at the original commit that added this to remember. My prior would be that
__dealloc__/__del__should close the file if it is open. WithMemoryMappedFile, this is a no-op because the file descriptor is owned by the mapped buffer, whose lifetime may exceed the file objectThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I'll leave that for a follow-up PR if that's ok. I believe this attribute is unnecessary for any of the existing file objects.