Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cpp/src/arrow/chunked_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ bool ChunkedArray::ApproxEquals(const ChunkedArray& other,
.ok();
}

Result<std::shared_ptr<Scalar>> ChunkedArray::GetScalar(int64_t index) const {
for (const auto& chunk : chunks_) {
if (index < chunk->length()) {
return chunk->GetScalar(index);
}
index -= chunk->length();
}
return Status::Invalid("index out of bounds");
}

std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
ARROW_CHECK_LE(offset, length_) << "Slice offset greater than array length";
bool offset_equals_length = offset == length_;
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/chunked_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class ARROW_EXPORT ChunkedArray {

const std::shared_ptr<DataType>& type() const { return type_; }

/// \brief Return a Scalar containing the value of this array at index
Result<std::shared_ptr<Scalar>> GetScalar(int64_t index) const;

/// \brief Determine if two chunked arrays are equal.
///
/// Two chunked arrays can be equal only if they have equal datatypes.
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/chunked_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gtest/gtest.h>

#include "arrow/chunked_array.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_common.h"
#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -241,4 +242,25 @@ TEST_F(TestChunkedArray, View) {
AssertChunkedEqual(*expected, *result);
}

TEST_F(TestChunkedArray, GetScalar) {
auto ty = int32();
ArrayVector chunks{ArrayFromJSON(ty, "[6, 7, null]"), ArrayFromJSON(ty, "[]"),
ArrayFromJSON(ty, "[null]"), ArrayFromJSON(ty, "[3, 4, 5]")};
ChunkedArray carr(chunks);

auto check_scalar = [](const ChunkedArray& array, int64_t index,
const Scalar& expected) {
ASSERT_OK_AND_ASSIGN(auto actual, array.GetScalar(index));
AssertScalarsEqual(expected, *actual, /*verbose=*/true);
};

check_scalar(carr, 0, **MakeScalar(ty, 6));
check_scalar(carr, 2, *MakeNullScalar(ty));
check_scalar(carr, 3, *MakeNullScalar(ty));
check_scalar(carr, 4, **MakeScalar(ty, 3));
check_scalar(carr, 6, **MakeScalar(ty, 5));

ASSERT_RAISES(Invalid, carr.GetScalar(7));
}

} // namespace arrow
72 changes: 62 additions & 10 deletions cpp/src/arrow/compute/api_vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ using internal::checked_cast;
using internal::checked_pointer_cast;

namespace internal {

using compute::DictionaryEncodeOptions;
using compute::FilterOptions;
using compute::NullPlacement;

template <>
struct EnumTraits<FilterOptions::NullSelectionBehavior>
: BasicEnumTraits<FilterOptions::NullSelectionBehavior, FilterOptions::DROP,
Expand Down Expand Up @@ -71,6 +74,21 @@ struct EnumTraits<DictionaryEncodeOptions::NullEncodingBehavior>
return "<INVALID>";
}
};
template <>
struct EnumTraits<NullPlacement>
: BasicEnumTraits<NullPlacement, NullPlacement::AtStart, NullPlacement::AtEnd> {
static std::string name() { return "NullPlacement"; }
static std::string value_name(NullPlacement value) {
switch (value) {
case NullPlacement::AtStart:
return "AtStart";
case NullPlacement::AtEnd:
return "AtEnd";
}
return "<INVALID>";
}
};

} // namespace internal

