From b0317f2b2b62b3be9beb8d834aa51b776fb0179e Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 20 Aug 2024 17:04:33 +0900
Subject: [PATCH 01/32] GH-43707: [Python] Fix compilation on Cython<3 (#43765)
### Rationale for this change
Fix compilation on Cython < 3
### What changes are included in this PR?
Add an explicit cast
### Are these changes tested?
N/A
### Are there any user-facing changes?
No
* GitHub Issue: #43707
Authored-by: David Li
Signed-off-by: Joris Van den Bossche
---
python/pyarrow/types.pxi | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 93d68fb847890..dcd2b61c33411 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -5328,8 +5328,9 @@ def opaque(DataType storage_type, str type_name not None, str vendor_name not No
cdef:
c_string c_type_name = tobytes(type_name)
c_string c_vendor_name = tobytes(vendor_name)
- shared_ptr[CDataType] c_type = make_shared[COpaqueType](
+ shared_ptr[COpaqueType] c_opaque_type = make_shared[COpaqueType](
storage_type.sp_type, c_type_name, c_vendor_name)
+ shared_ptr[CDataType] c_type = static_pointer_cast[CDataType, COpaqueType](c_opaque_type)
OpaqueType out = OpaqueType.__new__(OpaqueType)
out.init(c_type)
return out
From cc3c868aea7317a58447658f1c165ad352cd4865 Mon Sep 17 00:00:00 2001
From: Albert Villanova del Moral
<8515462+albertvillanova@users.noreply.github.com>
Date: Tue, 20 Aug 2024 16:57:57 +0200
Subject: [PATCH 02/32] MINOR: [Documentation] Add installation of ninja-build
to Python Development docs (#43600)
### Rationale for this change
Otherwise, you get a CMake error:
```
CMake Error: CMake was unable to find a build program corresponding to "Ninja". CMAKE_MAKE_PROGRAM is not set. You probably need to select a different build tool.
```
Authored-by: Albert Villanova del Moral <8515462+albertvillanova@users.noreply.github.com>
Signed-off-by: Joris Van den Bossche
---
docs/source/developers/python.rst | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/source/developers/python.rst b/docs/source/developers/python.rst
index 2f3e892ce8ede..6beea55e66b86 100644
--- a/docs/source/developers/python.rst
+++ b/docs/source/developers/python.rst
@@ -267,7 +267,7 @@ On Debian/Ubuntu, you need the following minimal set of dependencies:
.. code-block::
- $ sudo apt-get install build-essential cmake python3-dev
+ $ sudo apt-get install build-essential ninja-build cmake python3-dev
Now, let's create a Python virtual environment with all Python dependencies
in the same folder as the repositories, and a target installation folder:
From 525881987d0b9b4f464c3e3593a9a7b4e3c767d0 Mon Sep 17 00:00:00 2001
From: Joel Lubinitsky <33523178+joellubi@users.noreply.github.com>
Date: Tue, 20 Aug 2024 20:25:19 -0400
Subject: [PATCH 03/32] GH-17682: [C++][Python] Bool8 Extension Type
Implementation (#43488)
### Rationale for this change
C++ and Python implementations of #43234
### What changes are included in this PR?
- Implement C++ `Bool8Type`, `Bool8Array`, `Bool8Scalar`, and tests
- Implement Python bindings to C++, as well as zero-copy numpy conversion methods
- TODO: docs waiting for rebase on #43458
### Are these changes tested?
Yes
### Are there any user-facing changes?
Bool8 extension type will be available in C++ and Python libraries
* GitHub Issue: #17682
Authored-by: Joel Lubinitsky
Signed-off-by: Felipe Oliveira Carvalho
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/extension/CMakeLists.txt | 6 +
cpp/src/arrow/extension/bool8.cc | 61 ++++++++
cpp/src/arrow/extension/bool8.h | 58 ++++++++
cpp/src/arrow/extension/bool8_test.cc | 91 ++++++++++++
cpp/src/arrow/extension_type.cc | 7 +-
python/pyarrow/__init__.py | 7 +-
python/pyarrow/array.pxi | 114 ++++++++++++++-
python/pyarrow/includes/libarrow.pxd | 9 ++
python/pyarrow/lib.pxd | 3 +
python/pyarrow/public-api.pxi | 2 +
python/pyarrow/scalar.pxi | 23 ++-
python/pyarrow/tests/test_extension_type.py | 152 ++++++++++++++++++++
python/pyarrow/tests/test_misc.py | 3 +
python/pyarrow/types.pxi | 74 ++++++++++
15 files changed, 604 insertions(+), 7 deletions(-)
create mode 100644 cpp/src/arrow/extension/bool8.cc
create mode 100644 cpp/src/arrow/extension/bool8.h
create mode 100644 cpp/src/arrow/extension/bool8_test.cc
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index fb785e1e9571b..fb7253b6fd69d 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -906,6 +906,7 @@ endif()
if(ARROW_JSON)
arrow_add_object_library(ARROW_JSON
+ extension/bool8.cc
extension/fixed_shape_tensor.cc
extension/opaque.cc
json/options.cc
diff --git a/cpp/src/arrow/extension/CMakeLists.txt b/cpp/src/arrow/extension/CMakeLists.txt
index 6741ab602f50b..fcd5fa529ab56 100644
--- a/cpp/src/arrow/extension/CMakeLists.txt
+++ b/cpp/src/arrow/extension/CMakeLists.txt
@@ -15,6 +15,12 @@
# specific language governing permissions and limitations
# under the License.
+add_arrow_test(test
+ SOURCES
+ bool8_test.cc
+ PREFIX
+ "arrow-extension-bool8")
+
add_arrow_test(test
SOURCES
fixed_shape_tensor_test.cc
diff --git a/cpp/src/arrow/extension/bool8.cc b/cpp/src/arrow/extension/bool8.cc
new file mode 100644
index 0000000000000..c081f0c2b2866
--- /dev/null
+++ b/cpp/src/arrow/extension/bool8.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include
+
+#include "arrow/extension/bool8.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::extension {
+
+bool Bool8Type::ExtensionEquals(const ExtensionType& other) const {
+ return extension_name() == other.extension_name();
+}
+
+std::string Bool8Type::ToString(bool show_metadata) const {
+ std::stringstream ss;
+ ss << "extension<" << this->extension_name() << ">";
+ return ss.str();
+}
+
+std::string Bool8Type::Serialize() const { return ""; }
+
+Result> Bool8Type::Deserialize(
+ std::shared_ptr storage_type, const std::string& serialized_data) const {
+ if (storage_type->id() != Type::INT8) {
+ return Status::Invalid("Expected INT8 storage type, got ", storage_type->ToString());
+ }
+ if (serialized_data != "") {
+ return Status::Invalid("Serialize data must be empty, got ", serialized_data);
+ }
+ return bool8();
+}
+
+std::shared_ptr Bool8Type::MakeArray(std::shared_ptr data) const {
+ DCHECK_EQ(data->type->id(), Type::EXTENSION);
+ DCHECK_EQ("arrow.bool8",
+ internal::checked_cast(*data->type).extension_name());
+ return std::make_shared(data);
+}
+
+Result> Bool8Type::Make() {
+ return std::make_shared();
+}
+
+std::shared_ptr bool8() { return std::make_shared(); }
+
+} // namespace arrow::extension
diff --git a/cpp/src/arrow/extension/bool8.h b/cpp/src/arrow/extension/bool8.h
new file mode 100644
index 0000000000000..02e629b28a867
--- /dev/null
+++ b/cpp/src/arrow/extension/bool8.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/extension_type.h"
+
+namespace arrow::extension {
+
+/// \brief Bool8 is an alternate representation for boolean
+/// arrays using 8 bits instead of 1 bit per value. The underlying
+/// storage type is int8.
+class ARROW_EXPORT Bool8Array : public ExtensionArray {
+ public:
+ using ExtensionArray::ExtensionArray;
+};
+
+/// \brief Bool8 is an alternate representation for boolean
+/// arrays using 8 bits instead of 1 bit per value. The underlying
+/// storage type is int8.
+class ARROW_EXPORT Bool8Type : public ExtensionType {
+ public:
+ /// \brief Construct a Bool8Type.
+ Bool8Type() : ExtensionType(int8()) {}
+
+ std::string extension_name() const override { return "arrow.bool8"; }
+ std::string ToString(bool show_metadata = false) const override;
+
+ bool ExtensionEquals(const ExtensionType& other) const override;
+
+ std::string Serialize() const override;
+
+ Result> Deserialize(
+ std::shared_ptr storage_type,
+ const std::string& serialized_data) const override;
+
+ /// Create a Bool8Array from ArrayData
+ std::shared_ptr MakeArray(std::shared_ptr data) const override;
+
+ static Result> Make();
+};
+
+/// \brief Return a Bool8Type instance.
+ARROW_EXPORT std::shared_ptr bool8();
+
+} // namespace arrow::extension
diff --git a/cpp/src/arrow/extension/bool8_test.cc b/cpp/src/arrow/extension/bool8_test.cc
new file mode 100644
index 0000000000000..eabcfcf62d32c
--- /dev/null
+++ b/cpp/src/arrow/extension/bool8_test.cc
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/extension/bool8.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/testing/extension_type.h"
+#include "arrow/testing/gtest_util.h"
+
+namespace arrow {
+
+TEST(Bool8Type, Basics) {
+ auto type = internal::checked_pointer_cast(extension::bool8());
+ auto type2 = internal::checked_pointer_cast(extension::bool8());
+ ASSERT_EQ("arrow.bool8", type->extension_name());
+ ASSERT_EQ(*type, *type);
+ ASSERT_NE(*arrow::null(), *type);
+ ASSERT_EQ(*type, *type2);
+ ASSERT_EQ(*arrow::int8(), *type->storage_type());
+ ASSERT_EQ("", type->Serialize());
+ ASSERT_EQ("extension", type->ToString(false));
+}
+
+TEST(Bool8Type, CreateFromArray) {
+ auto type = internal::checked_pointer_cast(extension::bool8());
+ auto storage = ArrayFromJSON(int8(), "[-1,0,1,2,null]");
+ auto array = ExtensionType::WrapArray(type, storage);
+ ASSERT_EQ(5, array->length());
+ ASSERT_EQ(1, array->null_count());
+}
+
+TEST(Bool8Type, Deserialize) {
+ auto type = internal::checked_pointer_cast(extension::bool8());
+ ASSERT_OK_AND_ASSIGN(auto deserialized, type->Deserialize(type->storage_type(), ""));
+ ASSERT_EQ(*type, *deserialized);
+ ASSERT_NOT_OK(type->Deserialize(type->storage_type(), "must be empty"));
+ ASSERT_EQ(*type, *deserialized);
+ ASSERT_NOT_OK(type->Deserialize(uint8(), ""));
+ ASSERT_EQ(*type, *deserialized);
+}
+
+TEST(Bool8Type, MetadataRoundTrip) {
+ auto type = internal::checked_pointer_cast(extension::bool8());
+ std::string serialized = type->Serialize();
+ ASSERT_OK_AND_ASSIGN(auto deserialized,
+ type->Deserialize(type->storage_type(), serialized));
+ ASSERT_EQ(*type, *deserialized);
+}
+
+TEST(Bool8Type, BatchRoundTrip) {
+ auto type = internal::checked_pointer_cast(extension::bool8());
+
+ auto storage = ArrayFromJSON(int8(), "[-1,0,1,2,null]");
+ auto array = ExtensionType::WrapArray(type, storage);
+ auto batch =
+ RecordBatch::Make(schema({field("field", type)}), array->length(), {array});
+
+ std::shared_ptr written;
+ {
+ ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
+ ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
+ out_stream.get()));
+
+ ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());
+
+ io::BufferReader reader(complete_ipc_stream);
+ std::shared_ptr batch_reader;
+ ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
+ ASSERT_OK(batch_reader->ReadNext(&written));
+ }
+
+ ASSERT_EQ(*batch->schema(), *written->schema());
+ ASSERT_BATCHES_EQUAL(*batch, *written);
+}
+
+} // namespace arrow
diff --git a/cpp/src/arrow/extension_type.cc b/cpp/src/arrow/extension_type.cc
index cf8dda7a85df4..685018f7de7b8 100644
--- a/cpp/src/arrow/extension_type.cc
+++ b/cpp/src/arrow/extension_type.cc
@@ -28,6 +28,7 @@
#include "arrow/chunked_array.h"
#include "arrow/config.h"
#ifdef ARROW_JSON
+#include "arrow/extension/bool8.h"
#include "arrow/extension/fixed_shape_tensor.h"
#endif
#include "arrow/status.h"
@@ -146,10 +147,12 @@ static void CreateGlobalRegistry() {
#ifdef ARROW_JSON
// Register canonical extension types
- auto ext_type =
+ auto fst_ext_type =
checked_pointer_cast(extension::fixed_shape_tensor(int64(), {}));
+ ARROW_CHECK_OK(g_registry->RegisterType(fst_ext_type));
- ARROW_CHECK_OK(g_registry->RegisterType(ext_type));
+ auto bool8_ext_type = checked_pointer_cast(extension::bool8());
+ ARROW_CHECK_OK(g_registry->RegisterType(bool8_ext_type));
#endif
}
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index aa7bab9f97e05..807bcdc315036 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -174,6 +174,7 @@ def print_entry(label, value):
run_end_encoded,
fixed_shape_tensor,
opaque,
+ bool8,
field,
type_for_alias,
DataType, DictionaryType, StructType,
@@ -184,7 +185,7 @@ def print_entry(label, value):
FixedSizeBinaryType, Decimal128Type, Decimal256Type,
BaseExtensionType, ExtensionType,
RunEndEncodedType, FixedShapeTensorType, OpaqueType,
- PyExtensionType, UnknownExtensionType,
+ Bool8Type, PyExtensionType, UnknownExtensionType,
register_extension_type, unregister_extension_type,
DictionaryMemo,
KeyValueMetadata,
@@ -218,7 +219,7 @@ def print_entry(label, value):
MonthDayNanoIntervalArray,
Decimal128Array, Decimal256Array, StructArray, ExtensionArray,
RunEndEncodedArray, FixedShapeTensorArray, OpaqueArray,
- scalar, NA, _NULL as NULL, Scalar,
+ Bool8Array, scalar, NA, _NULL as NULL, Scalar,
NullScalar, BooleanScalar,
Int8Scalar, Int16Scalar, Int32Scalar, Int64Scalar,
UInt8Scalar, UInt16Scalar, UInt32Scalar, UInt64Scalar,
@@ -235,7 +236,7 @@ def print_entry(label, value):
FixedSizeBinaryScalar, DictionaryScalar,
MapScalar, StructScalar, UnionScalar,
RunEndEncodedScalar, ExtensionScalar,
- FixedShapeTensorScalar, OpaqueScalar)
+ FixedShapeTensorScalar, OpaqueScalar, Bool8Scalar)
# Buffers, allocation
from pyarrow.lib import (DeviceAllocationType, Device, MemoryManager,
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 6c40a21db96ca..4c3eb93232634 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1581,7 +1581,7 @@ cdef class Array(_PandasConvertible):
def to_numpy(self, zero_copy_only=True, writable=False):
"""
- Return a NumPy view or copy of this array (experimental).
+ Return a NumPy view or copy of this array.
By default, tries to return a view of this array. This is only
supported for primitive arrays with the same memory layout as NumPy
@@ -4476,6 +4476,118 @@ cdef class OpaqueArray(ExtensionArray):
"""
+cdef class Bool8Array(ExtensionArray):
+ """
+ Concrete class for bool8 extension arrays.
+
+ Examples
+ --------
+ Define the extension type for an bool8 array
+
+ >>> import pyarrow as pa
+ >>> bool8_type = pa.bool8()
+
+ Create an extension array
+
+ >>> arr = [-1, 0, 1, 2, None]
+ >>> storage = pa.array(arr, pa.int8())
+ >>> pa.ExtensionArray.from_storage(bool8_type, storage)
+
+ [
+ -1,
+ 0,
+ 1,
+ 2,
+ null
+ ]
+ """
+
+ def to_numpy(self, zero_copy_only=True, writable=False):
+ """
+ Return a NumPy bool view or copy of this array.
+
+ By default, tries to return a view of this array. This is only
+ supported for arrays without any nulls.
+
+ Parameters
+ ----------
+ zero_copy_only : bool, default True
+ If True, an exception will be raised if the conversion to a numpy
+ array would require copying the underlying data (e.g. in presence
+ of nulls).
+ writable : bool, default False
+ For numpy arrays created with zero copy (view on the Arrow data),
+ the resulting array is not writable (Arrow data is immutable).
+ By setting this to True, a copy of the array is made to ensure
+ it is writable.
+
+ Returns
+ -------
+ array : numpy.ndarray
+ """
+ if not writable:
+ try:
+ return self.storage.to_numpy().view(np.bool_)
+ except ArrowInvalid as e:
+ if zero_copy_only:
+ raise e
+
+ return _pc().not_equal(self.storage, 0).to_numpy(zero_copy_only=zero_copy_only, writable=writable)
+
+ @staticmethod
+ def from_storage(Int8Array storage):
+ """
+ Construct Bool8Array from Int8Array storage.
+
+ Parameters
+ ----------
+ storage : Int8Array
+ The underlying storage for the result array.
+
+ Returns
+ -------
+ bool8_array : Bool8Array
+ """
+ return ExtensionArray.from_storage(bool8(), storage)
+
+ @staticmethod
+ def from_numpy(obj):
+ """
+ Convert numpy array to a bool8 extension array without making a copy.
+ The input array must be 1-dimensional, with either bool_ or int8 dtype.
+
+ Parameters
+ ----------
+ obj : numpy.ndarray
+
+ Returns
+ -------
+ bool8_array : Bool8Array
+
+ Examples
+ --------
+ >>> import pyarrow as pa
+ >>> import numpy as np
+ >>> arr = np.array([True, False, True], dtype=np.bool_)
+ >>> pa.Bool8Array.from_numpy(arr)
+
+ [
+ 1,
+ 0,
+ 1
+ ]
+ """
+
+ if obj.ndim != 1:
+ raise ValueError(f"Cannot convert {obj.ndim}-D array to bool8 array")
+
+ if obj.dtype not in [np.bool_, np.int8]:
+ raise TypeError(f"Array dtype {obj.dtype} incompatible with bool8 storage")
+
+ storage_arr = array(obj.view(np.int8), type=int8())
+ return Bool8Array.from_storage(storage_arr)
+
+
cdef dict _array_classes = {
_Type_NA: NullArray,
_Type_BOOL: BooleanArray,
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9b008d150f1f1..a54a1db292f70 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2895,6 +2895,15 @@ cdef extern from "arrow/extension/opaque.h" namespace "arrow::extension" nogil:
pass
+cdef extern from "arrow/extension/bool8.h" namespace "arrow::extension" nogil:
+ cdef cppclass CBool8Type" arrow::extension::Bool8Type"(CExtensionType):
+
+ @staticmethod
+ CResult[shared_ptr[CDataType]] Make()
+
+ cdef cppclass CBool8Array" arrow::extension::Bool8Array"(CExtensionArray):
+ pass
+
cdef extern from "arrow/util/compression.h" namespace "arrow" nogil:
cdef enum CCompressionType" arrow::Compression::type":
CCompressionType_UNCOMPRESSED" arrow::Compression::UNCOMPRESSED"
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 2cb302d20a8ac..e3625c1815274 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -214,6 +214,9 @@ cdef class FixedShapeTensorType(BaseExtensionType):
cdef:
const CFixedShapeTensorType* tensor_ext_type
+cdef class Bool8Type(BaseExtensionType):
+ cdef:
+ const CBool8Type* bool8_ext_type
cdef class OpaqueType(BaseExtensionType):
cdef:
diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi
index 2f9fc1c554209..19a26bd6c683d 100644
--- a/python/pyarrow/public-api.pxi
+++ b/python/pyarrow/public-api.pxi
@@ -126,6 +126,8 @@ cdef api object pyarrow_wrap_data_type(
out = FixedShapeTensorType.__new__(FixedShapeTensorType)
elif ext_type.extension_name() == b"arrow.opaque":
out = OpaqueType.__new__(OpaqueType)
+ elif ext_type.extension_name() == b"arrow.bool8":
+ out = Bool8Type.__new__(Bool8Type)
else:
out = BaseExtensionType.__new__(BaseExtensionType)
else:
diff --git a/python/pyarrow/scalar.pxi b/python/pyarrow/scalar.pxi
index 12a99c2aece63..72ae2aee5f8b3 100644
--- a/python/pyarrow/scalar.pxi
+++ b/python/pyarrow/scalar.pxi
@@ -1091,6 +1091,18 @@ cdef class OpaqueScalar(ExtensionScalar):
"""
+cdef class Bool8Scalar(ExtensionScalar):
+ """
+ Concrete class for bool8 extension scalar.
+ """
+
+ def as_py(self):
+ """
+ Return this scalar as a Python object.
+ """
+ py_val = super().as_py()
+ return None if py_val is None else py_val != 0
+
cdef dict _scalar_classes = {
_Type_BOOL: BooleanScalar,
_Type_UINT8: UInt8Scalar,
@@ -1199,6 +1211,11 @@ def scalar(value, type=None, *, from_pandas=None, MemoryPool memory_pool=None):
type = ensure_type(type, allow_none=True)
pool = maybe_unbox_memory_pool(memory_pool)
+ extension_type = None
+ if type is not None and type.id == _Type_EXTENSION:
+ extension_type = type
+ type = type.storage_type
+
if _is_array_like(value):
value = get_values(value, &is_pandas_object)
@@ -1223,4 +1240,8 @@ def scalar(value, type=None, *, from_pandas=None, MemoryPool memory_pool=None):
# retrieve the scalar from the first position
scalar = GetResultValue(array.get().GetScalar(0))
- return Scalar.wrap(scalar)
+ result = Scalar.wrap(scalar)
+
+ if extension_type is not None:
+ result = ExtensionScalar.from_storage(extension_type, result)
+ return result
diff --git a/python/pyarrow/tests/test_extension_type.py b/python/pyarrow/tests/test_extension_type.py
index 58c54189f223e..b04ee85ec99ad 100644
--- a/python/pyarrow/tests/test_extension_type.py
+++ b/python/pyarrow/tests/test_extension_type.py
@@ -1707,3 +1707,155 @@ def test_opaque_type(pickle_module, storage_type, storage):
# cast extension type -> storage type
inner = arr.cast(storage_type)
assert inner == storage
+
+
+def test_bool8_type(pickle_module):
+ bool8_type = pa.bool8()
+ storage_type = pa.int8()
+ assert bool8_type.extension_name == "arrow.bool8"
+ assert bool8_type.storage_type == storage_type
+ assert str(bool8_type) == "extension"
+
+ assert bool8_type == bool8_type
+ assert bool8_type == pa.bool8()
+ assert bool8_type != storage_type
+
+ # Pickle roundtrip
+ result = pickle_module.loads(pickle_module.dumps(bool8_type))
+ assert result == bool8_type
+
+ # IPC roundtrip
+ storage = pa.array([-1, 0, 1, 2, None], storage_type)
+ arr = pa.ExtensionArray.from_storage(bool8_type, storage)
+ assert isinstance(arr, pa.Bool8Array)
+
+ # extension is registered by default
+ buf = ipc_write_batch(pa.RecordBatch.from_arrays([arr], ["ext"]))
+ batch = ipc_read_batch(buf)
+
+ assert batch.column(0).type.extension_name == "arrow.bool8"
+ assert isinstance(batch.column(0), pa.Bool8Array)
+
+ # cast storage -> extension type
+ result = storage.cast(bool8_type)
+ assert result == arr
+
+ # cast extension type -> storage type
+ inner = arr.cast(storage_type)
+ assert inner == storage
+
+
+def test_bool8_to_bool_conversion():
+ bool_arr = pa.array([True, False, True, True, None], pa.bool_())
+ bool8_arr = pa.ExtensionArray.from_storage(
+ pa.bool8(),
+ pa.array([-1, 0, 1, 2, None], pa.int8()),
+ )
+
+ # cast extension type -> arrow boolean type
+ assert bool8_arr.cast(pa.bool_()) == bool_arr
+
+ # cast arrow boolean type -> extension type, expecting canonical values
+ canonical_storage = pa.array([1, 0, 1, 1, None], pa.int8())
+ canonical_bool8_arr = pa.ExtensionArray.from_storage(pa.bool8(), canonical_storage)
+ assert bool_arr.cast(pa.bool8()) == canonical_bool8_arr
+
+
+def test_bool8_to_numpy_conversion():
+ arr = pa.ExtensionArray.from_storage(
+ pa.bool8(),
+ pa.array([-1, 0, 1, 2, None], pa.int8()),
+ )
+
+ # cannot zero-copy with nulls
+ with pytest.raises(
+ pa.ArrowInvalid,
+ match="Needed to copy 1 chunks with 1 nulls, but zero_copy_only was True",
+ ):
+ arr.to_numpy()
+
+ # nullable conversion possible with a copy, but dest dtype is object
+ assert np.array_equal(
+ arr.to_numpy(zero_copy_only=False),
+ np.array([True, False, True, True, None], dtype=np.object_),
+ )
+
+ # zero-copy possible with non-null array
+ np_arr_no_nulls = np.array([True, False, True, True], dtype=np.bool_)
+ arr_no_nulls = pa.ExtensionArray.from_storage(
+ pa.bool8(),
+ pa.array([-1, 0, 1, 2], pa.int8()),
+ )
+
+ arr_to_np = arr_no_nulls.to_numpy()
+ assert np.array_equal(arr_to_np, np_arr_no_nulls)
+
+ # same underlying buffer
+ assert arr_to_np.ctypes.data == arr_no_nulls.buffers()[1].address
+
+ # if the user requests a writable array, a copy should be performed
+ arr_to_np_writable = arr_no_nulls.to_numpy(zero_copy_only=False, writable=True)
+ assert np.array_equal(arr_to_np_writable, np_arr_no_nulls)
+
+ # different underlying buffer
+ assert arr_to_np_writable.ctypes.data != arr_no_nulls.buffers()[1].address
+
+
+def test_bool8_from_numpy_conversion():
+ np_arr_no_nulls = np.array([True, False, True, True], dtype=np.bool_)
+ canonical_bool8_arr_no_nulls = pa.ExtensionArray.from_storage(
+ pa.bool8(),
+ pa.array([1, 0, 1, 1], pa.int8()),
+ )
+
+ arr_from_np = pa.Bool8Array.from_numpy(np_arr_no_nulls)
+ assert arr_from_np == canonical_bool8_arr_no_nulls
+
+ # same underlying buffer
+ assert arr_from_np.buffers()[1].address == np_arr_no_nulls.ctypes.data
+
+ # conversion only valid for 1-D arrays
+ with pytest.raises(
+ ValueError,
+ match="Cannot convert 2-D array to bool8 array",
+ ):
+ pa.Bool8Array.from_numpy(
+ np.array([[True, False], [False, True]], dtype=np.bool_),
+ )
+
+ with pytest.raises(
+ ValueError,
+ match="Cannot convert 0-D array to bool8 array",
+ ):
+ pa.Bool8Array.from_numpy(np.bool_())
+
+ # must use compatible storage type
+ with pytest.raises(
+ TypeError,
+ match="Array dtype float64 incompatible with bool8 storage",
+ ):
+ pa.Bool8Array.from_numpy(np.array([1, 2, 3], dtype=np.float64))
+
+
+def test_bool8_scalar():
+ assert pa.ExtensionScalar.from_storage(pa.bool8(), -1).as_py() is True
+ assert pa.ExtensionScalar.from_storage(pa.bool8(), 0).as_py() is False
+ assert pa.ExtensionScalar.from_storage(pa.bool8(), 1).as_py() is True
+ assert pa.ExtensionScalar.from_storage(pa.bool8(), 2).as_py() is True
+ assert pa.ExtensionScalar.from_storage(pa.bool8(), None).as_py() is None
+
+ arr = pa.ExtensionArray.from_storage(
+ pa.bool8(),
+ pa.array([-1, 0, 1, 2, None], pa.int8()),
+ )
+ assert arr[0].as_py() is True
+ assert arr[1].as_py() is False
+ assert arr[2].as_py() is True
+ assert arr[3].as_py() is True
+ assert arr[4].as_py() is None
+
+ assert pa.scalar(-1, type=pa.bool8()).as_py() is True
+ assert pa.scalar(0, type=pa.bool8()).as_py() is False
+ assert pa.scalar(1, type=pa.bool8()).as_py() is True
+ assert pa.scalar(2, type=pa.bool8()).as_py() is True
+ assert pa.scalar(None, type=pa.bool8()).as_py() is None
diff --git a/python/pyarrow/tests/test_misc.py b/python/pyarrow/tests/test_misc.py
index 9a55a38177fc8..5d3471c7c35db 100644
--- a/python/pyarrow/tests/test_misc.py
+++ b/python/pyarrow/tests/test_misc.py
@@ -250,6 +250,9 @@ def test_set_timezone_db_path_non_windows():
pa.OpaqueArray,
pa.OpaqueScalar,
pa.OpaqueType,
+ pa.Bool8Array,
+ pa.Bool8Scalar,
+ pa.Bool8Type,
])
def test_extension_type_constructor_errors(klass):
# ARROW-2638: prevent calling extension class constructors directly
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index dcd2b61c33411..563782f0c2643 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -1837,6 +1837,37 @@ cdef class FixedShapeTensorType(BaseExtensionType):
return FixedShapeTensorScalar
+cdef class Bool8Type(BaseExtensionType):
+ """
+ Concrete class for bool8 extension type.
+
+ Bool8 is an alternate representation for boolean
+ arrays using 8 bits instead of 1 bit per value. The underlying
+ storage type is int8.
+
+ Examples
+ --------
+ Create an instance of bool8 extension type:
+
+ >>> import pyarrow as pa
+ >>> pa.bool8()
+ Bool8Type(extension)
+ """
+
+ cdef void init(self, const shared_ptr[CDataType]& type) except *:
+ BaseExtensionType.init(self, type)
+ self.bool8_ext_type = type.get()
+
+ def __arrow_ext_class__(self):
+ return Bool8Array
+
+ def __reduce__(self):
+ return bool8, ()
+
+ def __arrow_ext_scalar_class__(self):
+ return Bool8Scalar
+
+
cdef class OpaqueType(BaseExtensionType):
"""
Concrete class for opaque extension type.
@@ -5278,6 +5309,49 @@ def fixed_shape_tensor(DataType value_type, shape, dim_names=None, permutation=N
return out
+def bool8():
+ """
+ Create instance of bool8 extension type.
+
+ Examples
+ --------
+ Create an instance of bool8 extension type:
+
+ >>> import pyarrow as pa
+ >>> type = pa.bool8()
+ >>> type
+ Bool8Type(extension)
+
+ Inspect the data type:
+
+ >>> type.storage_type
+ DataType(int8)
+
+ Create a table with a bool8 array:
+
+ >>> arr = [-1, 0, 1, 2, None]
+ >>> storage = pa.array(arr, pa.int8())
+ >>> other = pa.ExtensionArray.from_storage(type, storage)
+ >>> pa.table([other], names=["unknown_col"])
+ pyarrow.Table
+ unknown_col: extension
+ ----
+ unknown_col: [[-1,0,1,2,null]]
+
+ Returns
+ -------
+ type : Bool8Type
+ """
+
+ cdef Bool8Type out = Bool8Type.__new__(Bool8Type)
+
+ c_type = GetResultValue(CBool8Type.Make())
+
+ out.init(c_type)
+
+ return out
+
+
def opaque(DataType storage_type, str type_name not None, str vendor_name not None):
"""
Create instance of opaque extension type.
From 27c22389579dd773d9701f5d3c743bbfca3bdb8e Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Wed, 21 Aug 2024 14:38:12 +0900
Subject: [PATCH 04/32] MINOR: [Java] Bump org.codehaus.mojo:exec-maven-plugin
from 3.3.0 to 3.4.1 in /java (#43692)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Bumps [org.codehaus.mojo:exec-maven-plugin](https://github.com/mojohaus/exec-maven-plugin) from 3.3.0 to 3.4.1.
Release notes
Sourced from org.codehaus.mojo:exec-maven-plugin's releases.
3.4.1
🐛 Bug Fixes
📦 Dependency updates
👻 Maintenance
🔧 Build
3.4.0
🚀 New features and improvements
- Allow
<includePluginDependencies>
to be specified for the exec:exec goal (#432) @sebthom
🐛 Bug Fixes
📦 Dependency updates
👻 Maintenance
🔧 Build
Commits
7b0be2c
[maven-release-plugin] prepare release 3.4.1
5ac4f80
Environment variable Path should be used as case-insensitive
cfb3a9f
Use Maven4 enabled with GH Action
d0ded48
Use shared release drafter GH Action
4c22954
Bump org.codehaus.mojo:mojo-parent from 84 to 85
a8c4f94
fix: NPE because declared MavenSession field hides field of superclass
a2b735f
Remove redundant spotless configuration
8e0e83c
[maven-release-plugin] prepare for next development iteration
6c4996f
[maven-release-plugin] prepare release 3.4.0
c7ad671
Remove Log4j 1.2.x from ITs
- Additional commits viewable in compare view
[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=org.codehaus.mojo:exec-maven-plugin&package-manager=maven&previous-version=3.3.0&new-version=3.4.1)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@ dependabot rebase`.
[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)
---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR:
- `@ dependabot rebase` will rebase this PR
- `@ dependabot recreate` will recreate this PR, overwriting any edits that have been made to it
- `@ dependabot merge` will merge this PR after your CI passes on it
- `@ dependabot squash and merge` will squash and merge this PR after your CI passes on it
- `@ dependabot cancel merge` will cancel a previously requested merge and block automerging
- `@ dependabot reopen` will reopen this PR if it is closed
- `@ dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- `@ dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency
- `@ dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- `@ dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- `@ dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: David Li
---
java/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/java/pom.xml b/java/pom.xml
index 1524dc3257997..0f3e5760f2b82 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -504,7 +504,7 @@ under the License.
org.codehaus.mojo
exec-maven-plugin
- 3.3.0
+ 3.4.1
org.codehaus.mojo
From 4af1e491df7ac22217656668b65c3e8d55f5b5ab Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Wed, 21 Aug 2024 14:56:44 +0900
Subject: [PATCH 05/32] MINOR: [Java] Bump io.grpc:grpc-bom from 1.65.0 to
1.66.0 in /java (#43657)
Bumps [io.grpc:grpc-bom](https://github.com/grpc/grpc-java) from 1.65.0 to 1.66.0.
Release notes
Sourced from io.grpc:grpc-bom's releases.
v1.65.1
What's Changed
- netty: Restore old behavior of NettyAdaptiveCumulator, but avoid using that class if Netty is on version 4.1.111 or later
Commits
cf78406
Bump version to 1.66.0
33af0a7
Update README etc to reference 1.66.0
19c9b99
xds: XdsClient should unsubscribe on last resource (#11264)
752a045
Revert "Start 1.67.0 development cycle (#11416)" (#11428)
ef09d94
Revert "Introduce onResult2 in NameResolver Listener2 that returns Status (#1...
c37fb18
Start 1.67.0 development cycle
9ba2f9d
Introduce onResult2 in NameResolver Listener2 that returns Status (#11313)
786523d
xds: WRR rr_fallback should trigger with one endpoint weight
b108ed3
api: Give instruments a toString() including their name
eb4cdf7
Update MAINTAINERS.md (#11241)
- Additional commits viewable in compare view
[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=io.grpc:grpc-bom&package-manager=maven&previous-version=1.65.0&new-version=1.66.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)
Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@ dependabot rebase`.
[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)
---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR:
- `@ dependabot rebase` will rebase this PR
- `@ dependabot recreate` will recreate this PR, overwriting any edits that have been made to it
- `@ dependabot merge` will merge this PR after your CI passes on it
- `@ dependabot squash and merge` will squash and merge this PR after your CI passes on it
- `@ dependabot cancel merge` will cancel a previously requested merge and block automerging
- `@ dependabot reopen` will reopen this PR if it is closed
- `@ dependabot close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- `@ dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency
- `@ dependabot ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- `@ dependabot ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- `@ dependabot ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: David Li
---
java/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/java/pom.xml b/java/pom.xml
index 0f3e5760f2b82..a73453df68fd2 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -97,7 +97,7 @@ under the License.
2.0.13
33.2.1-jre
4.1.112.Final
- 1.65.0
+ 1.66.0
3.25.4
2.17.2
3.4.0
From 9fc03015463a8f1cb616b088342b104fbc767a0c Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Wed, 21 Aug 2024 09:22:53 +0200
Subject: [PATCH 06/32] GH-43069: [Python] Use Py_IsFinalizing from
pythoncapi_compat.h (#43767)
### Rationale for this change
https://github.com/apache/arrow/pull/43540 already vendored `pythoncapi_compat.h`, so closing https://github.com/apache/arrow/issues/43069 by using this as well for `Py_IsFinalizing` (which was added in https://github.com/apache/arrow/pull/42034, and for which we opened that follow-up issue to use `pythoncapi_compat.h` instead)
Authored-by: Joris Van den Bossche
Signed-off-by: Joris Van den Bossche
---
python/pyarrow/src/arrow/python/udf.cc | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/python/pyarrow/src/arrow/python/udf.cc b/python/pyarrow/src/arrow/python/udf.cc
index 2c1e97c3ea03d..74f16899c47eb 100644
--- a/python/pyarrow/src/arrow/python/udf.cc
+++ b/python/pyarrow/src/arrow/python/udf.cc
@@ -24,14 +24,11 @@
#include "arrow/compute/kernel.h"
#include "arrow/compute/row/grouper.h"
#include "arrow/python/common.h"
+#include "arrow/python/vendored/pythoncapi_compat.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
-// Py_IsFinalizing added in Python 3.13.0a4
-#if PY_VERSION_HEX < 0x030D00A4
-#define Py_IsFinalizing() _Py_IsFinalizing()
-#endif
namespace arrow {
using compute::ExecSpan;
using compute::Grouper;
From e1e7c501019ac26c896d61fa0c129eee83da9b55 Mon Sep 17 00:00:00 2001
From: Oliver Layer
Date: Wed, 21 Aug 2024 13:22:57 +0200
Subject: [PATCH 07/32] GH-40036: [C++] Azure file system write buffering &
async writes (#43096)
### Rationale for this change
See #40036.
### What changes are included in this PR?
Write buffering and async writes (similar to what the S3 file system does) in the `ObjectAppendStream` for the Azure file system.
With write buffering and async writes, the input scenario creation runtime in the tests (which uses the `ObjectAppendStream` against Azurite) decreased from ~25s (see [here](https://github.com/apache/arrow/issues/40036)) to ~800ms:
```
[ RUN ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt
[ OK ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt (787 ms)
```
### Are these changes tested?
Added some tests with background writes enabled and disabled (some were taken from the S3 tests). Everything changed should be covered.
### Are there any user-facing changes?
`AzureOptions` now allows for `background_writes` to be set (default: true). No breaking changes.
### Notes
- The code in `DoWrite` is very similar to [the code in the S3 FS](https://github.com/apache/arrow/blob/edfa343eeca008513f0300924380e1b187cc976b/cpp/src/arrow/filesystem/s3fs.cc#L1753). Maybe this could be unified? I didn't see this in the scope of the PR though.
* GitHub Issue: #40036
Lead-authored-by: Oliver Layer
Co-authored-by: Antoine Pitrou
Signed-off-by: Antoine Pitrou
---
cpp/src/arrow/filesystem/azurefs.cc | 276 ++++++++++++++++++++---
cpp/src/arrow/filesystem/azurefs.h | 3 +
cpp/src/arrow/filesystem/azurefs_test.cc | 264 ++++++++++++++++++----
3 files changed, 471 insertions(+), 72 deletions(-)
diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 9b3c0c0c1d703..0bad856339729 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -22,6 +22,7 @@
#include "arrow/filesystem/azurefs.h"
#include "arrow/filesystem/azurefs_internal.h"
+#include "arrow/io/memory.h"
// idenfity.hpp triggers -Wattributes warnings cause -Werror builds to fail,
// so disable it for this file with pragmas.
@@ -144,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) {
blob_storage_scheme = "http";
dfs_storage_scheme = "http";
}
+ } else if (kv.first == "background_writes") {
+ ARROW_ASSIGN_OR_RAISE(background_writes,
+ ::arrow::internal::ParseBoolean(kv.second));
} else {
return Status::Invalid(
"Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'");
@@ -937,8 +941,8 @@ Status CommitBlockList(std::shared_ptr block_bl
const std::vector& block_ids,
const Blobs::CommitBlockListOptions& options) {
try {
- // CommitBlockList puts all block_ids in the latest element. That means in the case of
- // overlapping block_ids the newly staged block ids will always replace the
+ // CommitBlockList puts all block_ids in the latest element. That means in the case
+ // of overlapping block_ids the newly staged block ids will always replace the
// previously committed blocks.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
block_blob_client->CommitBlockList(block_ids, options);
@@ -950,7 +954,34 @@ Status CommitBlockList(std::shared_ptr block_bl
return Status::OK();
}
+Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id,
+ Core::IO::MemoryBodyStream& content) {
+ try {
+ block_blob_client->StageBlock(id, content);
+ } catch (const Storage::StorageException& exception) {
+ return ExceptionToStatus(
+ exception, "StageBlock failed for '", block_blob_client->GetUrl(),
+ "' new_block_id: '", id,
+ "'. Staging new blocks is fundamental to streaming writes to blob storage.");
+ }
+
+ return Status::OK();
+}
+
+/// Writes will be buffered up to this size (in bytes) before actually uploading them.
+static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024;
+/// The maximum size of a block in Azure Blob (as per docs).
+static constexpr int64_t kMaxBlockSizeBytes = 4UL * 1024 * 1024 * 1024;
+
+/// This output stream, similar to other arrow OutputStreams, is not thread-safe.
class ObjectAppendStream final : public io::OutputStream {
+ private:
+ struct UploadState;
+
+ std::shared_ptr Self() {
+ return std::dynamic_pointer_cast(shared_from_this());
+ }
+
public:
ObjectAppendStream(std::shared_ptr block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
@@ -958,7 +989,8 @@ class ObjectAppendStream final : public io::OutputStream {
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
- location_(location) {
+ location_(location),
+ background_writes_(options.background_writes) {
if (metadata && metadata->size() != 0) {
ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
@@ -1008,10 +1040,13 @@ class ObjectAppendStream final : public io::OutputStream {
content_length_ = 0;
}
}
+
+ upload_state_ = std::make_shared();
+
if (content_length_ > 0) {
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
for (auto block : block_list.CommittedBlocks) {
- block_ids_.push_back(block.Name);
+ upload_state_->block_ids.push_back(block.Name);
}
}
initialised_ = true;
@@ -1031,12 +1066,34 @@ class ObjectAppendStream final : public io::OutputStream {
if (closed_) {
return Status::OK();
}
+
+ if (current_block_) {
+ // Upload remaining buffer
+ RETURN_NOT_OK(AppendCurrentBlock());
+ }
+
RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}
+ Future<> CloseAsync() override {
+ if (closed_) {
+ return Status::OK();
+ }
+
+ if (current_block_) {
+ // Upload remaining buffer
+ RETURN_NOT_OK(AppendCurrentBlock());
+ }
+
+ return FlushAsync().Then([self = Self()]() {
+ self->block_blob_client_ = nullptr;
+ self->closed_ = true;
+ });
+ }
+
bool closed() const override { return closed_; }
Status CheckClosed(const char* action) const {
@@ -1052,11 +1109,11 @@ class ObjectAppendStream final : public io::OutputStream {
}
Status Write(const std::shared_ptr& buffer) override {
- return DoAppend(buffer->data(), buffer->size(), buffer);
+ return DoWrite(buffer->data(), buffer->size(), buffer);
}
Status Write(const void* data, int64_t nbytes) override {
- return DoAppend(data, nbytes);
+ return DoWrite(data, nbytes);
}
Status Flush() override {
@@ -1066,20 +1123,111 @@ class ObjectAppendStream final : public io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}
- return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
+
+ Future<> pending_blocks_completed;
+ {
+ std::unique_lock lock(upload_state_->mutex);
+ pending_blocks_completed = upload_state_->pending_blocks_completed;
+ }
+
+ RETURN_NOT_OK(pending_blocks_completed.status());
+ std::unique_lock lock(upload_state_->mutex);
+ return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+ commit_block_list_options_);
}
- private:
- Status DoAppend(const void* data, int64_t nbytes,
- std::shared_ptr owned_buffer = nullptr) {
- RETURN_NOT_OK(CheckClosed("append"));
- auto append_data = reinterpret_cast(data);
- Core::IO::MemoryBodyStream block_content(append_data, nbytes);
- if (block_content.Length() == 0) {
+ Future<> FlushAsync() {
+ RETURN_NOT_OK(CheckClosed("flush async"));
+ if (!initialised_) {
+ // If the stream has not been successfully initialized then there is nothing to
+ // flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}
- const auto n_block_ids = block_ids_.size();
+ Future<> pending_blocks_completed;
+ {
+ std::unique_lock lock(upload_state_->mutex);
+ pending_blocks_completed = upload_state_->pending_blocks_completed;
+ }
+
+ return pending_blocks_completed.Then([self = Self()] {
+ std::unique_lock lock(self->upload_state_->mutex);
+ return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids,
+ self->commit_block_list_options_);
+ });
+ }
+
+ private:
+ Status AppendCurrentBlock() {
+ ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+ current_block_.reset();
+ current_block_size_ = 0;
+ return AppendBlock(buf);
+ }
+
+ Status DoWrite(const void* data, int64_t nbytes,
+ std::shared_ptr owned_buffer = nullptr) {
+ if (closed_) {
+ return Status::Invalid("Operation on closed stream");
+ }
+
+ const auto* data_ptr = reinterpret_cast(data);
+ auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+ data_ptr += offset;
+ nbytes -= offset;
+ pos_ += offset;
+ content_length_ += offset;
+ };
+
+ // Handle case where we have some bytes buffered from prior calls.
+ if (current_block_size_ > 0) {
+ // Try to fill current buffer
+ const int64_t to_copy =
+ std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+ RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+ current_block_size_ += to_copy;
+ advance_ptr(to_copy);
+
+ // If buffer isn't full, break
+ if (current_block_size_ < kBlockUploadSizeBytes) {
+ return Status::OK();
+ }
+
+ // Upload current buffer
+ RETURN_NOT_OK(AppendCurrentBlock());
+ }
+
+ // We can upload chunks without copying them into a buffer
+ while (nbytes >= kBlockUploadSizeBytes) {
+ const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes);
+ RETURN_NOT_OK(AppendBlock(data_ptr, upload_size));
+ advance_ptr(upload_size);
+ }
+
+ // Buffer remaining bytes
+ if (nbytes > 0) {
+ current_block_size_ = nbytes;
+
+ if (current_block_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(
+ current_block_,
+ io::BufferOutputStream::Create(kBlockUploadSizeBytes, io_context_.pool()));
+ } else {
+ // Re-use the allocation from before.
+ RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, io_context_.pool()));
+ }
+
+ RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_));
+ pos_ += current_block_size_;
+ content_length_ += current_block_size_;
+ }
+
+ return Status::OK();
+ }
+
+ std::string CreateBlock() {
+ std::unique_lock lock(upload_state_->mutex);
+ const auto n_block_ids = upload_state_->block_ids.size();
// New block ID must always be distinct from the existing block IDs. Otherwise we
// will accidentally replace the content of existing blocks, causing corruption.
@@ -1093,36 +1241,106 @@ class ObjectAppendStream final : public io::OutputStream {
new_block_id.insert(0, required_padding_digits, '0');
// There is a small risk when appending to a blob created by another client that
// `new_block_id` may overlapping with an existing block id. Adding the `-arrow`
- // suffix significantly reduces the risk, but does not 100% eliminate it. For example
- // if the blob was previously created with one block, with id `00001-arrow` then the
- // next block we append will conflict with that, and cause corruption.
+ // suffix significantly reduces the risk, but does not 100% eliminate it. For
+ // example if the blob was previously created with one block, with id `00001-arrow`
+ // then the next block we append will conflict with that, and cause corruption.
new_block_id += "-arrow";
new_block_id = Core::Convert::Base64Encode(
std::vector(new_block_id.begin(), new_block_id.end()));
- try {
- block_blob_client_->StageBlock(new_block_id, block_content);
- } catch (const Storage::StorageException& exception) {
- return ExceptionToStatus(
- exception, "StageBlock failed for '", block_blob_client_->GetUrl(),
- "' new_block_id: '", new_block_id,
- "'. Staging new blocks is fundamental to streaming writes to blob storage.");
+ upload_state_->block_ids.push_back(new_block_id);
+
+ // We only use the future if we have background writes enabled. Without background
+ // writes the future is initialized as finished and not mutated any more.
+ if (background_writes_ && upload_state_->blocks_in_progress++ == 0) {
+ upload_state_->pending_blocks_completed = Future<>::Make();
}
- block_ids_.push_back(new_block_id);
- pos_ += nbytes;
- content_length_ += nbytes;
+
+ return new_block_id;
+ }
+
+ Status AppendBlock(const void* data, int64_t nbytes,
+ std::shared_ptr owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+
+ if (nbytes == 0) {
+ return Status::OK();
+ }
+
+ const auto block_id = CreateBlock();
+
+ if (background_writes_) {
+ if (owned_buffer == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
+ memcpy(owned_buffer->mutable_data(), data, nbytes);
+ } else {
+ DCHECK_EQ(data, owned_buffer->data());
+ DCHECK_EQ(nbytes, owned_buffer->size());
+ }
+
+ // The closure keeps the buffer and the upload state alive
+ auto deferred = [owned_buffer, block_id, block_blob_client = block_blob_client_,
+ state = upload_state_]() mutable -> Status {
+ Core::IO::MemoryBodyStream block_content(owned_buffer->data(),
+ owned_buffer->size());
+
+ auto status = StageBlock(block_blob_client.get(), block_id, block_content);
+ HandleUploadOutcome(state, status);
+ return Status::OK();
+ };
+ RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred)));
+ } else {
+ auto append_data = reinterpret_cast(data);
+ Core::IO::MemoryBodyStream block_content(append_data, nbytes);
+
+ RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content));
+ }
+
return Status::OK();
}
+ Status AppendBlock(std::shared_ptr buffer) {
+ return AppendBlock(buffer->data(), buffer->size(), buffer);
+ }
+
+ static void HandleUploadOutcome(const std::shared_ptr& state,
+ const Status& status) {
+ std::unique_lock lock(state->mutex);
+ if (!status.ok()) {
+ state->status &= status;
+ }
+ // Notify completion
+ if (--state->blocks_in_progress == 0) {
+ auto fut = state->pending_blocks_completed;
+ lock.unlock();
+ fut.MarkFinished(state->status);
+ }
+ }
+
std::shared_ptr block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
+ const bool background_writes_;
int64_t content_length_ = kNoSize;
+ std::shared_ptr current_block_;
+ int64_t current_block_size_ = 0;
+
bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;
- std::vector block_ids_;
+
+ // This struct is kept alive through background writes to avoid problems
+ // in the completion handler.
+ struct UploadState {
+ std::mutex mutex;
+ std::vector block_ids;
+ int64_t blocks_in_progress = 0;
+ Status status;
+ Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK());
+ };
+ std::shared_ptr upload_state_;
+
Blobs::CommitBlockListOptions commit_block_list_options_;
};
diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h
index 072b061eeb2a9..ebbe00c4ee784 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr default_metadata;
+ /// Whether OutputStream writes will be issued in the background, without blocking.
+ bool background_writes = true;
+
private:
enum class CredentialKind {
kDefault,
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index 5ff241b17ff58..9d437d1f83aac 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -39,6 +39,7 @@
#include
#include
#include
+#include
#include
#include
@@ -53,6 +54,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
+#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
@@ -566,6 +568,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriDfsStorage() {
@@ -582,6 +585,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "file_system/dir/file");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriAbfs() {
@@ -597,6 +601,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, "https");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriAbfss() {
@@ -612,6 +617,7 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, "https");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
}
void TestFromUriEnableTls() {
@@ -628,6 +634,17 @@ class TestAzureOptions : public ::testing::Test {
ASSERT_EQ(options.dfs_storage_scheme, "http");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
ASSERT_EQ(path, "container/dir/blob");
+ ASSERT_EQ(options.background_writes, true);
+ }
+
+ void TestFromUriDisableBackgroundWrites() {
+ std::string path;
+ ASSERT_OK_AND_ASSIGN(auto options,
+ AzureOptions::FromUri(
+ "abfs://account:password@127.0.0.1:10000/container/dir/blob?"
+ "background_writes=false",
+ &path));
+ ASSERT_EQ(options.background_writes, false);
}
void TestFromUriCredentialDefault() {
@@ -773,6 +790,9 @@ TEST_F(TestAzureOptions, FromUriDfsStorage) { TestFromUriDfsStorage(); }
TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); }
TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); }
TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); }
+TEST_F(TestAzureOptions, FromUriDisableBackgroundWrites) {
+ TestFromUriDisableBackgroundWrites();
+}
TEST_F(TestAzureOptions, FromUriCredentialDefault) { TestFromUriCredentialDefault(); }
TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { TestFromUriCredentialAnonymous(); }
TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) {
@@ -929,8 +949,9 @@ class TestAzureFileSystem : public ::testing::Test {
void UploadLines(const std::vector& lines, const std::string& path,
int total_size) {
ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
- const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string(""));
- ASSERT_OK(output->Write(all_lines));
+ for (auto const& line : lines) {
+ ASSERT_OK(output->Write(line.data(), line.size()));
+ }
ASSERT_OK(output->Close());
}
@@ -1474,6 +1495,162 @@ class TestAzureFileSystem : public ::testing::Test {
arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
}
+ void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+ std::string_view expected) {
+ ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+ std::string contents;
+ std::shared_ptr buffer;
+ do {
+ ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+ contents.append(buffer->ToString());
+ } while (buffer->size() != 0);
+
+ EXPECT_EQ(expected, contents);
+ }
+
+ void TestOpenOutputStreamSmall() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+ auto data = SetUpPreexistingData();
+ const auto path = data.ContainerPath("test-write-object");
+ ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+ const std::string_view expected(PreexistingData::kLoremIpsum);
+ ASSERT_OK(output->Write(expected));
+ ASSERT_OK(output->Close());
+
+ // Verify we can read the object back.
+ AssertObjectContents(fs.get(), path, expected);
+ }
+
+ void TestOpenOutputStreamLarge() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+ auto data = SetUpPreexistingData();
+ const auto path = data.ContainerPath("test-write-object");
+ ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+ // Upload 5 MB, 4 MB and 2 MB and a very small write to test varying sizes
+ std::vector sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 1024 * 1024,
+ 2000};
+
+ std::vector buffers{};
+ char current_char = 'A';
+ for (const auto size : sizes) {
+ buffers.emplace_back(size, current_char++);
+ }
+
+ auto expected_size = std::int64_t{0};
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ ASSERT_OK(output->Write(buffers[i]));
+ expected_size += sizes[i];
+ ASSERT_EQ(expected_size, output->Tell());
+ }
+ ASSERT_OK(output->Close());
+
+ AssertObjectContents(fs.get(), path,
+ buffers[0] + buffers[1] + buffers[2] + buffers[3]);
+ }
+
+ void TestOpenOutputStreamLargeSingleWrite() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+ auto data = SetUpPreexistingData();
+ const auto path = data.ContainerPath("test-write-object");
+ ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+ constexpr std::int64_t size{12 * 1024 * 1024};
+ const std::string large_string(size, 'X');
+
+ ASSERT_OK(output->Write(large_string));
+ ASSERT_EQ(size, output->Tell());
+ ASSERT_OK(output->Close());
+
+ AssertObjectContents(fs.get(), path, large_string);
+ }
+
+ void TestOpenOutputStreamCloseAsync() {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ // This false positive leak is similar to the one pinpointed in the
+ // have_false_positive_memory_leak_with_generator() comments above,
+ // though the stack trace is different. It happens when a block list
+ // is committed from a background thread.
+ //
+ // clang-format off
+ // Direct leak of 968 byte(s) in 1 object(s) allocated from:
+ // #0 calloc
+ // #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4)
+ // #2 __xmlDefaultBufferSize
+ // #3 xmlBufferCreate
+ // #4 Azure::Storage::_internal::XmlWriter::XmlWriter()
+ // #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList
+ // #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList
+ // #7 arrow::fs::(anonymous namespace)::CommitBlockList
+ // #8 arrow::fs::(anonymous namespace)::ObjectAppendStream::FlushAsync()::'lambda'
+ // clang-format on
+ //
+ // TODO perhaps remove this skip once we can rely on
+ // https://github.com/Azure/azure-sdk-for-cpp/pull/5767
+ //
+ // Also note that ClickHouse has a workaround for a similar issue:
+ // https://github.com/ClickHouse/ClickHouse/pull/45796
+ if (options_.background_writes) {
+ GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
+ }
+#endif
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+ auto data = SetUpPreexistingData();
+ const std::string path = data.ContainerPath("test-write-object");
+ constexpr auto payload = PreexistingData::kLoremIpsum;
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+ ASSERT_OK(stream->Write(payload));
+ auto close_fut = stream->CloseAsync();
+
+ ASSERT_OK(close_fut.MoveResult());
+
+ AssertObjectContents(fs.get(), path, payload);
+ }
+
+ void TestOpenOutputStreamCloseAsyncDestructor() {
+#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
+ // See above.
+ if (options_.background_writes) {
+ GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
+ }
+#endif
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+ auto data = SetUpPreexistingData();
+ const std::string path = data.ContainerPath("test-write-object");
+ constexpr auto payload = PreexistingData::kLoremIpsum;
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+ ASSERT_OK(stream->Write(payload));
+ // Destructor implicitly closes stream and completes the upload.
+ // For this test it doesn't matter whether flush is triggered asynchronously
+ // after CloseAsync or synchronously after stream.reset() since we're just
+ // checking that the future keeps the stream alive until completion
+ // rather than segfaulting on a dangling stream.
+ auto close_fut = stream->CloseAsync();
+ stream.reset();
+ ASSERT_OK(close_fut.MoveResult());
+
+ AssertObjectContents(fs.get(), path, payload);
+ }
+
+ void TestOpenOutputStreamDestructor() {
+ ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+ constexpr auto* payload = "new data";
+ auto data = SetUpPreexistingData();
+ const std::string path = data.ContainerPath("test-write-object");
+
+ ASSERT_OK_AND_ASSIGN(auto stream, fs->OpenOutputStream(path));
+ ASSERT_OK(stream->Write(payload));
+ // Destructor implicitly closes stream and completes the multipart upload.
+ stream.reset();
+
+ AssertObjectContents(fs.get(), path, payload);
+ }
+
private:
using StringMatcher =
::testing::PolymorphicMatcher<::testing::internal::HasSubstrMatcher>;
@@ -2704,53 +2881,27 @@ TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) {
ASSERT_EQ("text/plain", content_type);
}
-TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
- auto data = SetUpPreexistingData();
- const auto path = data.ContainerPath("test-write-object");
- ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
- const std::string_view expected(PreexistingData::kLoremIpsum);
- ASSERT_OK(output->Write(expected));
- ASSERT_OK(output->Close());
-
- // Verify we can read the object back.
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmallNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamSmall();
+}
- std::array inbuf{};
- ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data()));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) { TestOpenOutputStreamSmall(); }
- EXPECT_EQ(expected, std::string_view(inbuf.data(), size));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamLarge();
}
-TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) {
- auto data = SetUpPreexistingData();
- const auto path = data.ContainerPath("test-write-object");
- ASSERT_OK_AND_ASSIGN(auto output, fs()->OpenOutputStream(path, {}));
- std::array sizes{257 * 1024, 258 * 1024, 259 * 1024};
- std::array buffers{
- std::string(sizes[0], 'A'),
- std::string(sizes[1], 'B'),
- std::string(sizes[2], 'C'),
- };
- auto expected = std::int64_t{0};
- for (auto i = 0; i != 3; ++i) {
- ASSERT_OK(output->Write(buffers[i]));
- expected += sizes[i];
- ASSERT_EQ(expected, output->Tell());
- }
- ASSERT_OK(output->Close());
-
- // Verify we can read the object back.
- ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(path));
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLarge) { TestOpenOutputStreamLarge(); }
- std::string contents;
- std::shared_ptr buffer;
- do {
- ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
- ASSERT_TRUE(buffer);
- contents.append(buffer->ToString());
- } while (buffer->size() != 0);
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWriteNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamLargeSingleWrite();
+}
- EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]);
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamLargeSingleWrite) {
+ TestOpenOutputStreamLargeSingleWrite();
}
TEST_F(TestAzuriteFileSystem, OpenOutputStreamTruncatesExistingFile) {
@@ -2820,6 +2971,33 @@ TEST_F(TestAzuriteFileSystem, OpenOutputStreamClosed) {
ASSERT_RAISES(Invalid, output->Tell());
}
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsync) {
+ TestOpenOutputStreamCloseAsync();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamCloseAsyncNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamCloseAsync();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructor) {
+ TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamAsyncDestructorNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructor) {
+ TestOpenOutputStreamDestructor();
+}
+
+TEST_F(TestAzuriteFileSystem, OpenOutputStreamDestructorNoBackgroundWrites) {
+ options_.background_writes = false;
+ TestOpenOutputStreamDestructor();
+}
+
TEST_F(TestAzuriteFileSystem, OpenOutputStreamUri) {
auto data = SetUpPreexistingData();
const auto path = data.ContainerPath("open-output-stream-uri.txt");
From ffee537d88ab6d26614e2a1e85d4d18152695020 Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Wed, 21 Aug 2024 14:18:45 +0200
Subject: [PATCH 08/32] GH-42222: [Python] Add bindings for CopyTo on
RecordBatch and Array classes (#42223)
### Rationale for this change
We have added bindings for the Device and MemoryManager classes (https://github.com/apache/arrow/issues/41126), and as a next step we can expose the functionality to copy a full Array or RecordBatch to a specific memory manager.
### What changes are included in this PR?
This adds a `copy_to` method on pyarrow Array and RecordBatch.
### Are these changes tested?
Yes
* GitHub Issue: #42222
Authored-by: Joris Van den Bossche
Signed-off-by: Joris Van den Bossche
---
python/pyarrow/array.pxi | 36 ++++++++++++
python/pyarrow/device.pxi | 6 ++
python/pyarrow/includes/libarrow.pxd | 4 ++
python/pyarrow/lib.pxd | 4 ++
python/pyarrow/table.pxi | 35 ++++++++++++
python/pyarrow/tests/test_cuda.py | 82 +++++++++++-----------------
python/pyarrow/tests/test_device.py | 26 +++++++++
7 files changed, 143 insertions(+), 50 deletions(-)
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index 4c3eb93232634..77d6c9c06d2de 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -1702,6 +1702,42 @@ cdef class Array(_PandasConvertible):
_append_array_buffers(self.sp_array.get().data().get(), res)
return res
+ def copy_to(self, destination):
+ """
+ Construct a copy of the array with all buffers on destination
+ device.
+
+ This method recursively copies the array's buffers and those of its
+ children onto the destination MemoryManager device and returns the
+ new Array.
+
+ Parameters
+ ----------
+ destination : pyarrow.MemoryManager or pyarrow.Device
+ The destination device to copy the array to.
+
+ Returns
+ -------
+ Array
+ """
+ cdef:
+ shared_ptr[CArray] c_array
+ shared_ptr[CMemoryManager] c_memory_manager
+
+ if isinstance(destination, Device):
+ c_memory_manager = (destination).unwrap().get().default_memory_manager()
+ elif isinstance(destination, MemoryManager):
+ c_memory_manager = (destination).unwrap()
+ else:
+ raise TypeError(
+ "Argument 'destination' has incorrect type (expected a "
+ f"pyarrow Device or MemoryManager, got {type(destination)})"
+ )
+
+ with nogil:
+ c_array = GetResultValue(self.ap.CopyTo(c_memory_manager))
+ return pyarrow_wrap_array(c_array)
+
def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
diff --git a/python/pyarrow/device.pxi b/python/pyarrow/device.pxi
index 6e6034752085a..26256de62093e 100644
--- a/python/pyarrow/device.pxi
+++ b/python/pyarrow/device.pxi
@@ -64,6 +64,9 @@ cdef class Device(_Weakrefable):
self.init(device)
return self
+ cdef inline shared_ptr[CDevice] unwrap(self) nogil:
+ return self.device
+
def __eq__(self, other):
if not isinstance(other, Device):
return False
@@ -130,6 +133,9 @@ cdef class MemoryManager(_Weakrefable):
self.init(mm)
return self
+ cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil:
+ return self.memory_manager
+
def __repr__(self):
return "".format(
frombytes(self.memory_manager.get().device().get().ToString())
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a54a1db292f70..6f510cfc0c06c 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -234,7 +234,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CStatus Validate() const
CStatus ValidateFull() const
CResult[shared_ptr[CArray]] View(const shared_ptr[CDataType]& type)
+
CDeviceAllocationType device_type()
+ CResult[shared_ptr[CArray]] CopyTo(const shared_ptr[CMemoryManager]& to) const
shared_ptr[CArray] MakeArray(const shared_ptr[CArrayData]& data)
CResult[shared_ptr[CArray]] MakeArrayOfNull(
@@ -1027,6 +1029,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CRecordBatch] Slice(int64_t offset)
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
+ CResult[shared_ptr[CRecordBatch]] CopyTo(const shared_ptr[CMemoryManager]& to) const
+
CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, c_bool row_major,
CMemoryPool* pool) const
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index e3625c1815274..a7c3b496a0045 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -542,6 +542,8 @@ cdef class Device(_Weakrefable):
@staticmethod
cdef wrap(const shared_ptr[CDevice]& device)
+ cdef inline shared_ptr[CDevice] unwrap(self) nogil
+
cdef class MemoryManager(_Weakrefable):
cdef:
@@ -552,6 +554,8 @@ cdef class MemoryManager(_Weakrefable):
@staticmethod
cdef wrap(const shared_ptr[CMemoryManager]& mm)
+ cdef inline shared_ptr[CMemoryManager] unwrap(self) nogil
+
cdef class Buffer(_Weakrefable):
cdef:
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 8f7c44e55dc8d..6d34c71c9df40 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -3569,6 +3569,41 @@ cdef class RecordBatch(_Tabular):
row_major, pool))
return pyarrow_wrap_tensor(c_tensor)
+ def copy_to(self, destination):
+ """
+ Copy the entire RecordBatch to destination device.
+
+ This copies each column of the record batch to create
+ a new record batch where all underlying buffers for the columns have
+ been copied to the destination MemoryManager.
+
+ Parameters
+ ----------
+ destination : pyarrow.MemoryManager or pyarrow.Device
+ The destination device to copy the record batch to.
+
+ Returns
+ -------
+ RecordBatch
+ """
+ cdef:
+ shared_ptr[CRecordBatch] c_batch
+ shared_ptr[CMemoryManager] c_memory_manager
+
+ if isinstance(destination, Device):
+ c_memory_manager = (destination).unwrap().get().default_memory_manager()
+ elif isinstance(destination, MemoryManager):
+ c_memory_manager = (destination).unwrap()
+ else:
+ raise TypeError(
+ "Argument 'destination' has incorrect type (expected a "
+ f"pyarrow Device or MemoryManager, got {type(destination)})"
+ )
+
+ with nogil:
+ c_batch = GetResultValue(self.batch.CopyTo(c_memory_manager))
+ return pyarrow_wrap_batch(c_batch)
+
def _export_to_c(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowArray struct, given its pointer.
diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py
index 36b97a6206463..d55be651b1571 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -827,21 +827,29 @@ def test_IPC(size):
assert p.exitcode == 0
-def _arr_copy_to_host(carr):
- # TODO replace below with copy to device when exposed in python
- buffers = []
- for cbuf in carr.buffers():
- if cbuf is None:
- buffers.append(None)
- else:
- buf = global_context.foreign_buffer(
- cbuf.address, cbuf.size, cbuf
- ).copy_to_host()
- buffers.append(buf)
-
- child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
- new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
- return new
+def test_copy_to():
+ _, buf = make_random_buffer(size=10, target='device')
+ mm_cuda = buf.memory_manager
+
+ for dest in [mm_cuda, mm_cuda.device]:
+ arr = pa.array([0, 1, 2])
+ arr_cuda = arr.copy_to(dest)
+ assert not arr_cuda.buffers()[1].is_cpu
+ assert arr_cuda.buffers()[1].device_type == pa.DeviceAllocationType.CUDA
+ assert arr_cuda.buffers()[1].device == mm_cuda.device
+
+ arr_roundtrip = arr_cuda.copy_to(pa.default_cpu_memory_manager())
+ assert arr_roundtrip.equals(arr)
+
+ batch = pa.record_batch({"col": arr})
+ batch_cuda = batch.copy_to(dest)
+ buf_cuda = batch_cuda["col"].buffers()[1]
+ assert not buf_cuda.is_cpu
+ assert buf_cuda.device_type == pa.DeviceAllocationType.CUDA
+ assert buf_cuda.device == mm_cuda.device
+
+ batch_roundtrip = batch_cuda.copy_to(pa.default_cpu_memory_manager())
+ assert batch_roundtrip.equals(batch)
def test_device_interface_array():
@@ -856,19 +864,10 @@ def test_device_interface_array():
typ = pa.list_(pa.int32())
arr = pa.array([[1], [2, 42]], type=typ)
- # TODO replace below with copy to device when exposed in python
- cbuffers = []
- for buf in arr.buffers():
- if buf is None:
- cbuffers.append(None)
- else:
- cbuf = global_context.new_buffer(buf.size)
- cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
- cbuffers.append(cbuf)
-
- carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
- pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
- ])
+ # copy to device
+ _, buf = make_random_buffer(size=10, target='device')
+ mm_cuda = buf.memory_manager
+ carr = arr.copy_to(mm_cuda)
# Type is known up front
carr._export_to_c_device(ptr_array)
@@ -882,7 +881,7 @@ def test_device_interface_array():
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, typ)
assert carr_new.type == pa.list_(pa.int32())
- arr_new = _arr_copy_to_host(carr_new)
+ arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
assert arr_new.equals(arr)
del carr_new
@@ -891,15 +890,13 @@ def test_device_interface_array():
pa.Array._import_from_c_device(ptr_array, typ)
# Schema is exported and imported at the same time
- carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
- pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
- ])
+ carr = arr.copy_to(mm_cuda)
carr._export_to_c_device(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del carr
carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
assert carr_new.type == pa.list_(pa.int32())
- arr_new = _arr_copy_to_host(carr_new)
+ arr_new = carr_new.copy_to(pa.default_cpu_memory_manager())
assert arr_new.equals(arr)
del carr_new
@@ -908,21 +905,6 @@ def test_device_interface_array():
pa.Array._import_from_c_device(ptr_array, ptr_schema)
-def _batch_copy_to_host(cbatch):
- # TODO replace below with copy to device when exposed in python
- arrs = []
- for col in cbatch.columns:
- buffers = [
- global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
- if buf is not None else None
- for buf in col.buffers()
- ]
- new = pa.Array.from_buffers(col.type, len(col), buffers)
- arrs.append(new)
-
- return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)
-
-
def test_device_interface_batch_array():
cffi = pytest.importorskip("pyarrow.cffi")
ffi = cffi.ffi
@@ -949,7 +931,7 @@ def test_device_interface_batch_array():
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
assert cbatch_new.schema == schema
- batch_new = _batch_copy_to_host(cbatch_new)
+ batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
assert batch_new.equals(batch)
del cbatch_new
@@ -964,7 +946,7 @@ def test_device_interface_batch_array():
del cbatch
cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
assert cbatch_new.schema == schema
- batch_new = _batch_copy_to_host(cbatch_new)
+ batch_new = cbatch_new.copy_to(pa.default_cpu_memory_manager())
assert batch_new.equals(batch)
del cbatch_new
diff --git a/python/pyarrow/tests/test_device.py b/python/pyarrow/tests/test_device.py
index 6bdb015be1a95..dc1a51e6d0092 100644
--- a/python/pyarrow/tests/test_device.py
+++ b/python/pyarrow/tests/test_device.py
@@ -17,6 +17,8 @@
import pyarrow as pa
+import pytest
+
def test_device_memory_manager():
mm = pa.default_cpu_memory_manager()
@@ -41,3 +43,27 @@ def test_buffer_device():
assert buf.device.is_cpu
assert buf.device == pa.default_cpu_memory_manager().device
assert buf.memory_manager.is_cpu
+
+
+def test_copy_to():
+ mm = pa.default_cpu_memory_manager()
+
+ arr = pa.array([0, 1, 2])
+ batch = pa.record_batch({"col": arr})
+
+ for dest in [mm, mm.device]:
+ arr_copied = arr.copy_to(dest)
+ assert arr_copied.equals(arr)
+ assert arr_copied.buffers()[1].device == mm.device
+ assert arr_copied.buffers()[1].address != arr.buffers()[1].address
+
+ batch_copied = batch.copy_to(dest)
+ assert batch_copied.equals(batch)
+ assert batch_copied["col"].buffers()[1].device == mm.device
+ assert batch_copied["col"].buffers()[1].address != arr.buffers()[1].address
+
+ with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"):
+ arr.copy_to(mm.device.device_type)
+
+ with pytest.raises(TypeError, match="Argument 'destination' has incorrect type"):
+ batch.copy_to(mm.device.device_type)
From f9911ee2ffc62fa946b2e1198bcdd13a757181fe Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Wed, 21 Aug 2024 14:37:47 +0200
Subject: [PATCH 09/32] GH-43776: [C++] Add chunked Take benchmarks with a
small selection factor (#43772)
This should help exercise the performance of the chunked Take implementation on more use cases.
* GitHub Issue: #43776
Authored-by: Antoine Pitrou
Signed-off-by: Antoine Pitrou
---
.../kernels/vector_selection_benchmark.cc | 91 ++++++++++++++++---
1 file changed, 80 insertions(+), 11 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc b/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc
index c2a27dfe43488..75affd32560f0 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc
@@ -17,6 +17,7 @@
#include "benchmark/benchmark.h"
+#include
#include
#include
@@ -42,6 +43,9 @@ struct FilterParams {
const double filter_null_proportion;
};
+constexpr double kDefaultTakeSelectionFactor = 1.0;
+constexpr double kSmallTakeSelectionFactor = 0.05;
+
std::vector g_data_sizes = {kL2Size};
// The benchmark state parameter references this vector of cases. Test high and
@@ -104,14 +108,21 @@ struct TakeBenchmark {
benchmark::State& state;
RegressionArgs args;
random::RandomArrayGenerator rand;
+ double selection_factor;
bool indices_have_nulls;
bool monotonic_indices = false;
TakeBenchmark(benchmark::State& state, bool indices_have_nulls,
bool monotonic_indices = false)
+ : TakeBenchmark(state, /*selection_factor=*/kDefaultTakeSelectionFactor,
+ indices_have_nulls, monotonic_indices) {}
+
+ TakeBenchmark(benchmark::State& state, double selection_factor, bool indices_have_nulls,
+ bool monotonic_indices = false)
: state(state),
args(state, /*size_is_bytes=*/false),
rand(kSeed),
+ selection_factor(selection_factor),
indices_have_nulls(indices_have_nulls),
monotonic_indices(monotonic_indices) {}
@@ -185,10 +196,10 @@ struct TakeBenchmark {
}
void Bench(const std::shared_ptr& values) {
- double indices_null_proportion = indices_have_nulls ? args.null_proportion : 0;
- auto indices =
- rand.Int32(values->length(), 0, static_cast(values->length() - 1),
- indices_null_proportion);
+ const double indices_null_proportion = indices_have_nulls ? args.null_proportion : 0;
+ const int64_t num_indices = static_cast(selection_factor * values->length());
+ auto indices = rand.Int32(num_indices, 0, static_cast(values->length() - 1),
+ indices_null_proportion);
if (monotonic_indices) {
auto arg_sorter = *SortIndices(*indices);
@@ -198,14 +209,15 @@ struct TakeBenchmark {
for (auto _ : state) {
ABORT_NOT_OK(Take(values, indices).status());
}
- state.SetItemsProcessed(state.iterations() * values->length());
+ state.SetItemsProcessed(state.iterations() * num_indices);
+ state.counters["selection_factor"] = selection_factor;
}
void BenchChunked(const std::shared_ptr& values, bool chunk_indices_too) {
double indices_null_proportion = indices_have_nulls ? args.null_proportion : 0;
- auto indices =
- rand.Int32(values->length(), 0, static_cast(values->length() - 1),
- indices_null_proportion);
+ const int64_t num_indices = static_cast(selection_factor * values->length());
+ auto indices = rand.Int32(num_indices, 0, static_cast(values->length() - 1),
+ indices_null_proportion);
if (monotonic_indices) {
auto arg_sorter = *SortIndices(*indices);
@@ -213,14 +225,26 @@ struct TakeBenchmark {
}
std::shared_ptr chunked_indices;
if (chunk_indices_too) {
+ // Here we choose for indices chunks to have roughly the same length
+ // as values chunks, but there may be less of them if selection_factor < 1.0.
+ // The alternative is to have the same number of chunks, but with a potentially
+ // much smaller (and unrealistic) length.
std::vector> indices_chunks;
+ // Make sure there are at least two chunks of indices
+ const auto max_chunk_length = indices->length() / 2 + 1;
int64_t offset = 0;
for (int i = 0; i < values->num_chunks(); ++i) {
- auto chunk = indices->Slice(offset, values->chunk(i)->length());
+ const auto chunk_length = std::min(max_chunk_length, values->chunk(i)->length());
+ auto chunk = indices->Slice(offset, chunk_length);
indices_chunks.push_back(std::move(chunk));
- offset += values->chunk(i)->length();
+ offset += chunk_length;
+ if (offset >= indices->length()) {
+ break;
+ }
}
chunked_indices = std::make_shared(std::move(indices_chunks));
+ ARROW_CHECK_EQ(chunked_indices->length(), num_indices);
+ ARROW_CHECK_GT(chunked_indices->num_chunks(), 1);
}
if (chunk_indices_too) {
@@ -232,7 +256,8 @@ struct TakeBenchmark {
ABORT_NOT_OK(Take(values, indices).status());
}
}
- state.SetItemsProcessed(state.iterations() * values->length());
+ state.SetItemsProcessed(state.iterations() * num_indices);
+ state.counters["selection_factor"] = selection_factor;
}
};
@@ -432,12 +457,25 @@ static void TakeChunkedChunkedInt64RandomIndicesWithNulls(benchmark::State& stat
.ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/true);
}
+static void TakeChunkedChunkedInt64FewRandomIndicesWithNulls(benchmark::State& state) {
+ TakeBenchmark(state, /*selection_factor=*/kSmallTakeSelectionFactor,
+ /*indices_with_nulls=*/true)
+ .ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/true);
+}
+
static void TakeChunkedChunkedInt64MonotonicIndices(benchmark::State& state) {
TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true)
.ChunkedInt64(
/*num_chunks=*/100, /*chunk_indices_too=*/true);
}
+static void TakeChunkedChunkedInt64FewMonotonicIndices(benchmark::State& state) {
+ TakeBenchmark(state, /*selection_factor=*/kSmallTakeSelectionFactor,
+ /*indices_with_nulls=*/false, /*monotonic=*/true)
+ .ChunkedInt64(
+ /*num_chunks=*/100, /*chunk_indices_too=*/true);
+}
+
static void TakeChunkedChunkedFSBRandomIndicesNoNulls(benchmark::State& state) {
TakeBenchmark(state, /*indices_with_nulls=*/false)
.ChunkedFSB(/*num_chunks=*/100, /*chunk_indices_too=*/true);
@@ -463,11 +501,23 @@ static void TakeChunkedChunkedStringRandomIndicesWithNulls(benchmark::State& sta
.ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true);
}
+static void TakeChunkedChunkedStringFewRandomIndicesWithNulls(benchmark::State& state) {
+ TakeBenchmark(state, /*selection_factor=*/kSmallTakeSelectionFactor,
+ /*indices_with_nulls=*/true)
+ .ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true);
+}
+
static void TakeChunkedChunkedStringMonotonicIndices(benchmark::State& state) {
TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true)
.ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true);
}
+static void TakeChunkedChunkedStringFewMonotonicIndices(benchmark::State& state) {
+ TakeBenchmark(state, /*selection_factor=*/kSmallTakeSelectionFactor,
+ /*indices_with_nulls=*/false, /*monotonic=*/true)
+ .ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true);
+}
+
static void TakeChunkedFlatInt64RandomIndicesNoNulls(benchmark::State& state) {
TakeBenchmark(state, /*indices_with_nulls=*/false)
.ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/false);
@@ -478,12 +528,25 @@ static void TakeChunkedFlatInt64RandomIndicesWithNulls(benchmark::State& state)
.ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/false);
}
+static void TakeChunkedFlatInt64FewRandomIndicesWithNulls(benchmark::State& state) {
+ TakeBenchmark(state, /*selection_factor=*/kSmallTakeSelectionFactor,
+ /*indices_with_nulls=*/true)
+ .ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/false);
+}
+
static void TakeChunkedFlatInt64MonotonicIndices(benchmark::State& state) {
TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true)
.ChunkedInt64(
/*num_chunks=*/100, /*chunk_indices_too=*/false);
}
+static void TakeChunkedFlatInt64FewMonotonicIndices(benchmark::State& state) {
+ TakeBenchmark(state, /*selection_factor=*/kSmallTakeSelectionFactor,
+ /*indices_with_nulls=*/false, /*monotonic=*/true)
+ .ChunkedInt64(
+ /*num_chunks=*/100, /*chunk_indices_too=*/false);
+}
+
void FilterSetArgs(benchmark::internal::Benchmark* bench) {
for (int64_t size : g_data_sizes) {
for (int i = 0; i < static_cast(g_filter_params.size()); ++i) {
@@ -560,18 +623,24 @@ BENCHMARK(TakeStringMonotonicIndices)->Apply(TakeSetArgs);
// Chunked values x Chunked indices
BENCHMARK(TakeChunkedChunkedInt64RandomIndicesNoNulls)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedChunkedInt64RandomIndicesWithNulls)->Apply(TakeSetArgs);
+BENCHMARK(TakeChunkedChunkedInt64FewRandomIndicesWithNulls)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedChunkedInt64MonotonicIndices)->Apply(TakeSetArgs);
+BENCHMARK(TakeChunkedChunkedInt64FewMonotonicIndices)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedChunkedFSBRandomIndicesNoNulls)->Apply(TakeFSBSetArgs);
BENCHMARK(TakeChunkedChunkedFSBRandomIndicesWithNulls)->Apply(TakeFSBSetArgs);
BENCHMARK(TakeChunkedChunkedFSBMonotonicIndices)->Apply(TakeFSBSetArgs);
BENCHMARK(TakeChunkedChunkedStringRandomIndicesNoNulls)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedChunkedStringRandomIndicesWithNulls)->Apply(TakeSetArgs);
+BENCHMARK(TakeChunkedChunkedStringFewRandomIndicesWithNulls)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedChunkedStringMonotonicIndices)->Apply(TakeSetArgs);
+BENCHMARK(TakeChunkedChunkedStringFewMonotonicIndices)->Apply(TakeSetArgs);
// Chunked values x Flat indices
BENCHMARK(TakeChunkedFlatInt64RandomIndicesNoNulls)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedFlatInt64RandomIndicesWithNulls)->Apply(TakeSetArgs);
+BENCHMARK(TakeChunkedFlatInt64FewRandomIndicesWithNulls)->Apply(TakeSetArgs);
BENCHMARK(TakeChunkedFlatInt64MonotonicIndices)->Apply(TakeSetArgs);
+BENCHMARK(TakeChunkedFlatInt64FewMonotonicIndices)->Apply(TakeSetArgs);
} // namespace compute
} // namespace arrow
From f078942ce2df68de8f48c3b4233132133601ca53 Mon Sep 17 00:00:00 2001
From: Adam Reeve
Date: Thu, 22 Aug 2024 02:59:04 +1200
Subject: [PATCH 10/32] GH-43141: [C++][Parquet] Replace use of int with
int32_t in the internal Parquet encryption APIs (#43413)
### Rationale for this change
See #43141
### What changes are included in this PR?
* Changes uses of int to int32_t in the Encryptor and Decryptor APIs, except where interfacing with OpenSSL.
* Also change RandBytes to use size_t instead of int and check for overflow.
* Check the return code from OpenSSL's Rand_bytes in case there is a failure generating random bytes
### Are these changes tested?
Yes, this doesn't change behaviour and is covered by existing tests.
### Are there any user-facing changes?
No
* GitHub Issue: #43141
Authored-by: Adam Reeve
Signed-off-by: Antoine Pitrou
---
cpp/src/parquet/column_reader.cc | 4 +-
cpp/src/parquet/encryption/crypto_factory.cc | 6 +-
.../parquet/encryption/encryption_internal.cc | 251 ++++++++++--------
.../parquet/encryption/encryption_internal.h | 46 ++--
.../encryption/encryption_internal_nossl.cc | 47 ++--
.../encryption/encryption_internal_test.cc | 22 +-
.../parquet/encryption/file_key_wrapper.cc | 4 +-
.../encryption/internal_file_decryptor.cc | 12 +-
.../encryption/internal_file_decryptor.h | 8 +-
.../encryption/internal_file_encryptor.cc | 10 +-
.../encryption/internal_file_encryptor.h | 6 +-
.../encryption/key_toolkit_internal.cc | 2 +-
cpp/src/parquet/metadata.cc | 6 +-
cpp/src/parquet/thrift_internal.h | 2 +-
14 files changed, 233 insertions(+), 193 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 05ee6a16c5448..60a8a2176b0a8 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -468,8 +468,8 @@ std::shared_ptr SerializedPageReader::NextPage() {
// Advance the stream offset
PARQUET_THROW_NOT_OK(stream_->Advance(header_size));
- int compressed_len = current_page_header_.compressed_page_size;
- int uncompressed_len = current_page_header_.uncompressed_page_size;
+ int32_t compressed_len = current_page_header_.compressed_page_size;
+ int32_t uncompressed_len = current_page_header_.uncompressed_page_size;
if (compressed_len < 0 || uncompressed_len < 0) {
throw ParquetException("Invalid page header");
}
diff --git a/cpp/src/parquet/encryption/crypto_factory.cc b/cpp/src/parquet/encryption/crypto_factory.cc
index 72506bdc014b6..56069d559771c 100644
--- a/cpp/src/parquet/encryption/crypto_factory.cc
+++ b/cpp/src/parquet/encryption/crypto_factory.cc
@@ -72,8 +72,7 @@ std::shared_ptr CryptoFactory::GetFileEncryptionProper
int dek_length = dek_length_bits / 8;
std::string footer_key(dek_length, '\0');
- RandBytes(reinterpret_cast(&footer_key[0]),
- static_cast(footer_key.size()));
+ RandBytes(reinterpret_cast(footer_key.data()), footer_key.size());
std::string footer_key_metadata =
key_wrapper.GetEncryptionKeyMetadata(footer_key, footer_key_id, true);
@@ -148,8 +147,7 @@ ColumnPathToEncryptionPropertiesMap CryptoFactory::GetColumnEncryptionProperties
}
std::string column_key(dek_length, '\0');
- RandBytes(reinterpret_cast(&column_key[0]),
- static_cast(column_key.size()));
+ RandBytes(reinterpret_cast(column_key.data()), column_key.size());
std::string column_key_key_metadata =
key_wrapper->GetEncryptionKeyMetadata(column_key, column_key_id, false);
diff --git a/cpp/src/parquet/encryption/encryption_internal.cc b/cpp/src/parquet/encryption/encryption_internal.cc
index 99d1707f4a8d4..a0d9367b619c6 100644
--- a/cpp/src/parquet/encryption/encryption_internal.cc
+++ b/cpp/src/parquet/encryption/encryption_internal.cc
@@ -18,6 +18,7 @@
#include "parquet/encryption/encryption_internal.h"
#include
+#include
#include
#include
@@ -36,10 +37,10 @@ using parquet::ParquetException;
namespace parquet::encryption {
-constexpr int kGcmMode = 0;
-constexpr int kCtrMode = 1;
-constexpr int kCtrIvLength = 16;
-constexpr int kBufferSizeLength = 4;
+constexpr int32_t kGcmMode = 0;
+constexpr int32_t kCtrMode = 1;
+constexpr int32_t kCtrIvLength = 16;
+constexpr int32_t kBufferSizeLength = 4;
#define ENCRYPT_INIT(CTX, ALG) \
if (1 != EVP_EncryptInit_ex(CTX, ALG, nullptr, nullptr, nullptr)) { \
@@ -53,17 +54,17 @@ constexpr int kBufferSizeLength = 4;
class AesEncryptor::AesEncryptorImpl {
public:
- explicit AesEncryptorImpl(ParquetCipher::type alg_id, int key_len, bool metadata,
+ explicit AesEncryptorImpl(ParquetCipher::type alg_id, int32_t key_len, bool metadata,
bool write_length);
~AesEncryptorImpl() { WipeOut(); }
- int Encrypt(span plaintext, span key,
- span aad, span ciphertext);
+ int32_t Encrypt(span plaintext, span key,
+ span aad, span ciphertext);
- int SignedFooterEncrypt(span footer, span key,
- span aad, span nonce,
- span encrypted_footer);
+ int32_t SignedFooterEncrypt(span footer, span key,
+ span aad, span nonce,
+ span encrypted_footer);
void WipeOut() {
if (nullptr != ctx_) {
EVP_CIPHER_CTX_free(ctx_);
@@ -89,21 +90,22 @@ class AesEncryptor::AesEncryptorImpl {
private:
EVP_CIPHER_CTX* ctx_;
- int aes_mode_;
- int key_length_;
- int ciphertext_size_delta_;
- int length_buffer_length_;
+ int32_t aes_mode_;
+ int32_t key_length_;
+ int32_t ciphertext_size_delta_;
+ int32_t length_buffer_length_;
- int GcmEncrypt(span plaintext, span key,
- span nonce, span aad,
- span ciphertext);
+ int32_t GcmEncrypt(span plaintext, span key,
+ span nonce, span aad,
+ span ciphertext);
- int CtrEncrypt(span plaintext, span key,
- span nonce, span ciphertext);
+ int32_t CtrEncrypt(span plaintext, span key,
+ span nonce, span ciphertext);
};
-AesEncryptor::AesEncryptorImpl::AesEncryptorImpl(ParquetCipher::type alg_id, int key_len,
- bool metadata, bool write_length) {
+AesEncryptor::AesEncryptorImpl::AesEncryptorImpl(ParquetCipher::type alg_id,
+ int32_t key_len, bool metadata,
+ bool write_length) {
openssl::EnsureInitialized();
ctx_ = nullptr;
@@ -151,11 +153,9 @@ AesEncryptor::AesEncryptorImpl::AesEncryptorImpl(ParquetCipher::type alg_id, int
}
}
-int AesEncryptor::AesEncryptorImpl::SignedFooterEncrypt(span footer,
- span key,
- span aad,
- span nonce,
- span encrypted_footer) {
+int32_t AesEncryptor::AesEncryptorImpl::SignedFooterEncrypt(
+ span footer, span key, span aad,
+ span nonce, span encrypted_footer) {
if (static_cast(key_length_) != key.size()) {
std::stringstream ss;
ss << "Wrong key length " << key.size() << ". Should be " << key_length_;
@@ -176,10 +176,10 @@ int AesEncryptor::AesEncryptorImpl::SignedFooterEncrypt(span foot
return GcmEncrypt(footer, key, nonce, aad, encrypted_footer);
}
-int AesEncryptor::AesEncryptorImpl::Encrypt(span plaintext,
- span key,
- span aad,
- span ciphertext) {
+int32_t AesEncryptor::AesEncryptorImpl::Encrypt(span plaintext,
+ span key,
+ span aad,
+ span ciphertext) {
if (static_cast(key_length_) != key.size()) {
std::stringstream ss;
ss << "Wrong key length " << key.size() << ". Should be " << key_length_;
@@ -205,13 +205,13 @@ int AesEncryptor::AesEncryptorImpl::Encrypt(span plaintext,
return CtrEncrypt(plaintext, key, nonce, ciphertext);
}
-int AesEncryptor::AesEncryptorImpl::GcmEncrypt(span plaintext,
- span key,
- span nonce,
- span aad,
- span ciphertext) {
+int32_t AesEncryptor::AesEncryptorImpl::GcmEncrypt(span plaintext,
+ span key,
+ span nonce,
+ span aad,
+ span ciphertext) {
int len;
- int ciphertext_len;
+ int32_t ciphertext_len;
std::array tag{};
@@ -227,12 +227,22 @@ int AesEncryptor::AesEncryptorImpl::GcmEncrypt(span plaintext,
}
// Setting additional authenticated data
+ if (aad.size() > static_cast(std::numeric_limits::max())) {
+ std::stringstream ss;
+ ss << "AAD size " << aad.size() << " overflows int";
+ throw ParquetException(ss.str());
+ }
if ((!aad.empty()) && (1 != EVP_EncryptUpdate(ctx_, nullptr, &len, aad.data(),
static_cast(aad.size())))) {
throw ParquetException("Couldn't set AAD");
}
// Encryption
+ if (plaintext.size() > static_cast(std::numeric_limits::max())) {
+ std::stringstream ss;
+ ss << "Plaintext size " << plaintext.size() << " overflows int";
+ throw ParquetException(ss.str());
+ }
if (1 !=
EVP_EncryptUpdate(ctx_, ciphertext.data() + length_buffer_length_ + kNonceLength,
&len, plaintext.data(), static_cast(plaintext.size()))) {
@@ -256,7 +266,7 @@ int AesEncryptor::AesEncryptorImpl::GcmEncrypt(span plaintext,
}
// Copying the buffer size, nonce and tag to ciphertext
- int buffer_size = kNonceLength + ciphertext_len + kGcmTagLength;
+ int32_t buffer_size = kNonceLength + ciphertext_len + kGcmTagLength;
if (length_buffer_length_ > 0) {
ciphertext[3] = static_cast(0xff & (buffer_size >> 24));
ciphertext[2] = static_cast(0xff & (buffer_size >> 16));
@@ -271,12 +281,12 @@ int AesEncryptor::AesEncryptorImpl::GcmEncrypt(span plaintext,
return length_buffer_length_ + buffer_size;
}
-int AesEncryptor::AesEncryptorImpl::CtrEncrypt(span plaintext,
- span key,
- span nonce,
- span ciphertext) {
+int32_t AesEncryptor::AesEncryptorImpl::CtrEncrypt(span plaintext,
+ span key,
+ span nonce,
+ span ciphertext) {
int len;
- int ciphertext_len;
+ int32_t ciphertext_len;
   if (nonce.size() != static_cast<size_t>(kNonceLength)) {
std::stringstream ss;
@@ -298,6 +308,11 @@ int AesEncryptor::AesEncryptorImpl::CtrEncrypt(span plaintext,
}
// Encryption
+  if (plaintext.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
+    std::stringstream ss;
+    ss << "Plaintext size " << plaintext.size() << " overflows int";
+    throw ParquetException(ss.str());
+  }
   if (1 !=
       EVP_EncryptUpdate(ctx_, ciphertext.data() + length_buffer_length_ + kNonceLength,
                         &len, plaintext.data(), static_cast<int>(plaintext.size()))) {
@@ -316,7 +331,7 @@ int AesEncryptor::AesEncryptorImpl::CtrEncrypt(span plaintext,
ciphertext_len += len;
// Copying the buffer size and nonce to ciphertext
- int buffer_size = kNonceLength + ciphertext_len;
+ int32_t buffer_size = kNonceLength + ciphertext_len;
if (length_buffer_length_ > 0) {
     ciphertext[3] = static_cast<uint8_t>(0xff & (buffer_size >> 24));
     ciphertext[2] = static_cast<uint8_t>(0xff & (buffer_size >> 16));
@@ -331,9 +346,11 @@ int AesEncryptor::AesEncryptorImpl::CtrEncrypt(span plaintext,
AesEncryptor::~AesEncryptor() {}
-int AesEncryptor::SignedFooterEncrypt(span<const uint8_t> footer, span<const uint8_t> key,
-                                      span<const uint8_t> aad, span<const uint8_t> nonce,
-                                      span<uint8_t> encrypted_footer) {
+int32_t AesEncryptor::SignedFooterEncrypt(span<const uint8_t> footer,
+                                          span<const uint8_t> key,
+                                          span<const uint8_t> aad,
+                                          span<const uint8_t> nonce,
+                                          span<uint8_t> encrypted_footer) {
return impl_->SignedFooterEncrypt(footer, key, aad, nonce, encrypted_footer);
}
@@ -343,25 +360,25 @@ int32_t AesEncryptor::CiphertextLength(int64_t plaintext_len) const {
return impl_->CiphertextLength(plaintext_len);
}
-int AesEncryptor::Encrypt(span<const uint8_t> plaintext, span<const uint8_t> key,
-                          span<const uint8_t> aad, span<uint8_t> ciphertext) {
+int32_t AesEncryptor::Encrypt(span<const uint8_t> plaintext, span<const uint8_t> key,
+                              span<const uint8_t> aad, span<uint8_t> ciphertext) {
return impl_->Encrypt(plaintext, key, aad, ciphertext);
}
-AesEncryptor::AesEncryptor(ParquetCipher::type alg_id, int key_len, bool metadata,
+AesEncryptor::AesEncryptor(ParquetCipher::type alg_id, int32_t key_len, bool metadata,
bool write_length)
     : impl_{std::unique_ptr<AesEncryptorImpl>(
           new AesEncryptorImpl(alg_id, key_len, metadata, write_length))} {}
class AesDecryptor::AesDecryptorImpl {
public:
- explicit AesDecryptorImpl(ParquetCipher::type alg_id, int key_len, bool metadata,
+ explicit AesDecryptorImpl(ParquetCipher::type alg_id, int32_t key_len, bool metadata,
bool contains_length);
~AesDecryptorImpl() { WipeOut(); }
- int Decrypt(span