Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python bindings: implement __arrow_c_stream__() interface for ogr.Layer #9043

Merged
merged 2 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions autotest/ogr/ogr_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,54 @@ def test_ogr_mem_alter_geom_field_defn():
assert lyr.GetSpatialRef() is None


###############################################################################
# Test ogr.Layer.__arrow_c_stream__() interface.
# Cf https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html


@gdaltest.enable_exceptions()
def test_ogr_mem_arrow_stream_pycapsule_interface():
import ctypes

ds = ogr.GetDriverByName("Memory").CreateDataSource("")
lyr = ds.CreateLayer("foo")

stream = lyr.__arrow_c_stream__()
assert stream
t = type(stream)
assert t.__module__ == "builtins"
assert t.__name__ == "PyCapsule"
rouault marked this conversation as resolved.
Show resolved Hide resolved
capsule_get_name = ctypes.pythonapi.PyCapsule_GetName
capsule_get_name.argtypes = [ctypes.py_object]
capsule_get_name.restype = ctypes.c_char_p
assert capsule_get_name(ctypes.py_object(stream)) == b"arrow_array_stream"

with pytest.raises(
Exception, match="An arrow Arrow Stream is in progress on that layer"
):
lyr.__arrow_c_stream__()

del stream

stream = lyr.__arrow_c_stream__()
assert stream
del stream

with pytest.raises(Exception, match="requested_schema != None not implemented"):
# "something" should rather by a PyCapsule with an ArrowSchema...
lyr.__arrow_c_stream__(requested_schema="something")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think requested_schema was added for future-proofing, but no one implements it yet. So it's fine to ignore


# Also test GetArrowArrayStreamInterface() to be able to specify options
stream = lyr.GetArrowArrayStreamInterface(
{"INCLUDE_FID": "NO"}
).__arrow_c_stream__()
assert stream
t = type(stream)
assert t.__module__ == "builtins"
assert t.__name__ == "PyCapsule"
del stream


###############################################################################


Expand Down
41 changes: 41 additions & 0 deletions swig/include/ogr.i
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,22 @@ public:
}; /* class ArrowArrayStream */
#endif

#ifdef SWIGPYTHON
// Implements __arrow_c_stream__ export interface:
// https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html#create-a-pycapsule
%{
static void ReleaseArrowArrayStreamPyCapsule(PyObject* capsule) {
struct ArrowArrayStream* stream =
(struct ArrowArrayStream*)PyCapsule_GetPointer(capsule, "arrow_array_stream");
if (stream->release != NULL) {
stream->release(stream);
}
CPLFree(stream);
}
%}

#endif

/************************************************************************/
/* OGRLayer */
/************************************************************************/
Expand Down Expand Up @@ -1507,6 +1523,31 @@ public:

#ifdef SWIGPYTHON

PyObject* ExportArrowArrayStreamPyCapsule(char** options = NULL)
{
struct ArrowArrayStream* stream =
(struct ArrowArrayStream*)CPLMalloc(sizeof(struct ArrowArrayStream));

const int success = OGR_L_GetArrowStream(self, stream, options);

PyObject* ret;
SWIG_PYTHON_THREAD_BEGIN_BLOCK;
if( success )
{
ret = PyCapsule_New(stream, "arrow_array_stream", ReleaseArrowArrayStreamPyCapsule);
}
else
{
CPLFree(stream);
Py_INCREF(Py_None);
ret = Py_None;
}

SWIG_PYTHON_THREAD_END_BLOCK;

return ret;
}

%newobject GetArrowStream;
ArrowArrayStream* GetArrowStream(char** options = NULL) {
struct ArrowArrayStream* stream = (struct ArrowArrayStream* )malloc(sizeof(struct ArrowArrayStream));
Expand Down
84 changes: 84 additions & 0 deletions swig/include/python/ogr_python.i
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,90 @@ def ReleaseResultSet(self, sql_lyr):
schema = property(schema)


def __arrow_c_stream__(self, requested_schema=None):
"""
Export to a C ArrowArrayStream PyCapsule, according to
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Also note that only one active stream can be queried at a time for a
given layer.

To specify options how the ArrowStream should be generated, use
the GetArrowArrayStreamInterface(self, options) method

Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the stream should be casted, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.
Currently, this is not supported and will raise a
NotImplementedError if the schema is not None

Returns
-------
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""

if requested_schema is not None:
raise NotImplementedError("requested_schema != None not implemented")

return self.ExportArrowArrayStreamPyCapsule()


def GetArrowArrayStreamInterface(self, options = []):
"""
Return a proxy object that implements the __arrow_c_stream__() method,
but allows the user to pass options.

Parameters
----------
options : List of strings or dict with options such as INCLUDE_FID=NO, MAX_FEATURES_IN_BATCH=<number>, etc.

Returns
-------
a proxy object which implements the __arrow_c_stream__() method
"""

class ArrowArrayStreamInterface:
def __init__(self, lyr, options):
self.lyr = lyr
self.options = options

def __arrow_c_stream__(self, requested_schema=None):
"""
Export to a C ArrowArrayStream PyCapsule, according to
https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Also note that only one active stream can be queried at a time for a
given layer.

To specify options how the ArrowStream should be generated, use
the GetArrowArrayStreamInterface(self, options) method

Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the stream should be casted, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.
Currently, this is not supported and will raise a
NotImplementedError if the schema is not None

Returns
-------
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""
if requested_schema is not None:
raise NotImplementedError("requested_schema != None not implemented")

return self.lyr.ExportArrowArrayStreamPyCapsule(self.options)

return ArrowArrayStreamInterface(self, options)


def GetArrowStreamAsPyArrow(self, options = []):
""" Return an ArrowStream as PyArrow Schema and Array objects """

Expand Down
Loading