namespace compute {
Expand Down Expand Up @@ -106,11 +124,14 @@ static auto kDictionaryEncodeOptionsType =
GetFunctionOptionsType<DictionaryEncodeOptions>(DataMember(
"null_encoding_behavior", &DictionaryEncodeOptions::null_encoding_behavior));
static auto kArraySortOptionsType = GetFunctionOptionsType<ArraySortOptions>(
DataMember("order", &ArraySortOptions::order));
static auto kSortOptionsType =
GetFunctionOptionsType<SortOptions>(DataMember("sort_keys", &SortOptions::sort_keys));
DataMember("order", &ArraySortOptions::order),
DataMember("null_placement", &ArraySortOptions::null_placement));
static auto kSortOptionsType = GetFunctionOptionsType<SortOptions>(
DataMember("sort_keys", &SortOptions::sort_keys),
DataMember("null_placement", &SortOptions::null_placement));
static auto kPartitionNthOptionsType = GetFunctionOptionsType<PartitionNthOptions>(
DataMember("pivot", &PartitionNthOptions::pivot));
DataMember("pivot", &PartitionNthOptions::pivot),
DataMember("null_placement", &PartitionNthOptions::null_placement));
static auto kSelectKOptionsType = GetFunctionOptionsType<SelectKOptions>(
DataMember("k", &SelectKOptions::k),
DataMember("sort_keys", &SelectKOptions::sort_keys));
Expand All @@ -131,16 +152,22 @@ DictionaryEncodeOptions::DictionaryEncodeOptions(NullEncodingBehavior null_encod
null_encoding_behavior(null_encoding) {}
constexpr char DictionaryEncodeOptions::kTypeName[];

ArraySortOptions::ArraySortOptions(SortOrder order)
: FunctionOptions(internal::kArraySortOptionsType), order(order) {}
ArraySortOptions::ArraySortOptions(SortOrder order, NullPlacement null_placement)
: FunctionOptions(internal::kArraySortOptionsType),
order(order),
null_placement(null_placement) {}
constexpr char ArraySortOptions::kTypeName[];

SortOptions::SortOptions(std::vector<SortKey> sort_keys)
: FunctionOptions(internal::kSortOptionsType), sort_keys(std::move(sort_keys)) {}
SortOptions::SortOptions(std::vector<SortKey> sort_keys, NullPlacement null_placement)
: FunctionOptions(internal::kSortOptionsType),
sort_keys(std::move(sort_keys)),
null_placement(null_placement) {}
constexpr char SortOptions::kTypeName[];

PartitionNthOptions::PartitionNthOptions(int64_t pivot)
: FunctionOptions(internal::kPartitionNthOptionsType), pivot(pivot) {}
PartitionNthOptions::PartitionNthOptions(int64_t pivot, NullPlacement null_placement)
: FunctionOptions(internal::kPartitionNthOptionsType),
pivot(pivot),
null_placement(null_placement) {}
constexpr char PartitionNthOptions::kTypeName[];

SelectKOptions::SelectKOptions(int64_t k, std::vector<SortKey> sort_keys)
Expand All @@ -164,6 +191,14 @@ void RegisterVectorOptions(FunctionRegistry* registry) {
// ----------------------------------------------------------------------
// Direct exec interface to kernels

Result<std::shared_ptr<Array>> NthToIndices(const Array& values,
const PartitionNthOptions& options,
ExecContext* ctx) {
ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("partition_nth_indices",
{Datum(values)}, &options, ctx));
return result.make_array();
}

Result<std::shared_ptr<Array>> NthToIndices(const Array& values, int64_t n,
ExecContext* ctx) {
PartitionNthOptions options(/*pivot=*/n);
Expand All @@ -185,6 +220,14 @@ Result<Datum> ReplaceWithMask(const Datum& values, const Datum& mask,
return CallFunction("replace_with_mask", {values, mask, replacements}, ctx);
}

Result<std::shared_ptr<Array>> SortIndices(const Array& values,
const ArraySortOptions& options,
ExecContext* ctx) {
ARROW_ASSIGN_OR_RAISE(
Datum result, CallFunction("array_sort_indices", {Datum(values)}, &options, ctx));
return result.make_array();
}

Result<std::shared_ptr<Array>> SortIndices(const Array& values, SortOrder order,
ExecContext* ctx) {
ArraySortOptions options(order);
Expand All @@ -193,6 +236,15 @@ Result<std::shared_ptr<Array>> SortIndices(const Array& values, SortOrder order,
return result.make_array();
}

Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& chunked_array,
const ArraySortOptions& array_options,
ExecContext* ctx) {
SortOptions options({SortKey("", array_options.order)}, array_options.null_placement);
ARROW_ASSIGN_OR_RAISE(
Datum result, CallFunction("sort_indices", {Datum(chunked_array)}, &options, ctx));
return result.make_array();
}

Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& chunked_array,
SortOrder order, ExecContext* ctx) {
SortOptions options({SortKey("not-used", order)});
Expand Down
94 changes: 75 additions & 19 deletions cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ enum class SortOrder {
Descending,
};

enum class NullPlacement {
/// Place nulls and NaNs before any non-null values.
/// NaNs will come after nulls.
AtStart,
/// Place nulls and NaNs after any non-null values.
/// NaNs will come before nulls.
AtEnd,
};

/// \brief One sort key for PartitionNthIndices (TODO) and SortIndices
class ARROW_EXPORT SortKey : public util::EqualityComparable<SortKey> {
public:
Expand All @@ -106,22 +115,28 @@ class ARROW_EXPORT SortKey : public util::EqualityComparable<SortKey> {

class ARROW_EXPORT ArraySortOptions : public FunctionOptions {
public:
explicit ArraySortOptions(SortOrder order = SortOrder::Ascending);
explicit ArraySortOptions(SortOrder order = SortOrder::Ascending,
NullPlacement null_placement = NullPlacement::AtEnd);
constexpr static char const kTypeName[] = "ArraySortOptions";
static ArraySortOptions Defaults() { return ArraySortOptions(); }

/// Sorting order
SortOrder order;
/// Whether nulls and NaNs are placed at the start or at the end
NullPlacement null_placement;
};

class ARROW_EXPORT SortOptions : public FunctionOptions {
public:
explicit SortOptions(std::vector<SortKey> sort_keys = {});
explicit SortOptions(std::vector<SortKey> sort_keys = {},
NullPlacement null_placement = NullPlacement::AtEnd);
constexpr static char const kTypeName[] = "SortOptions";
static SortOptions Defaults() { return SortOptions(); }

/// Column key(s) to order by and how to order by these sort keys.
std::vector<SortKey> sort_keys;
/// Whether nulls and NaNs are placed at the start or at the end
NullPlacement null_placement;
};

/// \brief SelectK options
Expand Down Expand Up @@ -162,12 +177,15 @@ class ARROW_EXPORT SelectKOptions : public FunctionOptions {
/// \brief Partitioning options for NthToIndices
class ARROW_EXPORT PartitionNthOptions : public FunctionOptions {
public:
explicit PartitionNthOptions(int64_t pivot);
explicit PartitionNthOptions(int64_t pivot,
NullPlacement null_placement = NullPlacement::AtEnd);
PartitionNthOptions() : PartitionNthOptions(0) {}
constexpr static char const kTypeName[] = "PartitionNthOptions";

/// The index into the equivalent sorted array of the partition pivot element.
int64_t pivot;
/// Whether nulls and NaNs are partitioned at the start or at the end
NullPlacement null_placement;
};

/// @}
Expand Down Expand Up @@ -273,8 +291,7 @@ Result<Datum> DropNull(const Datum& values, ExecContext* ctx = NULLPTR);
ARROW_EXPORT
Result<std::shared_ptr<Array>> DropNull(const Array& values, ExecContext* ctx = NULLPTR);

/// \brief Returns indices that partition an array around n-th
/// sorted element.
/// \brief Return indices that partition an array around n-th sorted element.
///
/// Find index of n-th(0 based) smallest value and perform indirect
/// partition of an array around that element. Output indices[0 ~ n-1]
Expand All @@ -291,14 +308,27 @@ ARROW_EXPORT
Result<std::shared_ptr<Array>> NthToIndices(const Array& values, int64_t n,
ExecContext* ctx = NULLPTR);

/// \brief Returns the indices that would select the first `k` elements of the array in
/// the specified order.
/// \brief Return indices that partition an array around n-th sorted element.
///
/// This overload takes a PartitionNthOptions specifiying the pivot index
/// and the null handling.
///
/// \param[in] values array to be partitioned
/// \param[in] options options including pivot index and null handling
/// \param[in] ctx the function execution context, optional
/// \return offsets indices that would partition an array
ARROW_EXPORT
Result<std::shared_ptr<Array>> NthToIndices(const Array& values,
const PartitionNthOptions& options,
ExecContext* ctx = NULLPTR);

/// \brief Return indices that would select the first `k` elements.
///
// Perform an indirect sort of the datum, keeping only the first `k` elements. The output
// array will contain indices such that the item indicated by the k-th index will be in
// the position it would be if the datum were sorted by `options.sort_keys`. However,
// indices of null values will not be part of the output. The sort is not guaranteed to be
// stable.
/// Perform an indirect sort of the datum, keeping only the first `k` elements. The output
/// array will contain indices such that the item indicated by the k-th index will be in
/// the position it would be if the datum were sorted by `options.sort_keys`. However,
/// indices of null values will not be part of the output. The sort is not guaranteed to
/// be stable.
///
/// \param[in] datum datum to be partitioned
/// \param[in] options options
Expand All @@ -309,8 +339,7 @@ Result<std::shared_ptr<Array>> SelectKUnstable(const Datum& datum,
const SelectKOptions& options,
ExecContext* ctx = NULLPTR);

/// \brief Returns the indices that would sort an array in the
/// specified order.
/// \brief Return the indices that would sort an array.
///
/// Perform an indirect sort of array. The output array will contain
/// indices that would sort an array, which would be the same length
Expand All @@ -330,8 +359,21 @@ Result<std::shared_ptr<Array>> SortIndices(const Array& array,
SortOrder order = SortOrder::Ascending,
ExecContext* ctx = NULLPTR);

/// \brief Returns the indices that would sort a chunked array in the
/// specified order.
/// \brief Return the indices that would sort an array.
///
/// This overload takes a ArraySortOptions specifiying the sort order
/// and the null handling.
///
/// \param[in] array array to sort
/// \param[in] options options including sort order and null handling
/// \param[in] ctx the function execution context, optional
/// \return offsets indices that would sort an array
ARROW_EXPORT
Result<std::shared_ptr<Array>> SortIndices(const Array& array,
const ArraySortOptions& options,
ExecContext* ctx = NULLPTR);

/// \brief Return the indices that would sort a chunked array.
///
/// Perform an indirect sort of chunked array. The output array will
/// contain indices that would sort a chunked array, which would be
Expand All @@ -351,14 +393,28 @@ Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& chunked_array,
SortOrder order = SortOrder::Ascending,
ExecContext* ctx = NULLPTR);

/// \brief Returns the indices that would sort an input in the
/// \brief Return the indices that would sort a chunked array.
///
/// This overload takes a ArraySortOptions specifiying the sort order
/// and the null handling.
///
/// \param[in] chunked_array chunked array to sort
/// \param[in] options options including sort order and null handling
/// \param[in] ctx the function execution context, optional
/// \return offsets indices that would sort an array
ARROW_EXPORT
Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& chunked_array,
const ArraySortOptions& options,
ExecContext* ctx = NULLPTR);

/// \brief Return the indices that would sort an input in the
/// specified order. Input is one of array, chunked array record batch
/// or table.
///
/// Perform an indirect sort of input. The output array will contain
/// indices that would sort an input, which would be the same length
/// as input. Nulls will be stably partitioned to the end of the
/// output regardless of order.
/// as input. Nulls will be stably partitioned to the start or to the end
/// of the output depending on SortOrder::null_placement.
///
/// For example given input (table) = {
/// "column1": [[null, 1], [ 3, null, 2, 1]],
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ GroupByNode{"aggregate", inputs=[groupby: "project"], outputs=["filter"], keys=[
hash_count(multiply(i32, 2), {mode=NON_NULL}),
]}
FilterNode{"filter", inputs=[target: "aggregate"], outputs=["order_by_sink"], filter=(sum(multiply(i32, 2)) > 10)}
OrderBySinkNode{"order_by_sink", inputs=[collected: "filter"], by={sort_keys=[sum(multiply(i32, 2)) ASC]}}
OrderBySinkNode{"order_by_sink", inputs=[collected: "filter"], by={sort_keys=[sum(multiply(i32, 2)) ASC], null_placement=AtEnd}}
)a");

ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
Expand Down
Loading