7 changes: 2 additions & 5 deletions python/pyarrow/io-hdfs.pxi
@@ -413,25 +413,22 @@ cdef class HadoopFileSystem:
&wr_handle))

out.wr_file = <shared_ptr[OutputStream]> wr_handle

out.is_readable = False
out.is_writeable = 1
out.is_writable = True
else:
with nogil:
check_status(self.client.get()
.OpenReadable(c_path, &rd_handle))

out.rd_file = <shared_ptr[RandomAccessFile]> rd_handle
out.is_readable = True
out.is_writeable = 0

if c_buffer_size == 0:
c_buffer_size = 2 ** 16

out.mode = mode
out.buffer_size = c_buffer_size
out.parent = _HdfsFileNanny(self, out)
out.is_open = True
out.closed = False
out.own_file = True

return out
122 changes: 67 additions & 55 deletions python/pyarrow/io.pxi
@@ -39,13 +39,14 @@ cdef extern from "Python.h":


cdef class NativeFile:

def __cinit__(self):
self.is_open = False
self.closed = True
self.own_file = False

Contributor Author:
Should own_file be true for most of these? I stuck with the existing behavior, but this seems odd to me. For example, MemoryMappedFile opens the file itself, so why isn't it responsible for closing it on __dealloc__? Are we relying on a C++ destructor to handle that?

Member:
I think this arose due to some wacky GC issues with HDFS file handles; I'd have to look back at the original commit that added this to remember. My prior would be that __dealloc__ / __del__ should close the file if it is open. With MemoryMappedFile, this is a no-op because the file descriptor is owned by the mapped buffer, whose lifetime may exceed the file object.

Contributor Author:
OK, I'll leave that for a follow-up PR if that's all right. I believe this attribute is unnecessary for any of the existing file objects.
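The ownership rule this thread settles on (the destructor closes the handle only if the object owns it and it is still open) can be sketched in plain Python. OwnedFile and its attributes are hypothetical stand-ins for illustration, not the pyarrow API:

```python
class OwnedFile:
    """Toy file-like object illustrating the own_file / closed flags."""

    def __init__(self, own_file=True):
        self.own_file = own_file  # do we own the underlying handle?
        self.closed = False

    def close(self):
        self.closed = True

    def __del__(self):
        # Mirrors __dealloc__ in the diff: only close handles we own
        # that have not already been closed explicitly.
        if self.own_file and not self.closed:
            self.close()
```

With own_file=False (a borrowed handle), __del__ deliberately leaves the handle open, which is the behavior the comment questions for MemoryMappedFile.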

self.is_readable = False
self.is_writable = False

def __dealloc__(self):
if self.is_open and self.own_file:
if self.own_file and not self.closed:
self.close()

def __enter__(self):
@@ -65,45 +66,63 @@ cdef class NativeFile:

def __get__(self):
# Emulate built-in file modes
if self.is_readable and self.is_writeable:
if self.is_readable and self.is_writable:
return 'rb+'
elif self.is_readable:
return 'rb'
elif self.is_writeable:
elif self.is_writable:
return 'wb'
else:
raise ValueError('File object is malformed, has no mode')

def readable(self):
self._assert_open()
return self.is_readable

def writable(self):
self._assert_open()
return self.is_writable

def seekable(self):
self._assert_open()
return self.is_readable

Member:
Should these be properties?

Contributor Author:
The seekable/readable/writable? No, these are required as methods for the Python file interface. See https://docs.python.org/3/library/io.html#io.IOBase

Member:
Roger, thanks.
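As the author notes, readable()/writable()/seekable() must be methods (not properties) to match io.IOBase. A minimal sketch of that method protocol, with names mirroring the diff rather than the real NativeFile class:

```python
class MiniFile:
    """Sketch of the io.IOBase-style query methods used in the diff."""

    def __init__(self, mode='rb'):
        self.closed = False
        self.is_readable = 'r' in mode
        self.is_writable = 'w' in mode or '+' in mode

    def _assert_open(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def readable(self):
        self._assert_open()
        return self.is_readable

    def writable(self):
        self._assert_open()
        return self.is_writable

    def seekable(self):
        self._assert_open()
        # As in the diff: only random-access (readable) streams seek.
        return self.is_readable
```

Like io.IOBase, each query raises ValueError once the file is closed, which is why they are checked methods rather than plain attributes.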


def close(self):
if self.is_open:
if not self.closed:
with nogil:

Member:
Unrelated, but I wonder what happens if two threads call close() concurrently.

Contributor Author:
Hmmm, maybe the nogil block here should be removed?

Member:
Well, if rd_file.close() or wr_file.close() may do I/O, you want to release the GIL. One possible solution is something like:

    rd_file, self.rd_file = self.rd_file, None
    wr_file, self.wr_file = self.wr_file, None
    with nogil:
        if self.is_readable:
            check_status(rd_file.get().Close())
        else:
            check_status(wr_file.get().Close())

Contributor Author:
That'd work, but it prevents calling close again in the case of a failure. It's not actually clear to me what should happen if close fails (reset state? set closed anyway?). We should probably open a JIRA about this, as it's unrelated to this PR.

Member:
A general guideline for close() is that even if it fails somehow (e.g. an IO error), it should still put the object in a "closed" state (and release associated resources).

Member:
The GIL should be released here: if any Python resources are required in Close, the GIL will be re-acquired.
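The swap-out idea suggested in this thread can be made safe against concurrent close() calls in plain Python. SafeClose below is a hypothetical illustration, with a lock standing in for the atomic pointer swap; note that closed is set even if the underlying close fails, per the guideline above:

```python
import threading

class SafeClose:
    """Sketch of the swap-then-close pattern for idempotent close()."""

    def __init__(self, handle):
        self._handle = handle
        self._lock = threading.Lock()
        self.closed = False

    def close(self):
        # Atomically take ownership of the handle so exactly one caller
        # performs the underlying close; later callers see None.
        with self._lock:
            handle, self._handle = self._handle, None
            self.closed = True
        if handle is not None:
            handle.close()  # may do I/O; runs outside the lock
```

Two threads racing on close() both observe closed == True, but the underlying handle is closed exactly once.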

if self.is_readable:
check_status(self.rd_file.get().Close())
else:
check_status(self.wr_file.get().Close())
self.is_open = False
self.closed = True

def flush(self):
"""Flush the buffer stream, if applicable.

No-op to match the IOBase interface."""
self._assert_open()

cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
self._assert_readable()
file[0] = <shared_ptr[RandomAccessFile]> self.rd_file

cdef write_handle(self, shared_ptr[OutputStream]* file):
self._assert_writeable()
self._assert_writable()
file[0] = <shared_ptr[OutputStream]> self.wr_file

def _assert_open(self):
if not self.is_open:
if self.closed:
raise ValueError("I/O operation on closed file")

def _assert_readable(self):
self._assert_open()
if not self.is_readable:
raise IOError("only valid on readonly files")

def _assert_writeable(self):
def _assert_writable(self):
self._assert_open()
if not self.is_writeable:
raise IOError("only valid on writeable files")
if not self.is_writable:
raise IOError("only valid on writable files")

def size(self):
"""
@@ -175,7 +194,7 @@ cdef class NativeFile:
Write byte from any object implementing buffer protocol (bytes,
bytearray, ndarray, pyarrow.Buffer)
"""
self._assert_writeable()
self._assert_writable()

if isinstance(data, six.string_types):
data = tobytes(data)
@@ -224,6 +243,12 @@

return PyObject_to_object(obj)

def read1(self, nbytes=None):
"""Read and return up to n bytes.

Alias for read, needed to match the IOBase interface."""
        return self.read(nbytes=nbytes)

def read_buffer(self, nbytes=None):
cdef:
int64_t c_nbytes
@@ -333,7 +358,7 @@ cdef class NativeFile:
Pipe file-like object to file
"""
write_queue = Queue(50)
self._assert_writeable()
self._assert_writable()

buffer_size = buffer_size or DEFAULT_BUFFER_SIZE

@@ -390,16 +415,14 @@ cdef class PythonFile(NativeFile):

if mode.startswith('w'):
self.wr_file.reset(new PyOutputStream(handle))
self.is_readable = 0
self.is_writeable = 1
self.is_writable = True
elif mode.startswith('r'):
self.rd_file.reset(new PyReadableFile(handle))
self.is_readable = 1
self.is_writeable = 0
self.is_readable = True
else:
raise ValueError('Invalid file mode: {0}'.format(mode))

self.is_open = True
self.closed = False


cdef class MemoryMappedFile(NativeFile):
@@ -409,11 +432,6 @@ cdef class MemoryMappedFile(NativeFile):
cdef:
object path

def __cinit__(self):
self.is_open = False
self.is_readable = 0
self.is_writeable = 0

@staticmethod
def create(path, size):
cdef:
@@ -426,11 +444,11 @@

cdef MemoryMappedFile result = MemoryMappedFile()
result.path = path
result.is_readable = 1
result.is_writeable = 1
result.is_readable = True
result.is_writable = True
result.wr_file = <shared_ptr[OutputStream]> handle
result.rd_file = <shared_ptr[RandomAccessFile]> handle
result.is_open = True
result.closed = False

return result

@@ -444,14 +462,14 @@

if mode in ('r', 'rb'):
c_mode = FileMode_READ
self.is_readable = 1
self.is_readable = True
elif mode in ('w', 'wb'):
c_mode = FileMode_WRITE
self.is_writeable = 1
self.is_writable = True
elif mode in ('r+', 'r+b', 'rb+'):
c_mode = FileMode_READWRITE
self.is_readable = 1
self.is_writeable = 1
self.is_readable = True
self.is_writable = True
else:
raise ValueError('Invalid file mode: {0}'.format(mode))

@@ -460,7 +478,7 @@

self.wr_file = <shared_ptr[OutputStream]> handle
self.rd_file = <shared_ptr[RandomAccessFile]> handle
self.is_open = True
self.closed = False


def memory_map(path, mode='r'):
@@ -484,7 +502,7 @@ def memory_map(path, mode='r'):
def create_memory_map(path, size):
"""
Create memory map at indicated path of the given size, return open
writeable file object
writable file object

Parameters
----------
@@ -513,42 +531,39 @@ cdef class OSFile(NativeFile):
shared_ptr[Readable] handle
c_string c_path = encode_file_path(path)

self.is_readable = self.is_writeable = 0

if mode in ('r', 'rb'):
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb'):
self._open_writeable(c_path)
self._open_writable(c_path)
else:
raise ValueError('Invalid file mode: {0}'.format(mode))

self.is_open = True
self.closed = False

cdef _open_readable(self, c_string path, CMemoryPool* pool):
cdef shared_ptr[ReadableFile] handle

with nogil:
check_status(ReadableFile.Open(path, pool, &handle))

self.is_readable = 1
self.is_readable = True
self.rd_file = <shared_ptr[RandomAccessFile]> handle

cdef _open_writeable(self, c_string path):
cdef _open_writable(self, c_string path):
cdef shared_ptr[FileOutputStream] handle

with nogil:
check_status(FileOutputStream.Open(path, &handle))
self.is_writeable = 1
self.is_writable = True
self.wr_file = <shared_ptr[OutputStream]> handle


cdef class FixedSizeBufferWriter(NativeFile):

def __cinit__(self, Buffer buffer):
self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
self.is_readable = 0
self.is_writeable = 1
self.is_open = True
self.is_writable = True
self.closed = False

def set_memcopy_threads(self, int num_threads):
cdef CFixedSizeBufferWriter* writer = \
@@ -738,24 +753,22 @@ cdef class BufferOutputStream(NativeFile):
self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
self.wr_file.reset(new CBufferOutputStream(
<shared_ptr[CResizableBuffer]> self.buffer))
self.is_readable = 0
self.is_writeable = 1
self.is_open = True
self.is_writable = True
self.closed = False

def get_result(self):
with nogil:
check_status(self.wr_file.get().Close())
self.is_open = False
self.closed = True
return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)


cdef class MockOutputStream(NativeFile):

def __cinit__(self):
self.wr_file.reset(new CMockOutputStream())
self.is_readable = 0
self.is_writeable = 1
self.is_open = True
self.is_writable = True
self.closed = False

def size(self):
return (<CMockOutputStream*>self.wr_file.get()).GetExtentBytesWritten()
@@ -780,9 +793,8 @@ cdef class BufferReader(NativeFile):
self.buffer = frombuffer(obj)

self.rd_file.reset(new CBufferReader(self.buffer.buffer))
self.is_readable = 1
self.is_writeable = 0
self.is_open = True
self.is_readable = True
self.closed = False


def frombuffer(object obj):
@@ -834,8 +846,8 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
if isinstance(source, NativeFile):
nf = source

if not nf.is_writeable:
raise IOError('Native file is not writeable')
if not nf.is_writable:
raise IOError('Native file is not writable')

nf.write_handle(writer)
else:
2 changes: 1 addition & 1 deletion python/pyarrow/ipc.pxi
@@ -429,7 +429,7 @@ def write_tensor(Tensor tensor, NativeFile dest):
int32_t metadata_length
int64_t body_length

dest._assert_writeable()
dest._assert_writable()

with nogil:
check_status(
4 changes: 2 additions & 2 deletions python/pyarrow/ipc.py
@@ -65,7 +65,7 @@ class RecordBatchStreamWriter(lib._RecordBatchWriter):
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
Either a file path, or a writeable file object
Either a file path, or a writable file object
schema : pyarrow.Schema
The Arrow schema for data to be written to the file
"""
@@ -96,7 +96,7 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter):
Parameters
----------
sink : str, pyarrow.NativeFile, or file-like Python object
Either a file path, or a writeable file object
Either a file path, or a writable file object
schema : pyarrow.Schema
The Arrow schema for data to be written to the file
"""
4 changes: 2 additions & 2 deletions python/pyarrow/lib.pxd
@@ -333,8 +333,8 @@ cdef class NativeFile:
shared_ptr[RandomAccessFile] rd_file
shared_ptr[OutputStream] wr_file
bint is_readable
bint is_writeable
bint is_open
bint is_writable
readonly bint closed
bint own_file

# By implementing these "virtual" functions (all functions in Cython