2020#include " arrow/array/array_base.h"
2121#include " arrow/compute/kernels/common.h"
2222#include " arrow/result.h"
23+ #include " arrow/visitor_inline.h"
2324
2425namespace arrow {
2526namespace compute {
@@ -34,35 +35,79 @@ Status ListFlatten(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
3435 return Status::OK ();
3536}
3637
37- template <typename Type, typename offset_type = typename Type::offset_type>
38- Status ListParentIndices (KernelContext* ctx, const ExecBatch& batch, Datum* out) {
39- typename TypeTraits<Type>::ArrayType list (batch[0 ].array ());
40- ArrayData* out_arr = out->mutable_array ();
41-
42- const offset_type* offsets = list.raw_value_offsets ();
43- offset_type values_length = offsets[list.length ()] - offsets[0 ];
44-
45- out_arr->length = values_length;
46- out_arr->null_count = 0 ;
47- ARROW_ASSIGN_OR_RAISE (out_arr->buffers [1 ],
48- ctx->Allocate (values_length * sizeof (offset_type)));
49- auto out_indices = reinterpret_cast <offset_type*>(out_arr->buffers [1 ]->mutable_data ());
50- for (int64_t i = 0 ; i < list.length (); ++i) {
51- // Note: In most cases, null slots are empty, but when they are non-empty
52- // we write out the indices so make sure they are accounted for. This
53- // behavior could be changed if needed in the future.
54- for (offset_type j = offsets[i]; j < offsets[i + 1 ]; ++j) {
55- *out_indices++ = static_cast <offset_type>(i);
38+ struct ListParentIndicesArray {
39+ KernelContext* ctx;
40+ const std::shared_ptr<ArrayData>& input;
41+ int64_t base_output_offset;
42+ std::shared_ptr<ArrayData> out;
43+
44+ template <typename Type, typename offset_type = typename Type::offset_type>
45+ Status VisitList (const Type&) {
46+ typename TypeTraits<Type>::ArrayType list (input);
47+
48+ const offset_type* offsets = list.raw_value_offsets ();
49+ offset_type values_length = offsets[list.length ()] - offsets[0 ];
50+
51+ ARROW_ASSIGN_OR_RAISE (auto indices,
52+ ctx->Allocate (values_length * sizeof (offset_type)));
53+ auto out_indices = reinterpret_cast <offset_type*>(indices->mutable_data ());
54+ for (int64_t i = 0 ; i < list.length (); ++i) {
55+ // Note: In most cases, null slots are empty, but when they are non-empty
56+ // we write out the indices so make sure they are accounted for. This
57+ // behavior could be changed if needed in the future.
58+ for (offset_type j = offsets[i]; j < offsets[i + 1 ]; ++j) {
59+ *out_indices++ = static_cast <offset_type>(i + base_output_offset);
60+ }
5661 }
62+
63+ BufferVector buffers{nullptr , std::move (indices)};
64+ int64_t null_count = 0 ;
65+ if (sizeof (offset_type) == 4 ) {
66+ out = std::make_shared<ArrayData>(int32 (), values_length, std::move (buffers),
67+ null_count);
68+ } else {
69+ out = std::make_shared<ArrayData>(int64 (), values_length, std::move (buffers),
70+ null_count);
71+ }
72+ return Status::OK ();
73+ }
74+
75+ Status Visit (const ListType& type) { return VisitList (type); }
76+
77+ Status Visit (const LargeListType& type) { return VisitList (type); }
78+
79+ Status Visit (const DataType& type) {
80+ return Status::TypeError (" Function 'list_parent_indices' expects list input, got " ,
81+ type.ToString ());
5782 }
58- return Status::OK ();
59- }
6083
61- Result<ValueDescr> ValuesType (KernelContext*, const std::vector<ValueDescr>& args) {
84+ static Result<std::shared_ptr<ArrayData>> Exec (KernelContext* ctx,
85+ const std::shared_ptr<ArrayData>& input,
86+ int64_t base_output_offset = 0 ) {
87+ ListParentIndicesArray self{ctx, input, base_output_offset, /* out=*/ nullptr };
88+ RETURN_NOT_OK (VisitTypeInline (*input->type , &self));
89+ DCHECK_NE (self.out , nullptr );
90+ return self.out ;
91+ }
92+ };
93+
94+ Result<ValueDescr> ListValuesType (KernelContext*, const std::vector<ValueDescr>& args) {
6295 const auto & list_type = checked_cast<const BaseListType&>(*args[0 ].type );
6396 return ValueDescr::Array (list_type.value_type ());
6497}
6598
99+ Result<std::shared_ptr<DataType>> ListParentIndicesType (const DataType& input_type) {
100+ switch (input_type.id ()) {
101+ case Type::LIST:
102+ return int32 ();
103+ case Type::LARGE_LIST:
104+ return int64 ();
105+ default :
106+ return Status::TypeError (" Function 'list_parent_indices' expects list input, got " ,
107+ input_type.ToString ());
108+ }
109+ }
110+
66111const FunctionDoc list_flatten_doc (
67112 " Flatten list values" ,
68113 (" `lists` must have a list-like type.\n "
@@ -77,24 +122,53 @@ const FunctionDoc list_parent_indices_doc(
77122 " is emitted." ),
78123 {" lists" });
79124
125+ class ListParentIndicesFunction : public MetaFunction {
126+ public:
127+ ListParentIndicesFunction ()
128+ : MetaFunction(" list_parent_indices" , Arity::Unary(), &list_parent_indices_doc) {}
129+
130+ Result<Datum> ExecuteImpl (const std::vector<Datum>& args,
131+ const FunctionOptions* options,
132+ ExecContext* ctx) const override {
133+ KernelContext kernel_ctx (ctx);
134+ switch (args[0 ].kind ()) {
135+ case Datum::ARRAY:
136+ return ListParentIndicesArray::Exec (&kernel_ctx, args[0 ].array ());
137+ case Datum::CHUNKED_ARRAY: {
138+ const auto & input = args[0 ].chunked_array ();
139+ ARROW_ASSIGN_OR_RAISE (auto out_ty, ListParentIndicesType (*input->type ()));
140+
141+ int64_t base_output_offset = 0 ;
142+ ArrayVector out_chunks;
143+ for (const auto & chunk : input->chunks ()) {
144+ ARROW_ASSIGN_OR_RAISE (auto out_chunk,
145+ ListParentIndicesArray::Exec (&kernel_ctx, chunk->data (),
146+ base_output_offset));
147+ out_chunks.push_back (MakeArray (std::move (out_chunk)));
148+ base_output_offset += chunk->length ();
149+ }
150+ return std::make_shared<ChunkedArray>(std::move (out_chunks), std::move (out_ty));
151+ }
152+ default :
153+ return Status::NotImplemented (
154+ " Unsupported input type for function 'list_parent_indices': " ,
155+ args[0 ].ToString ());
156+ }
157+ }
158+ };
159+
80160} // namespace
81161
82162void RegisterVectorNested (FunctionRegistry* registry) {
83163 auto flatten =
84164 std::make_shared<VectorFunction>(" list_flatten" , Arity::Unary (), &list_flatten_doc);
85- DCHECK_OK (flatten->AddKernel ({InputType::Array (Type::LIST)}, OutputType (ValuesType ),
165+ DCHECK_OK (flatten->AddKernel ({InputType::Array (Type::LIST)}, OutputType (ListValuesType ),
86166 ListFlatten<ListType>));
87167 DCHECK_OK (flatten->AddKernel ({InputType::Array (Type::LARGE_LIST)},
88- OutputType (ValuesType ), ListFlatten<LargeListType>));
168+ OutputType (ListValuesType ), ListFlatten<LargeListType>));
89169 DCHECK_OK (registry->AddFunction (std::move (flatten)));
90170
91- auto list_parent_indices = std::make_shared<VectorFunction>(
92- " list_parent_indices" , Arity::Unary (), &list_parent_indices_doc);
93- DCHECK_OK (list_parent_indices->AddKernel ({InputType::Array (Type::LIST)}, int32 (),
94- ListParentIndices<ListType>));
95- DCHECK_OK (list_parent_indices->AddKernel ({InputType::Array (Type::LARGE_LIST)}, int64 (),
96- ListParentIndices<LargeListType>));
97- DCHECK_OK (registry->AddFunction (std::move (list_parent_indices)));
171+ DCHECK_OK (registry->AddFunction (std::make_shared<ListParentIndicesFunction>()));
98172}
99173
100174} // namespace internal
0 commit comments