Doc: add more doc for OGRLayer::GetArrowSchema()
rouault committed Jun 14, 2022
1 parent b0e65dc commit 0946e21
Showing 4 changed files with 308 additions and 4 deletions.
287 changes: 287 additions & 0 deletions doc/source/tutorials/vector_api_tut.rst
@@ -668,6 +668,293 @@ In Python:
ds = None

.. _vector_api_tut_arrow_stream:

Reading From OGR using the Arrow C Stream data interface
--------------------------------------------------------

.. versionadded:: 3.6

Instead of retrieving features one at a time, it is also possible to retrieve
them by batches, with a column-oriented memory layout, using the
:cpp:func:`OGRLayer::GetArrowStream` method. Note that this method is more
difficult to use than the traditional :cpp:func:`OGRLayer::GetNextFeature` approach,
and is only advised when compatibility with the
`Apache Arrow C Stream interface <https://arrow.apache.org/docs/format/CStreamInterface.html>`_
is needed, or when column-oriented consumption of layers is required.

Unless a helper library is used, consuming the Arrow C Stream interface
requires reading the following documents:

- `Arrow C Stream interface <https://arrow.apache.org/docs/format/CStreamInterface.html>`_
- `Arrow C data interface <https://arrow.apache.org/docs/format/CDataInterface.html>`_
- `Arrow Columnar Format <https://arrow.apache.org/docs/format/Columnar.html>`_.

The Arrow C Stream interface consists of a C structure, ArrowArrayStream, that provides
two main callbacks to get:

- an ArrowSchema with the get_schema() callback. An ArrowSchema describes a set of
  field descriptions (name, type, metadata). All OGR data types have a corresponding
  Arrow data type.

- a sequence of ArrowArray with the get_next() callback. An ArrowArray captures
  a set of values for a specific column/field in a subset of features.
  This is the equivalent of a
  `Series <https://arrow.apache.org/docs/python/pandas.html#series>`_ in a Pandas DataFrame.
  This is a potentially hierarchical structure that can aggregate
  sub arrays, and in OGR usage, the main array will be a StructArray which is
  the collection of OGR attribute and geometry fields.
  The layout of buffers and children arrays per data type is detailed in the
  `Arrow Columnar Format <https://arrow.apache.org/docs/format/Columnar.html>`_.
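
As a minimal sketch of the consumer side of this protocol, the following
self-contained snippet iterates over a mock stream. The structure definitions
are copied from the Arrow C data and C stream interface specifications (GDAL
code would use the definitions shipped with GDAL rather than redefine them).
``CountRows``, ``MockState``, the ``Mock*`` callbacks and ``DemoCountRows`` are
hypothetical names introduced only for illustration; they are not part of GDAL
or Arrow. Per the Arrow C stream interface, the end of the stream is signalled
by get_next() returning 0 with a released array (``release`` set to NULL).

.. code-block:: c++

    #include <cassert>
    #include <cstdint>

    struct ArrowSchema;  // not needed in this sketch, forward declaration only

    // Layout defined by the Arrow C data interface
    struct ArrowArray
    {
        int64_t length;
        int64_t null_count;
        int64_t offset;
        int64_t n_buffers;
        int64_t n_children;
        const void** buffers;
        struct ArrowArray** children;
        struct ArrowArray* dictionary;
        void (*release)(struct ArrowArray*);
        void* private_data;
    };

    // Layout defined by the Arrow C stream interface
    struct ArrowArrayStream
    {
        int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
        int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
        const char* (*get_last_error)(struct ArrowArrayStream*);
        void (*release)(struct ArrowArrayStream*);
        void* private_data;
    };

    // Consumer: sum the number of rows of all batches of a stream
    int64_t CountRows(ArrowArrayStream* stream)
    {
        int64_t total = 0;
        while( true )
        {
            ArrowArray batch{};
            if( stream->get_next(stream, &batch) != 0 )
                break;  // error reported by the producer
            if( batch.release == nullptr )
                break;  // end of stream
            total += batch.length;
            batch.release(&batch);  // each batch is released by the consumer
        }
        return total;
    }

    // Mock producer (illustration only): emits 'remaining' batches
    struct MockState { int remaining; int64_t batchLength; };

    static void MockArrayRelease(ArrowArray* array) { array->release = nullptr; }

    static int MockGetNext(ArrowArrayStream* stream, ArrowArray* out)
    {
        auto* state = static_cast<MockState*>(stream->private_data);
        *out = ArrowArray{};  // release == nullptr means "end of stream"
        if( state->remaining > 0 )
        {
            --state->remaining;
            out->length = state->batchLength;
            out->release = MockArrayRelease;
        }
        return 0;
    }

    static void MockStreamRelease(ArrowArrayStream* stream)
    {
        delete static_cast<MockState*>(stream->private_data);
        stream->release = nullptr;
    }

    // Runs the consumer over 3 mock batches of 4 rows each
    int64_t DemoCountRows()
    {
        ArrowArrayStream stream{};
        stream.get_next = MockGetNext;
        stream.release = MockStreamRelease;
        stream.private_data = new MockState{3, 4};
        const int64_t nRows = CountRows(&stream);
        stream.release(&stream);
        return nRows;
    }

The mock batches carry no buffers, since only the iteration protocol is of
interest here. A real consumer would read the children of each batch before
releasing it.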

If a layer consists of 4 features with 2 fields (one of integer type, one of
floating-point type), the representation as an ArrowArray is *conceptually* the
following one:

.. code-block:: c

    array.children[0].buffers[1] = { 1, 2, 3, 4 };
    array.children[1].buffers[1] = { 1.2, 2.3, 3.4, 4.5 };

The content of a whole layer can be seen as a sequence of record batches, each
record batch being an ArrowArray of a subset of features. Instead of iterating
over individual features, one iterates over a batch of several features at
once.
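
To make this conceptual layout concrete, here is a small sketch that builds such
a batch by hand and reads it back column by column. The ``ArrowArray`` layout is
the one of the Arrow C data interface; ``ExampleBatch``, ``SumInt32Child`` and
``SumDoubleChild`` are hypothetical names used only for this illustration, and
the batch is assumed to contain no null values.

.. code-block:: c++

    #include <cassert>
    #include <cmath>
    #include <cstdint>

    // Layout defined by the Arrow C data interface
    struct ArrowArray
    {
        int64_t length;
        int64_t null_count;
        int64_t offset;
        int64_t n_buffers;
        int64_t n_children;
        const void** buffers;
        struct ArrowArray** children;
        struct ArrowArray* dictionary;
        void (*release)(struct ArrowArray*);
        void* private_data;
    };

    // Hand-made batch: 4 features, one int32 column and one double column
    struct ExampleBatch
    {
        int32_t ints[4] = {1, 2, 3, 4};
        double doubles[4] = {1.2, 2.3, 3.4, 4.5};
        // buffers[0] is the validity bitmap (absent: no nulls),
        // buffers[1] holds the values
        const void* intBuffers[2] = {nullptr, ints};
        const void* doubleBuffers[2] = {nullptr, doubles};
        ArrowArray intChild{};
        ArrowArray doubleChild{};
        ArrowArray* children[2] = {&intChild, &doubleChild};
        const void* structBuffers[1] = {nullptr};
        ArrowArray batch{};  // top-level struct array

        ExampleBatch()
        {
            intChild.length = 4;
            intChild.n_buffers = 2;
            intChild.buffers = intBuffers;
            doubleChild.length = 4;
            doubleChild.n_buffers = 2;
            doubleChild.buffers = doubleBuffers;
            batch.length = 4;
            batch.n_buffers = 1;
            batch.n_children = 2;
            batch.buffers = structBuffers;
            batch.children = children;
        }
        ExampleBatch(const ExampleBatch&) = delete;  // members point to each other
        ExampleBatch& operator=(const ExampleBatch&) = delete;
    };

    // Sum of an int32 child column (nulls are not handled in this sketch)
    int64_t SumInt32Child(const ArrowArray& batch, int iChild)
    {
        const ArrowArray* child = batch.children[iChild];
        const int32_t* values = static_cast<const int32_t*>(child->buffers[1]);
        int64_t sum = 0;
        for( int64_t i = 0; i < batch.length; ++i )
            sum += values[batch.offset + child->offset + i];
        return sum;
    }

    // Sum of a double child column (nulls are not handled in this sketch)
    double SumDoubleChild(const ArrowArray& batch, int iChild)
    {
        const ArrowArray* child = batch.children[iChild];
        const double* values = static_cast<const double*>(child->buffers[1]);
        double sum = 0;
        for( int64_t i = 0; i < batch.length; ++i )
            sum += values[batch.offset + child->offset + i];
        return sum;
    }

Each column is a contiguous array of a single type, so a per-column operation is
a tight loop over one buffer. This is the main benefit of a column-oriented
layout.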

The ArrowArrayStream, ArrowSchema, ArrowArray structures are defined in the
ogr_recordbatch.h public header file, directly derived from
https://github.com/apache/arrow/blob/master/cpp/src/arrow/c/abi.h
to get API/ABI compatibility with Apache Arrow C++. This header file must be
explicitly included when the related array batch API is used.

The GetArrowStream() method has the following signature:

.. code-block:: cpp

    virtual bool OGRLayer::GetArrowStream(struct ArrowArrayStream* out_stream,
                                          CSLConstList papszOptions = nullptr);

It is also available in the C API as :cpp:func:`OGR_L_GetArrowStream`.

out_stream is a pointer to an ArrowArrayStream structure, that can be in an uninitialized
state (the method will ignore any initial content).

On successful return, and when the stream interface is no longer needed, it must
be freed with out_stream->release(out_stream).
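
In C++ code, one possible way to make sure that release() is called on every
code path is a small RAII wrapper. The sketch below is only an illustration:
``ArrowStreamGuard`` is a hypothetical class, not something provided by GDAL or
Arrow, and the mock release callback only stands in for a real producer. The
``ArrowArrayStream`` layout is the one of the Arrow C stream interface.

.. code-block:: c++

    #include <cassert>

    struct ArrowSchema;  // forward declarations are enough for this sketch
    struct ArrowArray;

    // Layout defined by the Arrow C stream interface
    struct ArrowArrayStream
    {
        int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
        int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
        const char* (*get_last_error)(struct ArrowArrayStream*);
        void (*release)(struct ArrowArrayStream*);
        void* private_data;
    };

    // Hypothetical helper: releases the stream when going out of scope
    class ArrowStreamGuard
    {
      public:
        ArrowStreamGuard() = default;
        ~ArrowStreamGuard() { reset(); }
        ArrowStreamGuard(const ArrowStreamGuard&) = delete;
        ArrowStreamGuard& operator=(const ArrowStreamGuard&) = delete;

        // Pointer to hand over to the producer
        ArrowArrayStream* get() { return &m_stream; }

        // Release the stream if it is still alive
        void reset()
        {
            if( m_stream.release )
                m_stream.release(&m_stream);
            m_stream.release = nullptr;
        }

      private:
        ArrowArrayStream m_stream{};  // zero-initialized: release == nullptr
    };

    // Mock producer side, for illustration only
    static int g_nReleaseCalls = 0;

    static void MockStreamRelease(ArrowArrayStream* stream)
    {
        ++g_nReleaseCalls;
        stream->release = nullptr;
    }

    int DemoGuardReleaseCount()
    {
        g_nReleaseCalls = 0;
        {
            ArrowStreamGuard guard;
            // With GDAL, poLayer->GetArrowStream(guard.get()) would be called here
            guard.get()->release = MockStreamRelease;
        }  // guard goes out of scope: release() is called exactly once
        return g_nReleaseCalls;
    }

With such a wrapper, the guard object has to go out of scope (or reset() has to
be called) before the dataset that owns the layer is closed.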

There are extra precautions to take into account in an OGR context. Unless
otherwise specified by a particular driver implementation, the ArrowArrayStream
structure, and the ArrowSchema or ArrowArray objects its callbacks have returned,
should no longer be used (except for potentially being released) after the
OGRLayer from which it was initialized has been destroyed (typically at dataset
closing). Furthermore, unless otherwise specified by a particular driver
implementation, only one ArrowArrayStream can be active at a time on
a given layer (that is, the last active one must be explicitly released before
the next one is requested). Changing filter state, ignored columns, modifying the schema
or using ResetReading()/GetNextFeature() while using an ArrowArrayStream is
strongly discouraged and may lead to unexpected results. As a rule of thumb,
no OGRLayer methods that affect the state of a layer should be called on a
layer while an ArrowArrayStream on it is active.

The papszOptions argument is a NULL-terminated list of key=value
strings, which may be driver specific.

OGRLayer has a base implementation of GetArrowStream() that behaves as follows:

- The get_schema() callback returns a schema whose top-level object is
  of type Struct, and whose children consist of the FID column, and all OGR
  attribute fields and geometry fields mapped to Arrow fields.
  The FID column may be omitted by providing the INCLUDE_FID=NO option.

  When get_schema() returns 0, and the schema is no longer needed, it must
  be released with the following procedure, to take into account that it might
  have been released by other code, as documented in the Arrow C data
  interface:

  .. code-block:: c

      if( out_schema->release )
          out_schema->release(out_schema);

- The get_next() callback retrieves the next record batch over the layer.

  out_array is a pointer to an ArrowArray structure, that can be in an uninitialized
  state (the method will ignore any initial content).

  The default implementation uses GetNextFeature() internally to retrieve batches
  of up to 65,536 features (configurable with the MAX_FEATURES_IN_BATCH=num option).
  The starting address of buffers allocated by the
  default implementation is aligned on 64-byte boundaries.

  The default implementation outputs geometries as WKB in a binary field,
  whose corresponding entry in the schema is marked with the metadata item
  ``ARROW:extension:name`` set to ``ogc.wkb``. Specialized implementations may output
  other formats by default (particularly the Arrow driver, which can return geometries
  encoded according to the GeoArrow specification, using a list of coordinates).
  The GEOMETRY_ENCODING=WKB option can be passed to force the use of WKB (through
  the default implementation).

  The method may take into account ignored fields set with SetIgnoredFields() (the
  default implementation does), and should take into account filters set with
  SetSpatialFilter() and SetAttributeFilter(). Note however that specialized implementations
  may fall back to the default (slower) implementation when filters are set.

  Mixing calls to GetNextFeature() and get_next() is not recommended, as
  the behaviour will be unspecified (but it should not crash).

  When get_next() returns 0, and the array is no longer needed, it must
  be released with the following procedure, to take into account that it might
  have been released by other code, as documented in the Arrow C data
  interface:

  .. code-block:: c

      if( out_array->release )
          out_array->release(out_array);

Drivers that have a specialized implementation advertise the
new OLCFastGetArrowStream layer capability.
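
Since the geometry encoding may differ between implementations, a consumer may
want to check the ``ARROW:extension:name`` metadata item of a field before
decoding it as WKB. The following sketch decodes the binary ``metadata`` member
of an ArrowSchema as laid out by the Arrow C data interface (an int32 number of
pairs, then for each pair an int32 length followed by the bytes of the key, and
likewise for the value, in native endianness). ``ParseArrowMetadata``,
``IsWKBField`` and ``BuildArrowMetadata`` are hypothetical helper names, not
GDAL functions.

.. code-block:: c++

    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <map>
    #include <string>

    // Decode the "metadata" member of an ArrowSchema into a key/value map
    std::map<std::string, std::string> ParseArrowMetadata(const char* metadata)
    {
        std::map<std::string, std::string> result;
        if( metadata == nullptr )
            return result;  // no metadata attached to the field

        const auto readInt32 = [&metadata]()
        {
            int32_t v;
            std::memcpy(&v, metadata, sizeof(v));  // may be unaligned
            metadata += sizeof(v);
            return v;
        };
        const auto readString = [&]()
        {
            const int32_t len = readInt32();
            std::string s(metadata, static_cast<size_t>(len));
            metadata += len;
            return s;
        };

        const int32_t nPairs = readInt32();
        for( int32_t i = 0; i < nPairs; ++i )
        {
            const std::string key = readString();
            const std::string value = readString();
            result[key] = value;
        }
        return result;
    }

    // True if the field metadata marks the field as WKB-encoded geometry
    bool IsWKBField(const char* metadata)
    {
        const auto md = ParseArrowMetadata(metadata);
        const auto it = md.find("ARROW:extension:name");
        return it != md.end() && it->second == "ogc.wkb";
    }

    // Test helper only: encode a map with the same binary layout
    std::string BuildArrowMetadata(const std::map<std::string, std::string>& md)
    {
        std::string out;
        const auto appendInt32 = [&out](int32_t v)
        {
            out.append(reinterpret_cast<const char*>(&v), sizeof(v));
        };
        appendInt32(static_cast<int32_t>(md.size()));
        for( const auto& kv : md )
        {
            appendInt32(static_cast<int32_t>(kv.first.size()));
            out += kv.first;
            appendInt32(static_cast<int32_t>(kv.second.size()));
            out += kv.second;
        }
        return out;
    }

In a real consumer, the pointer passed to ``IsWKBField`` would be the
``metadata`` member of the child of the schema that corresponds to the geometry
field, for example ``schema.children[i]->metadata``.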

Directly using an ArrowArray (as a producer or a consumer) is admittedly not
trivial, and requires good familiarity with the Arrow C data interface and columnar
array specifications, to know in which buffer of an array data is to be read,
which data type void* buffers should be cast to, how to use buffers that contain
null/not_null information, how to use offset buffers for data types of type List, etc.
Studying the gdal_array._RecordBatchAsNumpy() method of the SWIG Python
bindings (https://github.com/OSGeo/gdal/blob/master/swig/include/gdal_array.i)
can give a good hint of how to use an ArrowArray object, in conjunction
with the associated ArrowSchema.
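
Two of these points can be sketched in a few lines: testing the validity
(null/not_null) bitmap, and locating a value of a variable-length binary or
string array through its int32 offset buffer. ``IsValid``, ``BinaryView`` and
``GetBinaryValue`` are hypothetical helper names. The bit numbering (least
significant bit first) and the offset semantics follow the Arrow columnar
format, and the array offset is assumed to be 0.

.. code-block:: c++

    #include <cassert>
    #include <cstdint>

    // Returns true if element i is not null. A missing validity buffer
    // (nullptr) means that all elements are valid.
    inline bool IsValid(const uint8_t* validity, int64_t i)
    {
        return validity == nullptr ||
               ((validity[i / 8] >> (i % 8)) & 1) != 0;
    }

    // Non-owning view over one value of a binary or string array
    struct BinaryView
    {
        const uint8_t* data;
        int32_t size;
    };

    // Value i spans data[offsets[i]] (included) to data[offsets[i+1]] (excluded)
    inline BinaryView GetBinaryValue(const int32_t* offsets,
                                     const uint8_t* data, int64_t i)
    {
        return BinaryView{data + offsets[i], offsets[i + 1] - offsets[i]};
    }

For instance, the three values ``"ab"``, null and ``"cde"`` would be stored with
a validity byte of ``0x05``, the offsets ``{0, 2, 2, 5}`` and the data bytes
``abcde``.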

The example below illustrates how to read the content of a layer that consists
of an integer field and a geometry field:


.. code-block:: c++

    #include "gdal_priv.h"
    #include "ogr_api.h"
    #include "ogrsf_frmts.h"
    #include "ogr_recordbatch.h"
    #include <cassert>
    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>
    #include <cstring>

    int main(int argc, char* argv[])
    {
        if( argc < 2 )
        {
            fprintf(stderr, "Usage: %s <vector_dataset>\n", argv[0]);
            exit(1);
        }

        GDALAllRegister();
        GDALDataset* poDS = GDALDataset::Open(argv[1]);
        if( poDS == nullptr )
        {
            CPLError(CE_Failure, CPLE_AppDefined, "Open() failed\n");
            exit(1);
        }
        OGRLayer* poLayer = poDS->GetLayer(0);
        if( poLayer == nullptr )
        {
            CPLError(CE_Failure, CPLE_AppDefined, "GetLayer() failed\n");
            delete poDS;
            exit(1);
        }
        OGRLayerH hLayer = OGRLayer::ToHandle(poLayer);

        // Get the Arrow stream
        struct ArrowArrayStream stream;
        if( !OGR_L_GetArrowStream(hLayer, &stream, nullptr))
        {
            CPLError(CE_Failure, CPLE_AppDefined, "OGR_L_GetArrowStream() failed\n");
            delete poDS;
            exit(1);
        }

        // Get the schema
        struct ArrowSchema schema;
        if( stream.get_schema(&stream, &schema) != 0 )
        {
            CPLError(CE_Failure, CPLE_AppDefined, "get_schema() failed\n");
            stream.release(&stream);
            delete poDS;
            exit(1);
        }

        // Check that the returned schema consists of one int64 field (for FID),
        // one int32 field and one binary/wkb field
        if( schema.n_children != 3 ||
            strcmp(schema.children[0]->format, "l") != 0 || // int64 -> FID
            strcmp(schema.children[1]->format, "i") != 0 || // int32
            strcmp(schema.children[2]->format, "z") != 0 )  // binary for WKB
        {
            CPLError(CE_Failure, CPLE_AppDefined,
                     "Layer does not have the schema required by this example.");
            schema.release(&schema);
            stream.release(&stream);
            delete poDS;
            exit(1);
        }
        schema.release(&schema);

        // Iterate over batches
        while( true )
        {
            struct ArrowArray array;
            // End of stream is signalled by a return of 0 with a released array
            if( stream.get_next(&stream, &array) != 0 ||
                array.release == nullptr )
            {
                break;
            }

            assert(array.n_children == 3);

            // Cast the array->children[].buffers[] to the appropriate data types
            const auto int_child = array.children[1];
            assert(int_child->n_buffers == 2);
            const uint8_t* int_field_not_null = static_cast<const uint8_t*>(int_child->buffers[0]);
            const int32_t* int_field = static_cast<const int32_t*>(int_child->buffers[1]);

            const auto wkb_child = array.children[2];
            assert(wkb_child->n_buffers == 3);
            const uint8_t* wkb_field_not_null = static_cast<const uint8_t*>(wkb_child->buffers[0]);
            const int32_t* wkb_offset = static_cast<const int32_t*>(wkb_child->buffers[1]);
            const uint8_t* wkb_field = static_cast<const uint8_t*>(wkb_child->buffers[2]);

            // Lambda to check if a field is set for a given feature index.
            // Bit (i % 8) of byte (i / 8) of the validity bitmap is 1 when set.
            const auto IsSet = [](const uint8_t* buffer_not_null, int64_t i)
            {
                return buffer_not_null == nullptr ||
                       ((buffer_not_null[i/8] >> (i%8)) & 1) != 0;
            };

            // Iterate through features of a batch
            for( long long i = 0; i < array.length; i++ )
            {
                if( IsSet(int_field_not_null, i) )
                    printf("int_field[%lld] = %d\n", i, int_field[i]);
                else
                    printf("int_field[%lld] = null\n", i);

                if( IsSet(wkb_field_not_null, i) )
                {
                    const void* wkb = wkb_field + wkb_offset[i];
                    const int32_t length = wkb_offset[i+1] - wkb_offset[i];
                    char* wkt = nullptr;
                    OGRGeometry* geom = nullptr;
                    OGRGeometryFactory::createFromWkb(wkb, nullptr, &geom, length);
                    if( geom )
                    {
                        geom->exportToWkt(&wkt);
                    }
                    printf("wkb_field[%lld] = %s\n", i, wkt ? wkt : "invalid geometry");
                    CPLFree(wkt);
                    delete geom;
                }
                else
                {
                    printf("wkb_field[%lld] = null\n", i);
                }
            }

            // Release memory taken by the batch
            array.release(&array);
        }

        // Release stream and dataset
        stream.release(&stream);
        delete poDS;
        return 0;
    }


Writing To OGR
--------------

9 changes: 7 additions & 2 deletions doc/source/user/vector_data_model.rst
@@ -72,9 +72,14 @@ Layer

An :cpp:class:`OGRLayer` represents a layer of features within a data source. All features in an OGRLayer share a common schema and are of the same :cpp:class:`OGRFeatureDefn`. An OGRLayer class also contains methods for reading features from the data source. The OGRLayer can be thought of as a gateway for reading and writing features from an underlying data source, normally a file format. In SFCOM and other table based simple features implementation an OGRLayer represents a spatial table.

The OGRLayer includes methods for sequential and random reading and writing. Read access (via the :cpp:func:`OGRLayer::GetNextFeature` method) normally reads all features, one at a time sequentially; however, it can be limited to return features intersecting a particular geographic region by installing a spatial filter on the OGRLayer (via the :cpp:func:`OGRLayer::SetSpatialFilter` method).
The OGRLayer includes methods for sequential and random reading and writing. Read access (via the :cpp:func:`OGRLayer::GetNextFeature` method) normally reads all features, one at a time sequentially; however, it can be limited to return features intersecting a particular geographic region by installing a spatial filter on the OGRLayer (via the :cpp:func:`OGRLayer::SetSpatialFilter` method). A filter on attributes can only be set with the :cpp:func:`OGRLayer::SetAttributeFilter` method.

One flaw in the current OGR architecture is that the spatial filter is set directly on the OGRLayer which is intended to be the only representative of a given layer in a data source. This means it isn't possible to have multiple read operations active at one time with different spatial filters on each. This aspect may be revised in the future to introduce an OGRLayerView class or something similar.
Starting with GDAL 3.6, as an alternative to getting features through ``GetNextFeature``, it is possible to retrieve them by batches, with a column-oriented memory layout, using the :cpp:func:`OGRLayer::GetArrowStream` method (cf :ref:`vector_api_tut_arrow_stream`).

One flaw in the current OGR architecture is that the spatial and attribute filters are set directly on the OGRLayer which is intended to be the only representative of a given layer in a data source. This means it isn't possible to have multiple read operations active at one time with different spatial filters on each.

..
This aspect may be revised in the future to introduce an OGRLayerView class or something similar.
Another question that might arise is why the OGRLayer and OGRFeatureDefn classes are distinct. An OGRLayer always has a one-to-one relationship to an OGRFeatureDefn, so why not amalgamate the classes. There are two reasons:
- As defined now OGRFeature and OGRFeatureDefn don't depend on OGRLayer, so they can exist independently in memory without regard to a particular layer in a data store.
6 changes: 6 additions & 0 deletions ogr/ogrsf_frmts/generic/ogrlayer.cpp
@@ -6144,6 +6144,9 @@ const char* OGRLayer::GetLastErrorArrowArrayStream(struct ArrowArrayStream*)
}
stream.release(&stream);
\endcode
*
* A full example is available in the
* <a href="https://gdal.org/tutorials/vector_api_tut.html#reading-from-ogr-using-the-arrow-c-stream-data-interface">Reading From OGR using the Arrow C Stream data interface</a> tutorial.
*
* Options may be driver specific. The default implementation recognizes the
* following options:
@@ -6261,6 +6264,9 @@ bool OGRLayer::GetArrowStream(struct ArrowArrayStream* out_stream,
}
stream.release(&stream);
\endcode
*
* A full example is available in the
* <a href="https://gdal.org/tutorials/vector_api_tut.html#reading-from-ogr-using-the-arrow-c-stream-data-interface">Reading From OGR using the Arrow C Stream data interface</a> tutorial.
*
* Options may be driver specific. The default implementation recognizes the
* following options:
10 changes: 8 additions & 2 deletions ogr/ogrsf_frmts/ogrsf_frmts.dox
@@ -733,7 +733,7 @@ by the OGRSFDriverManager.

\brief Reset feature reading to start on the first feature.

This affects GetNextFeature().
This affects GetNextFeature() and GetArrowStream().

This method is the same as the C function OGR_L_ResetReading().

@@ -744,7 +744,7 @@ by the OGRSFDriverManager.

\brief Reset feature reading to start on the first feature.

This affects GetNextFeature().
This affects GetNextFeature() and GetArrowStream().

This function is the same as the C++ method OGRLayer::ResetReading().

@@ -768,6 +768,9 @@ by the OGRSFDriverManager.
This method implements sequential access to the features of a layer. The
ResetReading() method can be used to start at the beginning again.

Starting with GDAL 3.6, it is possible to retrieve them by batches, with a
column-oriented memory layout, using the GetArrowStream() method.

Features returned by GetNextFeature() may or may not be affected by
concurrent modifications depending on drivers. A guaranteed way of seeing
modifications in effect is to call ResetReading() on layers where
@@ -801,6 +804,9 @@ by the OGRSFDriverManager.
The OGR_L_ResetReading() function can be used to start at the beginning
again.

Starting with GDAL 3.6, it is possible to retrieve them by batches, with a
column-oriented memory layout, using the OGR_L_GetArrowStream() function.

Features returned by OGR_GetNextFeature() may or may not be affected by
concurrent modifications depending on drivers. A guaranteed way of seeing
modifications in effect is to call OGR_L_ResetReading() on layers where