Commits
49 commits
32cfbb4
ARROW-11591: [C++] Prototype version of hash group by
michalursa Feb 11, 2021
3f65726
extract sum and count to GroupedAggregator interface
bkietz Feb 16, 2021
3792de8
implement sum for more DataTypes
bkietz Feb 17, 2021
4fe0613
Add support for multiple key columns in group by (no testing yet)
michalursa Feb 19, 2021
c72a178
add randomized testing for group_by
bkietz Feb 19, 2021
3085d04
Fixing bugs in group_by. Current tests should be passing now.
michalursa Feb 22, 2021
e7fee76
formatting
bkietz Feb 22, 2021
879b6fd
add simple group_by benchmark
bkietz Feb 22, 2021
6377dd9
add group_by benchmark keyed on integers
bkietz Feb 22, 2021
04e4f76
store key bytes -> group id mapping with unordered_map
bkietz Feb 22, 2021
2b7b76e
repair null bitmap materialization, simplify testing against random data
bkietz Feb 25, 2021
3b82be6
simplify ValidateGroupBy further
bkietz Feb 26, 2021
0d71442
update compute.rst with group_by
bkietz Mar 1, 2021
9bf51e6
remove named output fields
bkietz Mar 1, 2021
8ec135e
reference ARROW-11840, add FunctionDoc
bkietz Mar 2, 2021
3e95d6e
add support for grouped min_max
bkietz Mar 2, 2021
4694a11
remove speculative empty case for now
bkietz Mar 2, 2021
6a105c3
first pass at review comments
bkietz Mar 11, 2021
9ece614
remove group_by from function registry
bkietz Mar 11, 2021
d4e3f11
add HashAggregateKernel
bkietz Mar 16, 2021
eb90bf6
refactor for simplicity
bkietz Mar 16, 2021
2ab608d
only consider bit width of key types
bkietz Mar 16, 2021
0d4b1d4
extract key encoding to KeyEncoder
bkietz Mar 16, 2021
2f939f5
separate group identification from aggregation
bkietz Mar 17, 2021
2651dfd
extract GroupIdentifier for reuse
bkietz Mar 17, 2021
f612ef8
reuse GroupIdentifier for partitioned writing
bkietz Mar 17, 2021
f1b1664
add DictionaryKeyEncoder
bkietz Mar 17, 2021
b44f13b
reintroduce R binding
bkietz Mar 17, 2021
23c2cf6
rename aggregand to argument
bkietz Mar 17, 2021
a57268c
lint: nullptr
bkietz Mar 17, 2021
ea4d387
fix benchmark
bkietz Mar 18, 2021
fff5985
crispy compilers
bkietz Mar 18, 2021
362aa4f
fix unaligned load
bkietz Mar 18, 2021
a39e941
bitshift width was unclear
bkietz Mar 18, 2021
b0e410f
msvc: explicit cast
bkietz Mar 18, 2021
9266067
GroupIdentifier->Grouper, use HashAggregateFunctions to store kernels
bkietz Mar 19, 2021
1c2973b
add unit tests for Grouper
bkietz Mar 21, 2021
a0114c9
remove dataset::{MakeGrouping,ApplyGroupings}
bkietz Mar 22, 2021
63b833d
provide Grouper::num_groups()
bkietz Mar 22, 2021
d0c01ac
validate ApplyGroupings
bkietz Mar 22, 2021
d0006e9
rewrite tests for readability, remove redundant cases
bkietz Mar 22, 2021
d437683
~max_id~ -> num_groups
bkietz Mar 22, 2021
a07fc66
rename FunctionDoc vars to enable unity build
bkietz Mar 22, 2021
201fa3b
Expose HashAggregateKernel in python, ensure hash_agg funcs can't Exe…
bkietz Mar 22, 2021
5fa524e
__dllexport ExecBatch
bkietz Mar 22, 2021
46b4069
BufferBuilder* bytes_builder()
bkietz Mar 22, 2021
5b79c32
review comments
bkietz Mar 22, 2021
2bff008
add more benchmarks
bkietz Mar 22, 2021
147fe1f
rename python tests, export HashAgg*, don't ref codegen_internal in t…
bkietz Mar 23, 2021
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -373,6 +373,7 @@ if(ARROW_COMPUTE)
compute/kernels/aggregate_tdigest.cc
compute/kernels/aggregate_var_std.cc
compute/kernels/codegen_internal.cc
compute/kernels/hash_aggregate.cc
compute/kernels/scalar_arithmetic.cc
compute/kernels/scalar_boolean.cc
compute/kernels/scalar_cast_boolean.cc
14 changes: 7 additions & 7 deletions cpp/src/arrow/array/array_binary.h
@@ -117,13 +117,13 @@ class BaseBinaryArray : public FlatArray {
}
}

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
// For subclasses
BaseBinaryArray() : raw_value_offsets_(NULLPTR), raw_data_(NULLPTR) {}
BaseBinaryArray() = default;

// Protected method for constructors
void SetData(const std::shared_ptr<ArrayData>& data) {
@@ -132,8 +132,8 @@ class BaseBinaryArray : public FlatArray {
raw_data_ = data->GetValuesSafe<uint8_t>(2, /*offset=*/0);
}

const offset_type* raw_value_offsets_;
const uint8_t* raw_data_;
const offset_type* raw_value_offsets_ = NULLPTR;
const uint8_t* raw_data_ = NULLPTR;
};

/// Concrete Array class for variable-size binary data
@@ -231,9 +231,9 @@ class ARROW_EXPORT FixedSizeBinaryArray : public PrimitiveArray {

const uint8_t* raw_values() const { return raw_values_ + data_->offset * byte_width_; }

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
void SetData(const std::shared_ptr<ArrayData>& data) {
8 changes: 4 additions & 4 deletions cpp/src/arrow/array/array_primitive.h
@@ -64,9 +64,9 @@ class NumericArray : public PrimitiveArray {
// For API compatibility with BinaryArray etc.
value_type GetView(int64_t i) const { return Value(i); }

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
using PrimitiveArray::PrimitiveArray;
@@ -99,9 +99,9 @@ class ARROW_EXPORT BooleanArray : public PrimitiveArray {
/// values. Result is not cached.
int64_t true_count() const;

IteratorType begin() { return IteratorType(*this); }
IteratorType begin() const { return IteratorType(*this); }

IteratorType end() { return IteratorType(*this, length()); }
IteratorType end() const { return IteratorType(*this, length()); }

protected:
using PrimitiveArray::PrimitiveArray;
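Since begin() and end() are now const-qualified on these array classes (and on the binary arrays above), a const reference to an array can be iterated with a range-based for loop. A minimal sketch, assuming an Int64Array built elsewhere; the function name is illustrative, and the iterator yields an optional-style view that is empty for null slots:

#include <iostream>

#include "arrow/api.h"

// Print the values of a const Int64Array; null slots come through as an
// empty optional from the iterator.
void PrintInt64s(const arrow::Int64Array& array) {
  for (const auto& maybe_value : array) {  // uses the new const begin()/end()
    if (maybe_value.has_value()) {
      std::cout << *maybe_value << "\n";
    } else {
      std::cout << "null\n";
    }
  }
}
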
28 changes: 28 additions & 0 deletions cpp/src/arrow/buffer_builder.h
@@ -162,6 +162,12 @@ class ARROW_EXPORT BufferBuilder {
return Status::OK();
}

Result<std::shared_ptr<Buffer>> Finish(bool shrink_to_fit = true) {
std::shared_ptr<Buffer> out;
ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit));
return out;
}

void Reset() {
buffer_ = NULLPTR;
capacity_ = size_ = 0;
@@ -202,6 +208,11 @@ class TypedBufferBuilder<
MemoryPool* pool = default_memory_pool())
: bytes_builder_(std::move(buffer), pool) {}

explicit TypedBufferBuilder(BufferBuilder builder)
: bytes_builder_(std::move(builder)) {}

BufferBuilder* bytes_builder() { return &bytes_builder_; }

Status Append(T value) {
return bytes_builder_.Append(reinterpret_cast<uint8_t*>(&value), sizeof(T));
}
@@ -256,6 +267,12 @@ class TypedBufferBuilder<
return bytes_builder_.Finish(out, shrink_to_fit);
}

Result<std::shared_ptr<Buffer>> Finish(bool shrink_to_fit = true) {
std::shared_ptr<Buffer> out;
ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit));
return out;
}

void Reset() { bytes_builder_.Reset(); }

int64_t length() const { return bytes_builder_.length() / sizeof(T); }
@@ -274,6 +291,11 @@ class TypedBufferBuilder<bool> {
explicit TypedBufferBuilder(MemoryPool* pool = default_memory_pool())
: bytes_builder_(pool) {}

explicit TypedBufferBuilder(BufferBuilder builder)
: bytes_builder_(std::move(builder)) {}

BufferBuilder* bytes_builder() { return &bytes_builder_; }

Status Append(bool value) {
ARROW_RETURN_NOT_OK(Reserve(1));
UnsafeAppend(value);
@@ -371,6 +393,12 @@ class TypedBufferBuilder<bool> {
return bytes_builder_.Finish(out, shrink_to_fit);
}

Result<std::shared_ptr<Buffer>> Finish(bool shrink_to_fit = true) {
std::shared_ptr<Buffer> out;
ARROW_RETURN_NOT_OK(Finish(&out, shrink_to_fit));
return out;
}

void Reset() {
bytes_builder_.Reset();
bit_length_ = false_count_ = 0;
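The additions to buffer_builder.h above provide a Result-returning Finish() and let an existing BufferBuilder be moved into a TypedBufferBuilder (with bytes_builder() exposing the underlying builder again). A minimal sketch of how these pieces might compose; the function name and appended bytes are illustrative:

#include <cstdint>
#include <memory>

#include "arrow/buffer_builder.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Result<std::shared_ptr<arrow::Buffer>> BuildSomeBytes() {
  // Start with an untyped builder (e.g. one that was filled by a KeyEncoder).
  arrow::BufferBuilder raw;
  ARROW_RETURN_NOT_OK(raw.Append("abc", 3));

  // Re-wrap it as a typed builder and keep appending through the typed API.
  arrow::TypedBufferBuilder<uint8_t> typed(std::move(raw));
  ARROW_RETURN_NOT_OK(typed.Append(0xFF));

  // The new Finish() overload returns Result<shared_ptr<Buffer>> directly,
  // which composes with ARROW_ASSIGN_OR_RAISE or a direct return.
  return typed.Finish();
}
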
2 changes: 1 addition & 1 deletion cpp/src/arrow/compare.h
@@ -71,7 +71,7 @@ class EqualOptions {
return res;
}

static EqualOptions Defaults() { return EqualOptions(); }
static EqualOptions Defaults() { return {}; }

protected:
double atol_ = kDefaultAbsoluteTolerance;
97 changes: 97 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
@@ -306,5 +306,102 @@ Result<Datum> TDigest(const Datum& value,
const TDigestOptions& options = TDigestOptions::Defaults(),
ExecContext* ctx = NULLPTR);

namespace internal {

/// Internal use only: streaming group identifier.
Review thread:
Member: If it's internal, why is it exposed in api_aggregate.h? I would expect another header, e.g. compute/group_by_internal.h.
Member Author: These are made available for testing from R, which could not access an _internal header (since it wouldn't be installed).
Member: Hmm, fair enough. But what's the point of calling those from R?
Member: Testing purposes--we found a bunch of issues in earlier iterations by experimenting with this in R.
Member Author: It allowed @nealrichardson to explore the hash aggregate kernels and expose a number of issues. We'll probably remove GroupBy altogether in ARROW-12010.

/// Consumes batches of keys and yields batches of the group ids.
class ARROW_EXPORT Grouper {
public:
virtual ~Grouper() = default;

/// Construct a Grouper which receives the specified key types
static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
ExecContext* ctx = default_exec_context());

/// Consume a batch of keys, producing the corresponding group ids as an integer array.
/// Currently only uint32 indices will be produced; eventually the bit width will only
/// be as wide as necessary.
virtual Result<Datum> Consume(const ExecBatch& batch) = 0;

/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;

/// Get the current number of groups.
Review thread:
Member: "groups" or "keys"? Vocabulary should probably be consistent across docstrings.
Member Author: The number of key (column)s is fixed throughout the lifetime of the Grouper. The number of groups is incremented each time a unique row of keys is encountered.
Member: Hmm, I see. But the docstring above talks about "current unique keys"...
Member Author: I'll include a definition of keys, unique keys, and groups in the doccomment for Grouper.

virtual uint32_t num_groups() const = 0;

/// \brief Assemble lists of indices of identical elements.
///
/// \param[in] ids An unsigned, all-valid integral array which will be
/// used as grouping criteria.
/// \param[in] num_groups An upper bound for the elements of ids
/// \return A num_groups-long ListArray where the slot at i contains a
/// list of indices where i appears in ids.
///
/// MakeGroupings([
/// 2,
/// 2,
/// 5,
/// 5,
/// 2,
/// 3
/// ], 8) == [
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> MakeGroupings(
const UInt32Array& ids, uint32_t num_groups,
ExecContext* ctx = default_exec_context());

/// \brief Produce a ListArray whose slots are selections of `array` which correspond to
/// the provided groupings.
///
/// For example,
/// ApplyGroupings([
/// [],
/// [],
/// [0, 1, 4],
/// [5],
/// [],
/// [2, 3],
/// [],
/// []
/// ], [2, 2, 5, 5, 2, 3]) == [
/// [],
/// [],
/// [2, 2, 2],
/// [3],
/// [],
/// [5, 5],
/// [],
/// []
/// ]
static Result<std::shared_ptr<ListArray>> ApplyGroupings(
const ListArray& groupings, const Array& array,
ExecContext* ctx = default_exec_context());
};
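To make the Grouper contract above concrete, here is a minimal sketch that consumes one string key column and then regroups a second column by the resulting ids. It is illustrative only: the function name, the utf8 key type, and the payload column are assumptions, not part of this PR's tests.

#include <iostream>
#include <memory>

#include "arrow/api.h"
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/exec.h"

namespace cp = arrow::compute;

arrow::Status GrouperSketch(const std::shared_ptr<arrow::Array>& keys,
                            const std::shared_ptr<arrow::Array>& payload) {
  // A Grouper over a single utf8 key column.
  ARROW_ASSIGN_OR_RAISE(
      auto grouper, cp::internal::Grouper::Make({arrow::ValueDescr::Array(arrow::utf8())}));

  // Consume the keys; the result is a uint32 array of group ids, one per row.
  ARROW_ASSIGN_OR_RAISE(auto key_batch, cp::ExecBatch::Make({arrow::Datum(keys)}));
  ARROW_ASSIGN_OR_RAISE(arrow::Datum ids, grouper->Consume(key_batch));

  // The distinct key rows seen so far, in order of first appearance.
  ARROW_ASSIGN_OR_RAISE(cp::ExecBatch uniques, grouper->GetUniques());
  std::cout << "unique keys: " << uniques.values[0].make_array()->ToString() << std::endl;
  std::cout << "num_groups: " << grouper->num_groups() << std::endl;

  // Turn the ids into per-group index lists, then gather the payload by group.
  auto ids_array = std::static_pointer_cast<arrow::UInt32Array>(ids.make_array());
  ARROW_ASSIGN_OR_RAISE(auto groupings, cp::internal::Grouper::MakeGroupings(
                                            *ids_array, grouper->num_groups()));
  ARROW_ASSIGN_OR_RAISE(auto grouped_payload,
                        cp::internal::Grouper::ApplyGroupings(*groupings, *payload));
  std::cout << grouped_payload->ToString() << std::endl;
  return arrow::Status::OK();
}
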

/// \brief Configure a grouped aggregation
struct ARROW_EXPORT Aggregate {
/// the name of the aggregation function
std::string function;

/// options for the aggregation function
const FunctionOptions* options;
};

/// Internal use only: helper function for testing HashAggregateKernels.
/// This will be replaced by streaming execution operators.
ARROW_EXPORT
Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
const std::vector<Aggregate>& aggregates,
ExecContext* ctx = default_exec_context());

} // namespace internal
} // namespace compute
} // namespace arrow
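As a rough sketch of the test-only GroupBy helper declared above, the following groups one argument column by one key column and computes a per-group sum. It assumes the "hash_sum" hash aggregate function registered elsewhere in this PR; the wrapper function and column contents are illustrative.

#include <memory>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/api_aggregate.h"

namespace cp = arrow::compute;

arrow::Result<arrow::Datum> GroupedSumSketch(const std::shared_ptr<arrow::Array>& values,
                                             const std::shared_ptr<arrow::Array>& keys) {
  // One aggregate: apply the "hash_sum" kernel to `values`, grouped by `keys`.
  // A null options pointer uses the kernel's defaults.
  std::vector<cp::internal::Aggregate> aggregates = {{"hash_sum", nullptr}};

  // The result bundles the aggregated values together with the unique key values.
  return cp::internal::GroupBy({arrow::Datum(values)}, {arrow::Datum(keys)}, aggregates);
}
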
41 changes: 41 additions & 0 deletions cpp/src/arrow/compute/exec.cc
@@ -36,6 +36,7 @@
#include "arrow/compute/registry.h"
#include "arrow/compute/util_internal.h"
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
@@ -57,6 +58,44 @@ using internal::CpuInfo;

namespace compute {

ExecContext* default_exec_context() {
static ExecContext default_ctx;
return &default_ctx;
}

ExecBatch::ExecBatch(const RecordBatch& batch)
: values(batch.num_columns()), length(batch.num_rows()) {
auto columns = batch.column_data();
std::move(columns.begin(), columns.end(), values.begin());
}

Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
if (values.empty()) {
return Status::Invalid("Cannot infer ExecBatch length without at least one value");
}

int64_t length = -1;
for (const auto& value : values) {
if (value.is_scalar()) {
if (length == -1) {
length = 1;
}
continue;
}

if (length == -1) {
length = value.length();
continue;
}

if (length != value.length()) {
return Status::Invalid(
"Arrays used to construct an ExecBatch must have equal length");
}
}

return ExecBatch(std::move(values), length);
}
namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
@@ -838,6 +877,7 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {

private:
Status Consume(const ExecBatch& batch) {
// FIXME(ARROW-11840) don't merge *any* aggregates for every batch
auto batch_state = kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_});
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);

@@ -855,6 +895,7 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {

kernel_->merge(kernel_ctx_, std::move(*batch_state), state());
ARROW_CTX_RETURN_IF_ERROR(kernel_ctx_);

return Status::OK();
}

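ExecBatch::Make, implemented above, infers the batch length from its values: the first array value fixes the length, later arrays must match it, and a batch containing only scalars gets length 1. A small sketch; the function name, the column parameter, and the int64 scalar are illustrative:

#include <memory>

#include "arrow/api.h"
#include "arrow/compute/exec.h"

namespace cp = arrow::compute;

arrow::Status ExecBatchSketch(const std::shared_ptr<arrow::Array>& column) {
  // Array first: the batch length is column->length(); the scalar rides along.
  ARROW_ASSIGN_OR_RAISE(
      cp::ExecBatch batch,
      cp::ExecBatch::Make({arrow::Datum(column), arrow::Datum(arrow::MakeScalar(int64_t(7)))}));
  // batch.length == column->length()

  // Two arrays of different length would be rejected with Status::Invalid,
  // and an empty value list cannot infer a length at all.
  return arrow::Status::OK();
}
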
8 changes: 7 additions & 1 deletion cpp/src/arrow/compute/exec.h
@@ -119,6 +119,8 @@ class ARROW_EXPORT ExecContext {
bool use_threads_ = true;
};

ARROW_EXPORT ExecContext* default_exec_context();

// TODO: Consider standardizing on uint16 selection vectors and only use them
// when we can ensure that each value is 64K length or smaller

@@ -164,11 +166,15 @@ class ARROW_EXPORT SelectionVector {
/// TODO: Datum uses arrow/util/variant.h which may be a bit heavier-weight
/// than is desirable for this class. Microbenchmarks would help determine for
/// sure. See ARROW-8928.
struct ExecBatch {
struct ARROW_EXPORT ExecBatch {
ExecBatch() = default;
ExecBatch(std::vector<Datum> values, int64_t length)
: values(std::move(values)), length(length) {}

explicit ExecBatch(const RecordBatch& batch);

static Result<ExecBatch> Make(std::vector<Datum> values);

/// The values representing positional arguments to be passed to a kernel's
/// exec function for processing.
std::vector<Datum> values;
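The new explicit constructor declared above lets a RecordBatch be wrapped as an ExecBatch directly. A trivial sketch; the helper name is illustrative:

#include "arrow/api.h"
#include "arrow/compute/exec.h"

// Wrap a RecordBatch for kernel execution: its columns become the ExecBatch
// values and its row count becomes the length.
arrow::compute::ExecBatch ToExecBatch(const arrow::RecordBatch& record_batch) {
  return arrow::compute::ExecBatch(record_batch);
}
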
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/exec_internal.h
@@ -106,6 +106,11 @@ class ARROW_EXPORT KernelExecutor {
public:
virtual ~KernelExecutor() = default;

/// The Kernel's `init` method must be called and any KernelState set in the
/// KernelContext *before* KernelExecutor::Init is called. This is to facilitate
/// the case where init may be expensive and does not need to be called again for
/// each execution of the kernel; for example, the same lookup table can be re-used
/// for all scanned batches in a dataset filter.
virtual Status Init(KernelContext*, KernelInitArgs) = 0;

/// XXX: Better configurability for listener
18 changes: 17 additions & 1 deletion cpp/src/arrow/compute/function.cc
@@ -126,6 +126,11 @@ const Kernel* DispatchExactImpl(const Function* func,
checked_cast<const ScalarAggregateFunction*>(func)->kernels(), values);
}

if (func->kind() == Function::HASH_AGGREGATE) {
return DispatchExactImpl(checked_cast<const HashAggregateFunction*>(func)->kernels(),
values);
}

return nullptr;
}

@@ -184,8 +189,10 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
executor = detail::KernelExecutor::MakeScalar();
} else if (kind() == Function::VECTOR) {
executor = detail::KernelExecutor::MakeVector();
} else {
} else if (kind() == Function::SCALAR_AGGREGATE) {
executor = detail::KernelExecutor::MakeScalarAggregate();
} else {
return Status::NotImplemented("Direct execution of HASH_AGGREGATE functions");
}
RETURN_NOT_OK(executor->Init(&kernel_ctx, {kernel, inputs, options}));

@@ -263,6 +270,15 @@ Status ScalarAggregateFunction::AddKernel(ScalarAggregateKernel kernel) {
return Status::OK();
}

Status HashAggregateFunction::AddKernel(HashAggregateKernel kernel) {
RETURN_NOT_OK(CheckArity(kernel.signature->in_types()));
if (arity_.is_varargs && !kernel.signature->is_varargs()) {
return Status::Invalid("Function accepts varargs but kernel signature does not");
}
kernels_.emplace_back(std::move(kernel));
return Status::OK();
}

Result<Datum> MetaFunction::Execute(const std::vector<Datum>& args,
const FunctionOptions* options,
ExecContext* ctx) const {