Commit 7ad49ee

wesm and kou committed
ARROW-8792: [C++][Python][R][GLib] New Array compute kernels implementation and execution framework

This patch is a major reworking of our development strategy for implementing array-valued functions and applying them in a query processing setting. The design was partly inspired by my previous work designing Ibis (https://github.com/ibis-project/ibis -- the "expr" subsystem and the way that operators validate input types and resolve output types). Using only function names and input types, you can determine the output types of each function and resolve the "execute" function that performs a unit of work processing a batch of data. This will allow us to build deferred column expressions and then (eventually) do parallel execution.

There are a ton of details, but one nice thing is that there is now a single API entry point for invoking any function by its name:

```c++
Result<Datum> CallFunction(ExecContext* ctx, const std::string& func_name,
                           const std::vector<Datum>& args,
                           const FunctionOptions* options = NULLPTR);
```

What occurs when you do this:

* A `Function` instance is looked up in the global `FunctionRegistry`
* Given the descriptors of `args` (their types and shapes -- array or scalar), the Function searches for a `Kernel` that is able to process those types and shapes. A kernel might be able to do `array[T0], array[T1]` or only `scalar[T0], scalar[T1]`, for example. This permits kernel specialization to treat different type and shape combinations
* The kernel is executed iteratively against `args` based on what `args` contains -- if there are ChunkedArrays, they will be split into contiguous pieces. Kernels never see ChunkedArray, only Array or Scalar
* The Executor implementation is able to split contiguous Array inputs into smaller chunks, which is important for parallel execution. See `ExecContext::set_exec_chunksize`

To summarize: the REGISTRY contains FUNCTIONS. A FUNCTION contains KERNELS. A KERNEL is a specific implementation of a function that services a particular type combination.
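
The registry, function, and kernel relationship summarized above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `ToyType`, `ToyShape`, `ToyDescr`, `ToyKernel`, `ToyFunction`, `ToyRegistry`, `ResolveKernel`, and `MakeDemoRegistry` are hypothetical names invented for this illustration and are not Arrow's classes or signatures. The sketch also assumes exact-match dispatch on (type, shape) pairs, which may be simpler than what the patch does.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; not Arrow's real classes.
enum class ToyType { Int64, Float64 };
enum class ToyShape { Scalar, Array };

// Describes one argument: its value type and whether it is a scalar or array.
struct ToyDescr {
  ToyType type;
  ToyShape shape;
  bool operator==(const ToyDescr& other) const {
    return type == other.type && shape == other.shape;
  }
};

// A kernel is one implementation, valid for one combination of descriptors.
struct ToyKernel {
  std::vector<ToyDescr> signature;
  std::string label;  // identifies the implementation in this demo
};

// A function owns several kernels and picks one for the given arguments.
struct ToyFunction {
  std::vector<ToyKernel> kernels;

  // Dispatch: return the first kernel whose signature matches exactly.
  const ToyKernel* Dispatch(const std::vector<ToyDescr>& args) const {
    for (const auto& kernel : kernels) {
      if (kernel.signature == args) return &kernel;
    }
    return nullptr;
  }
};

// The registry maps function names to functions.
class ToyRegistry {
 public:
  void Add(const std::string& name, ToyFunction func) {
    functions_[name] = std::move(func);
  }
  const ToyFunction* Get(const std::string& name) const {
    auto it = functions_.find(name);
    return it == functions_.end() ? nullptr : &it->second;
  }

 private:
  std::map<std::string, ToyFunction> functions_;
};

// Look up the function by name, then dispatch on the argument descriptors.
// Returns the selected kernel's label, or an empty string if nothing matches.
std::string ResolveKernel(const ToyRegistry& registry, const std::string& name,
                          const std::vector<ToyDescr>& args) {
  const ToyFunction* func = registry.Get(name);
  if (func == nullptr) return "";
  const ToyKernel* kernel = func->Dispatch(args);
  return kernel == nullptr ? "" : kernel->label;
}

// Builds a binary kernel whose two arguments share the same type and shape.
ToyKernel MakeBinaryKernel(ToyType type, ToyShape shape, std::string label) {
  ToyKernel kernel;
  kernel.signature = {ToyDescr{type, shape}, ToyDescr{type, shape}};
  kernel.label = std::move(label);
  return kernel;
}

// Registers an "add" function with an array kernel and a scalar kernel.
ToyRegistry MakeDemoRegistry() {
  ToyFunction add;
  add.kernels.push_back(
      MakeBinaryKernel(ToyType::Int64, ToyShape::Array, "add_int64_arrays"));
  add.kernels.push_back(
      MakeBinaryKernel(ToyType::Int64, ToyShape::Scalar, "add_int64_scalars"));
  ToyRegistry registry;
  registry.Add("add", std::move(add));
  return registry;
}
```

Calling `ResolveKernel(MakeDemoRegistry(), "add", args)` with two `Int64` array descriptors selects the array kernel, while an unregistered name or an unsupported type combination yields an empty label. The point of the layering is that callers only need a name and argument descriptors; which implementation runs is decided inside the function.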

An additional effort in this patch is to radically simplify the process of creating kernels that are based on a scalar function. To do this, there is a growing collection of template-based kernel generation classes in compute/kernels/codegen_internal.h that will surely be the topic of much debate. I want to make it a lot easier for people to add new kernels.

There are some other incidental changes in the PR, such as changing the convenience APIs like `Cast` to return `Result`. I'm afraid we may have to live with the API breakage unless someone else wants to add backward compatibility code for the old APIs.

I have to apologize for making such a large PR. I've been working long hours on this for nearly a month and the process of porting all of our existing functionality and making the unit tests pass caused much iteration in the "framework" part of the code, such that it would have been a huge time drain to review incomplete iterations of the framework that had not been proven to capture the functionality that previously existed in the project.

Given the size of this PR and the fact that it completely blocks any work into src/arrow/compute, I don't think we should let this sit unmerged for more than 4 or 5 days, tops. I'm committed to responding to all of your questions and working to address your feedback about the design and improving the documentation and code comments. I tried to leave copious comments to explain my thought process in various places. Feel free to make any and all comments in this PR or in whatever form you like. I don't think that merging should be blocked on stylistic issues.

Closes #7240 from wesm/ARROW-8792-kernels-revamp

Lead-authored-by: Wes McKinney <wesm+git@apache.org>
Co-authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>

1 parent 6e84421 commit 7ad49ee

158 files changed: +13976 -10897 lines changed


c_glib/arrow-glib/compute.cpp

Lines changed: 211 additions & 415 deletions
Large diffs are not rendered by default.

c_glib/arrow-glib/error.cpp

Lines changed: 28 additions & 30 deletions
@@ -39,8 +39,18 @@ G_BEGIN_DECLS
 
 G_DEFINE_QUARK(garrow-error-quark, garrow_error)
 
-static GArrowError
-garrow_error_code(const arrow::Status &status)
+G_END_DECLS
+
+gboolean
+garrow_error_check(GError **error,
+                   const arrow::Status &status,
+                   const char *context)
+{
+  return garrow::check(error, status, context);
+}
+
+GArrowError
+garrow_error_from_status(const arrow::Status &status)
 {
   switch (status.code()) {
   case arrow::StatusCode::OK:
@@ -73,50 +83,38 @@ garrow_error_code(const arrow::Status &status)
     return GARROW_ERROR_EXECUTION;
   case arrow::StatusCode::AlreadyExists:
     return GARROW_ERROR_ALREADY_EXISTS;
-
   default:
     return GARROW_ERROR_UNKNOWN;
   }
 }
 
-G_END_DECLS
+arrow::Status
+garrow_error_to_status(GError *error,
+                       arrow::StatusCode code,
+                       const char *context)
+{
+  std::stringstream message;
+  message << context << ": " << g_quark_to_string(error->domain);
+  message << "(" << error->code << "): ";
+  message << error->message;
+  g_error_free(error);
+  return arrow::Status(code, message.str());
+}
 
 namespace garrow {
-  gboolean
-  check(GError **error,
-        const arrow::Status &status,
-        const char *context) {
+  gboolean check(GError **error,
+                 const arrow::Status &status,
+                 const char *context) {
     if (status.ok()) {
       return TRUE;
     } else {
       g_set_error(error,
                   GARROW_ERROR,
-                  garrow_error_code(status),
+                  garrow_error_from_status(status),
                   "%s: %s",
                   context,
                   status.ToString().c_str());
       return FALSE;
     }
   }
 }
-
-gboolean
-garrow_error_check(GError **error,
-                   const arrow::Status &status,
-                   const char *context)
-{
-  return garrow::check(error, status, context);
-}
-
-arrow::Status
-garrow_error_to_status(GError *error,
-                       arrow::StatusCode code,
-                       const char *context)
-{
-  std::stringstream message;
-  message << context << ": " << g_quark_to_string(error->domain);
-  message << "(" << error->code << "): ";
-  message << error->message;
-  g_error_free(error);
-  return arrow::Status(code, message.str());
-}

c_glib/arrow-glib/error.hpp

Lines changed: 34 additions & 12 deletions
@@ -23,26 +23,48 @@
 
 #include <arrow-glib/error.h>
 
+gboolean garrow_error_check(GError **error,
+                            const arrow::Status &status,
+                            const char *context);
+GArrowError garrow_error_from_status(const arrow::Status &status);
+arrow::Status garrow_error_to_status(GError *error,
+                                     arrow::StatusCode code,
+                                     const char *context);
+
 namespace garrow {
   gboolean check(GError **error,
                  const arrow::Status &status,
                  const char *context);
 
-  template <typename TYPE>
+  template <typename CONTEXT_FUNC>
   gboolean check(GError **error,
-                 const arrow::Result<TYPE> &result,
-                 const char *context) {
-    if (result.ok()) {
+                 const arrow::Status &status,
+                 CONTEXT_FUNC &&context_func) {
+    if (status.ok()) {
       return TRUE;
     } else {
-      return check(error, result.status(), context);
+      std::string context = std::move(context_func());
+      g_set_error(error,
+                  GARROW_ERROR,
+                  garrow_error_from_status(status),
+                  "%s: %s",
+                  context.c_str(),
+                  status.ToString().c_str());
+      return FALSE;
     }
   }
-}
 
-gboolean garrow_error_check(GError **error,
-                            const arrow::Status &status,
-                            const char *context);
-arrow::Status garrow_error_to_status(GError *error,
-                                     arrow::StatusCode code,
-                                     const char *context);
+  template <typename TYPE>
+  gboolean check(GError **error,
+                 const arrow::Result<TYPE> &result,
+                 const char *context) {
+    return check(error, result.status(), context);
+  }
+
+  template <typename TYPE, typename CONTEXT_FUNC>
+  gboolean check(GError **error,
+                 const arrow::Result<TYPE> &result,
+                 CONTEXT_FUNC &&context_func) {
+    return check(error, result.status(), context_func);
+  }
+}
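
The new `CONTEXT_FUNC` overloads in this header defer building the context string until a failure actually occurs. The standalone sketch below illustrates that idea only; `ToyStatus`, `CheckEager`, and `CheckLazy` are hypothetical names made up for the example, and a `std::string` out-parameter stands in for `GError`. None of it is the arrow-glib API.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status, for illustration only.
struct ToyStatus {
  bool is_ok;
  std::string message;
  bool ok() const { return is_ok; }
};

// Eager variant: the caller must already have the context string in hand,
// even when the status is OK and the string is never used.
inline bool CheckEager(std::string* error_out, const ToyStatus& status,
                       const char* context) {
  if (status.ok()) return true;
  *error_out = std::string(context) + ": " + status.message;
  return false;
}

// Lazy variant: the context is produced by a callable that runs only on
// failure, so any formatting cost is skipped on the success path.
template <typename ContextFunc>
bool CheckLazy(std::string* error_out, const ToyStatus& status,
               ContextFunc&& context_func) {
  if (status.ok()) return true;
  std::string context = std::forward<ContextFunc>(context_func)();
  *error_out = context + ": " + status.message;
  return false;
}
```

A caller would typically pass a lambda that formats the context, for example one that concatenates a function name with an argument value. On the success path the lambda is never invoked.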

cpp/src/arrow/CMakeLists.txt

Lines changed: 27 additions & 19 deletions
@@ -121,7 +121,6 @@ endfunction()
 
 set(ARROW_SRCS
     array.cc
-    builder.cc
     array/builder_adaptive.cc
     array/builder_base.cc
     array/builder_binary.cc
@@ -134,8 +133,10 @@ set(ARROW_SRCS
     array/dict_internal.cc
     array/diff.cc
     array/validate.cc
+    builder.cc
     buffer.cc
     compare.cc
+    datum.cc
     device.cc
     extension_type.cc
     memory_pool.cc
@@ -319,24 +320,30 @@ endif()
 
 if(ARROW_COMPUTE)
   list(APPEND ARROW_SRCS
-       compute/context.cc
-       compute/kernels/aggregate.cc
-       compute/kernels/boolean.cc
-       compute/kernels/cast.cc
-       compute/kernels/compare.cc
-       compute/kernels/count.cc
-       compute/kernels/hash.cc
-       compute/kernels/filter.cc
-       compute/kernels/mean.cc
-       compute/kernels/minmax.cc
-       compute/kernels/sort_to_indices.cc
-       compute/kernels/nth_to_indices.cc
-       compute/kernels/sum.cc
-       compute/kernels/add.cc
-       compute/kernels/take.cc
-       compute/kernels/isin.cc
-       compute/kernels/match.cc
-       compute/kernels/util_internal.cc)
+       compute/api_aggregate.cc
+       compute/api_scalar.cc
+       compute/api_vector.cc
+       compute/cast.cc
+       compute/exec.cc
+       compute/function.cc
+       compute/kernel.cc
+       compute/registry.cc
+       compute/kernels/aggregate_basic.cc
+       compute/kernels/codegen_internal.cc
+       compute/kernels/scalar_arithmetic.cc
+       compute/kernels/scalar_boolean.cc
+       compute/kernels/scalar_cast_boolean.cc
+       compute/kernels/scalar_cast_internal.cc
+       compute/kernels/scalar_cast_nested.cc
+       compute/kernels/scalar_cast_numeric.cc
+       compute/kernels/scalar_cast_string.cc
+       compute/kernels/scalar_cast_temporal.cc
+       compute/kernels/scalar_compare.cc
+       compute/kernels/scalar_set_lookup.cc
+       compute/kernels/vector_filter.cc
+       compute/kernels/vector_hash.cc
+       compute/kernels/vector_sort.cc
+       compute/kernels/vector_take.cc)
 endif()
 
 if(ARROW_FILESYSTEM)
@@ -524,6 +531,7 @@ endif()
 
 add_arrow_test(misc_test
                SOURCES
+               datum_test.cc
                memory_pool_test.cc
                result_test.cc
                pretty_print_test.cc

cpp/src/arrow/adapters/orc/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ elseif(NOT MSVC)
   set(ORC_MIN_TEST_LIBS ${ORC_MIN_TEST_LIBS} pthread ${CMAKE_DL_LIBS})
 endif()
 
-set(ORC_STATIC_TEST_LINK_LIBS ${ORC_MIN_TEST_LIBS} ${ARROW_LIBRARIES_FOR_STATIC_TESTS}
+set(ORC_STATIC_TEST_LINK_LIBS ${ARROW_LIBRARIES_FOR_STATIC_TESTS} ${ORC_MIN_TEST_LIBS}
                               orc::liborc)
 
 add_arrow_test(adapter_test

cpp/src/arrow/array/diff_test.cc

Lines changed: 5 additions & 8 deletions
@@ -33,8 +33,7 @@
 #include "arrow/array/diff.h"
 #include "arrow/buffer.h"
 #include "arrow/builder.h"
-#include "arrow/compute/context.h"
-#include "arrow/compute/kernels/filter.h"
+#include "arrow/compute/api.h"
 #include "arrow/status.h"
 #include "arrow/testing/gtest_common.h"
 #include "arrow/testing/random.h"
@@ -119,20 +118,19 @@ class DiffTest : public ::testing::Test {
 
   void BaseAndTargetFromRandomFilter(std::shared_ptr<Array> values,
                                      double filter_probability) {
-    compute::Datum out_datum, base_filter, target_filter;
+    std::shared_ptr<Array> base_filter, target_filter;
     do {
       base_filter = this->rng_.Boolean(values->length(), filter_probability, 0.0);
       target_filter = this->rng_.Boolean(values->length(), filter_probability, 0.0);
-    } while (base_filter.Equals(target_filter));
+    } while (base_filter->Equals(target_filter));
 
-    ASSERT_OK(compute::Filter(&ctx_, values, base_filter, {}, &out_datum));
+    ASSERT_OK_AND_ASSIGN(Datum out_datum, compute::Filter(values, base_filter));
     base_ = out_datum.make_array();
 
-    ASSERT_OK(compute::Filter(&ctx_, values, target_filter, {}, &out_datum));
+    ASSERT_OK_AND_ASSIGN(out_datum, compute::Filter(values, target_filter));
     target_ = out_datum.make_array();
   }
 
-  compute::FunctionContext ctx_;
   random::RandomArrayGenerator rng_;
   std::shared_ptr<StructArray> edits_;
   std::shared_ptr<Array> base_, target_;
@@ -616,7 +614,6 @@ void MakeSameLength(std::shared_ptr<Array>* a, std::shared_ptr<Array>* b) {
 }
 
 TEST_F(DiffTest, CompareRandomStruct) {
-  compute::FunctionContext ctx;
   for (auto null_probability : {0.0, 0.25}) {
     constexpr auto length = 1 << 10;
     auto int32_values = this->rng_.Int32(length, 0, 127, null_probability);
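
The test changes above move from a status return with an out-parameter (`compute::Filter(&ctx_, ..., &out_datum)`) to a value-returning call whose result carries either the value or the error. The standalone sketch below contrasts the two calling conventions. `ToyResult`, `FilterWithOutParam`, and `FilterReturningResult` are hypothetical names for this illustration, plain `std::vector` stands in for Arrow arrays, and C++17 is assumed for `std::optional`. It is not how `arrow::Result` is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified result type: holds a value or an error message.
template <typename T>
struct ToyResult {
  std::optional<T> value;
  std::string error;
  bool ok() const { return value.has_value(); }
};

// Old convention: success flag as the return value, output via pointer.
bool FilterWithOutParam(const std::vector<int>& values,
                        const std::vector<bool>& mask,
                        std::vector<int>* out, std::string* error) {
  if (values.size() != mask.size()) {
    *error = "length mismatch";
    return false;
  }
  out->clear();
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (mask[i]) out->push_back(values[i]);
  }
  return true;
}

// New convention: the return value carries either the output or the error.
ToyResult<std::vector<int>> FilterReturningResult(
    const std::vector<int>& values, const std::vector<bool>& mask) {
  ToyResult<std::vector<int>> result;
  if (values.size() != mask.size()) {
    result.error = "length mismatch";
    return result;
  }
  std::vector<int> selected;
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (mask[i]) selected.push_back(values[i]);
  }
  result.value = std::move(selected);
  return result;
}
```

With the value-returning form, a caller no longer has to declare the output variable before the call, which is what lets the test above declare `out_datum` inside `ASSERT_OK_AND_ASSIGN`.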

cpp/src/arrow/compute/CMakeLists.txt

Lines changed: 7 additions & 2 deletions
@@ -58,7 +58,12 @@ function(ADD_ARROW_COMPUTE_TEST REL_TEST_NAME)
                  ${ARG_UNPARSED_ARGUMENTS})
 endfunction()
 
-add_arrow_compute_test(compute_test)
-add_arrow_benchmark(compute_benchmark)
+add_arrow_compute_test(internals_test
+                       SOURCES
+                       function_test.cc
+                       exec_test.cc
+                       kernel_test.cc
+                       registry_test.cc)
+add_arrow_compute_test(exec_test)
 
 add_subdirectory(kernels)

cpp/src/arrow/compute/README.md

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+## Apache Arrow C++ Compute Functions
+
+This submodule contains analytical functions that process primarily Arrow
+columnar data; some functions can process scalar or Arrow-based array
+inputs. These are intended for use inside query engines, data frame libraries,
+etc.
+
+Many functions have SQL-like semantics in that they perform elementwise or
+scalar operations on whole arrays at a time. Other functions are not SQL-like
+and compute results that may be a different length or whose results depend on
+the order of the values.
+
+Some basic terminology:
+
+* We use the term "function" to refer to particular general operation that may
+  have many different implementations corresponding to different combinations
+  of types or function behavior options.
+* We call a specific implementation of a function a "kernel". When executing a
+  function on inputs, we must first select a suitable kernel (kernel selection
+  is called "dispatching") corresponding to the value types of the inputs
+* Functions along with their kernel implementations are collected in a
+  "function registry". Given a function name and argument types, we can look up
+  that function and dispatch to a compatible kernel.
+
+Types of functions
+
+* Scalar functions: elementwise functions that perform scalar operations in a
+  vectorized manner. These functions are generally valid for SQL-like
+  context. These are called "scalar" in that the functions executed consider
+  each value in an array independently, and the output array or arrays have the
+  same length as the input arrays. The result for each array cell is generally
+  independent of its position in the array.
+* Vector functions, which produce a result whose output is generally dependent
+  on the entire contents of the input arrays. These functions **are generally
+  not valid** for SQL-like processing because the output size may be different
+  than the input size, and the result may change based on the order of the
+  values in the array. This includes things like array subselection, sorting,
+  hashing, and more.
+* Scalar aggregate functions of which can be used in a SQL-like context
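
The three function categories described in the README can be contrasted with a small standalone sketch. It uses plain `std::vector` rather than Arrow arrays, and `AddOne`, `Filter`, `SortIndices`, and `Sum` are hypothetical helpers written for this illustration, not the functions registered by the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Scalar function: each output cell depends only on the matching input cell,
// so the output has the same length as the input.
std::vector<int64_t> AddOne(const std::vector<int64_t>& values) {
  std::vector<int64_t> out;
  out.reserve(values.size());
  for (int64_t v : values) out.push_back(v + 1);
  return out;
}

// Vector function (subselection): the output length may differ from the input.
// Assumes mask.size() == values.size().
std::vector<int64_t> Filter(const std::vector<int64_t>& values,
                            const std::vector<bool>& mask) {
  std::vector<int64_t> out;
  for (std::size_t i = 0; i < values.size(); ++i) {
    if (mask[i]) out.push_back(values[i]);
  }
  return out;
}

// Vector function (sorting): the result depends on the whole input and on the
// order of its values. Returns the indices that would sort the input.
std::vector<int64_t> SortIndices(const std::vector<int64_t>& values) {
  std::vector<int64_t> indices(values.size());
  std::iota(indices.begin(), indices.end(), int64_t{0});
  std::stable_sort(indices.begin(), indices.end(),
                   [&values](int64_t a, int64_t b) {
                     return values[static_cast<std::size_t>(a)] <
                            values[static_cast<std::size_t>(b)];
                   });
  return indices;
}

// Scalar aggregate: reduces the whole input to a single value.
int64_t Sum(const std::vector<int64_t>& values) {
  return std::accumulate(values.begin(), values.end(), int64_t{0});
}
```

For the input `{3, 1, 2}`, `AddOne` keeps the length at three, `Filter` with mask `{true, false, true}` shrinks it to two, `SortIndices` returns a permutation that changes if the input is reordered, and `Sum` collapses the input to one number.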

cpp/src/arrow/compute/api.h

Lines changed: 12 additions & 15 deletions
@@ -15,20 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+// NOTE: API is EXPERIMENTAL and will change without going through a
+// deprecation cycle
 
-#include "arrow/compute/context.h" // IWYU pragma: export
-#include "arrow/compute/kernel.h" // IWYU pragma: export
+#pragma once
 
-#include "arrow/compute/kernels/boolean.h" // IWYU pragma: export
-#include "arrow/compute/kernels/cast.h" // IWYU pragma: export
-#include "arrow/compute/kernels/compare.h" // IWYU pragma: export
-#include "arrow/compute/kernels/count.h" // IWYU pragma: export
-#include "arrow/compute/kernels/filter.h" // IWYU pragma: export
-#include "arrow/compute/kernels/hash.h" // IWYU pragma: export
-#include "arrow/compute/kernels/isin.h" // IWYU pragma: export
-#include "arrow/compute/kernels/mean.h" // IWYU pragma: export
-#include "arrow/compute/kernels/nth_to_indices.h" // IWYU pragma: export
-#include "arrow/compute/kernels/sort_to_indices.h" // IWYU pragma: export
-#include "arrow/compute/kernels/sum.h" // IWYU pragma: export
-#include "arrow/compute/kernels/take.h" // IWYU pragma: export
+#include "arrow/compute/api_aggregate.h" // IWYU pragma: export
+#include "arrow/compute/api_scalar.h" // IWYU pragma: export
+#include "arrow/compute/api_vector.h" // IWYU pragma: export
+#include "arrow/compute/cast.h" // IWYU pragma: export
+#include "arrow/compute/exec.h" // IWYU pragma: export
+#include "arrow/compute/function.h" // IWYU pragma: export
+#include "arrow/compute/kernel.h" // IWYU pragma: export
+#include "arrow/compute/registry.h" // IWYU pragma: export
+#include "arrow/datum.h" // IWYU pragma: export
