
Conversation

@bkietz (Member) commented Mar 2, 2021

This patch adds basic building blocks for grouped aggregation:

  • Grouper, which produces integer arrays of group ids from batches of keys
  • HashAggregateKernel, which consumes batches of arguments and group ids, updating internal sums/counts/...

For testing purposes, a one-shot grouped aggregation function is provided:

std::shared_ptr<arrow::Array> needs_sum = ...;
std::shared_ptr<arrow::Array> needs_min_max = ...;
std::shared_ptr<arrow::Array> key_0 = ...;
std::shared_ptr<arrow::Array> key_1 = ...;

ARROW_ASSIGN_OR_RAISE(arrow::Datum out,
  arrow::compute::internal::GroupBy({
    needs_sum,
    needs_min_max,
  }, {
    key_0,
    key_1,
  }, {
    {"sum", nullptr},  // first argument will be summed
    {"min_max", &min_max_options},  // second argument's extrema will be found
  }));

// Unpack struct array result (a four-field array)
auto out_array = out.array_as<StructArray>();
std::shared_ptr<arrow::Array> sums = out_array->field(0);
std::shared_ptr<arrow::Array> mins_and_maxes = out_array->field(1);
std::shared_ptr<arrow::Array> group_key_0 = out_array->field(2);
std::shared_ptr<arrow::Array> group_key_1 = out_array->field(3);

@bkietz bkietz requested review from pitrou and wesm March 2, 2021 21:42
@wesm (Member) left a comment

Before digging into the details too much, my main issue with what I see is that I don't agree with making hash aggregation a callable function through CallFunction.

In the context of a query engine, the interface for this operator looks something like:

class ExecNode {
 public:
  virtual void Push(int node_index, const ExecBatch& batch) = 0;
  virtual void PushDone(int node_index) = 0;
};

class HashAggregationNode : public ExecNode {
  ...
}; 

(some query engines use a "pull"-based model, in which the data flow is inverted — there are pros and cons to both approaches, see https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf)

If you want a simple one-shot version of the algorithm (rather than a general "streaming" one like the above), then you can break the input data into record batches of the desired size (e.g. 4K - 64K rows, depending on heuristics) and then push the chunks into the node that you create (note that the HashAggregationNode should push its result into a terminal "OutputNode" when you invoke hash_agg_node->PushDone(0)).
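
For example, a one-shot driver on top of that interface could look roughly like this (purely illustrative; OutputNode, MakeHashAggregationNode, SliceIntoBatches and aggregate_options are placeholder names, not an existing API):

// Hypothetical one-shot driver for a push-based hash aggregation node.
auto sink = std::make_shared<OutputNode>();  // terminal node collecting the result
auto hash_agg_node = MakeHashAggregationNode(aggregate_options, sink);

// Slice the input into record batches of a heuristically chosen size
// (e.g. 4K - 64K rows) and push them into the node.
for (const ExecBatch& batch : SliceIntoBatches(input_table, /*rows_per_batch=*/16 * 1024)) {
  hash_agg_node->Push(/*node_index=*/0, batch);
}

// Signal end of input; the node pushes its accumulated output into `sink`.
hash_agg_node->PushDone(/*node_index=*/0);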

The API can be completely crude / preliminary, but would it be possible to use the query-engine-type approach for this? I think it would be best to start taking some strides in this direction rather than bolting this onto the array function execution machinery, which doesn't make sense in a query processing context (because aggregation is fundamentally a streaming algorithm).

On the hash aggregation functions themselves, perhaps it makes sense to add a HASH_AGGREGATE function type and define the kernel interface for these functions, then look up these functions using the general dispatch machinery?
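
A rough sketch of what such a kernel interface might look like (illustrative only; the callback names and signatures here are assumptions, not a settled design):

// Hypothetical shape of a HASH_AGGREGATE kernel: per-group state is resized
// as new groups appear, updated batch by batch, and finalized at the end.
struct HashAggregateKernel {
  // Ensure state exists for `num_groups` groups (the current hash table cardinality).
  std::function<Status(KernelContext*, int64_t num_groups)> resize;

  // Update per-group state from a batch of argument values and their group ids.
  std::function<Status(KernelContext*, const ExecBatch& batch,
                       const ArrayData& group_ids)> consume;

  // Produce one output value per group.
  std::function<Result<Datum>(KernelContext*)> finalize;
};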


@wesm (Member) commented Mar 3, 2021

Regarding the HASH_AGGREGATE function type, one of the inputs on each invocation should be the current hash table cardinality so you do not need to inspect the group ids to infer the cardinality.

@pitrou (Member) left a comment

This is very interesting. Here is an assorted round of comments :-)

groupings.value_offsets(), sorted.make_array());
}

struct ScalarVectorToArray {
Member

Looks like it would be nice to have AppendScalar methods on common builders?
(perhaps even as a virtual function on the base builder class)
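
Something along these lines, perhaps (just a sketch of the suggestion; the exact signature is an assumption):

class ArrayBuilder {
 public:
  /// Append a single Scalar value, which must match this builder's type.
  /// A base implementation could dispatch on the scalar's concrete type;
  /// concrete builders may override it with a faster, type-specific path.
  virtual Status AppendScalar(const Scalar& scalar);

  // ... existing builder methods ...
};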


std::vector<uint8_t*> key_buf_ptrs_;
std::vector<uint32_t> group_ids_batch_;

std::unordered_map<std::string, uint32_t> map_;
Member

Add comments for the non-trivial members here?

}
std::vector<int32_t> offsets_batch_;
std::vector<uint8_t> key_bytes_batch_;
std::vector<uint8_t*> key_buf_ptrs_;
Member

As an optimization, we may also want a std::vector<int64_t> key_null_counts_... though this may not be beneficial.

Member Author

IIUC, this would be supported by using the "count" aggregation

@nealrichardson (Member) commented:

With an assist from @bkietz, I've written a very basic R wrapper that exercises this in aa530cb. It's enough to expose some issues to address, to say nothing of the interface questions.

library(arrow)
library(dplyr)

# The commit uses this option to switch to the group_by compute function
options(arrow.summarize = TRUE)
# If the Arrow aggregation function isn't implemented, or if the Arrow call errors,
# it falls back to pulling the data into R and evaluating in R.

# mtcars is a standard dataset that ships with R
mt <- Table$create(mtcars)
mt %>%
  group_by(cyl) %>%
  summarize(total_hp = sum(hp))
# Warning: Error : NotImplemented: Key of typedouble
# ../src/arrow/compute/function.cc:178  kernel_ctx.status()
# ; pulling data into R
# # A tibble: 3 x 2
#     cyl total_hp
# * <dbl>    <dbl>
# 1     4      909
# 2     6      856
# 3     8     2929

# That's unfortunate. R blurs the distinction for users between integer and double,
# so it's not uncommon to have integer data stored as a float.
# (Also, the error message is missing some whitespace.)

# We can cast that to an integer and try again

mt$cyl <- mt$cyl$cast(int32())
unique(mt$cyl)
# Array
# <int32>
# [
#   6,
#   4,
#   8
# ]

mt %>%
  group_by(cyl) %>%
  summarize(total_hp = sum(hp))
# StructArray
# <struct<: double, : int32>>
# -- is_valid: all not null
# -- child 0 type: double
#   [
#     856,
#     909,
#     2929
#   ]
# -- child 1 type: int64
#   [
#     17179869190,
#     8,
#     0
#   ]

# Alright, it computed and got the same numbers, but the StructArray
# is not valid. Type says int32 but data says int64 and we have misplaced bits

# Let's try a different stock dataset
ir <- Table$create(iris)
ir %>%
  group_by(Species) %>%
  summarize(total_length = sum(Sepal.Length))
# Warning: Error : NotImplemented: Key of typedictionary<values=string, indices=int8, ordered=0>
# ../src/arrow/compute/function.cc:178  kernel_ctx.status()
# ; pulling data into R
# # A tibble: 3 x 2
#   Species    total_length
# * <fct>             <dbl>
# 1 setosa             250.
# 2 versicolor         297.
# 3 virginica          329.

# Hmm. dictionary types really need to be supported.
# Let's work around and cast it to string

ir$Species <- ir$Species$cast(utf8())
unique(ir$Species)
# Array
# <string>
# [
#   "setosa",
#   "versicolor",
#   "virginica"
# ]
ir %>%
  group_by(Species) %>%
  summarize(total_length = sum(Sepal.Length))
# Warning: Error : Invalid: Negative buffer resize: -219443965
# ../src/arrow/buffer.cc:262  buffer->Resize(size)
# ../src/arrow/compute/kernels/aggregate_basic.cc:1005  (_error_or_value9).status()
# ../src/arrow/compute/function.cc:193  executor->Execute(implicitly_cast_args, listener.get())
# ; pulling data into R
# # A tibble: 3 x 2
#   Species    total_length
# * <chr>             <dbl>
# 1 setosa             250.
# 2 versicolor         297.
# 3 virginica          329.

@michalursa (Contributor) commented:

> Before digging into the details too much, my main issue with what I see is that I don't agree with making hash aggregation a callable function through CallFunction. [...]
>
> On the hash aggregation functions themselves, perhaps it makes sense to add a HASH_AGGREGATE function type and define the kernel interface for these functions, then look up these functions using the general dispatch machinery?

I like these points. Let me break what I think we are facing now, and what we should think about for the future, into subtopics. I read the comment above as advocating support for two concepts: a) relational operators (the building blocks of a query execution pipeline or, more generally, a query execution plan) and b) pipelines / streaming processing.

**1. How we got here**

Part of the problem is that we don't have a pressing enough problem just yet: we haven't bitten off a big enough chunk of work to force the appropriate refactoring. The problem I am referring to is pipelining, which we do not support yet. From what I understand, we currently cannot bind multiple operations together into a single processing pipeline, executed with a single Arrow function call, that computes expressions on the fly without persisting their results for the entire set of rows.

Related to pipelining is the fact that this PR does not stream the group-by output. At some level of abstraction (squinting a little), that lets us treat it as a scalar aggregate: we output a single item, which happens to be an entire collection of arrays (variants make this possible).

But to me the bigger problem is that we are mixing together two separate worlds, scalar operators and relational operators, and that muddies the general picture. That mixing happens as a consequence of the code treating group by as a modified scalar aggregate.

**2. Scalar operators vs. relational operators**

One way to think about it is that scalar expressions (compositions of scalar operators) are like the C++ lambdas (e.g. a comparison of two values) passed to C++ STL algorithms / containers (e.g. std::sort), while relational operators correspond to those algorithms / containers themselves (except that they additionally support composability: building execution pipelines / trees / DAGs).
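
To make the analogy concrete (an illustrative snippet, not code from this PR):

#include <algorithm>
#include <vector>

struct Person { int age; };

void SortByAge(std::vector<Person>& people) {
  // The lambda plays the role of a scalar expression: it evaluates a
  // comparison of two values. std::sort plays the role of a relational
  // operator: it drives the algorithm over the whole collection and only
  // calls into the "expression" as needed.
  std::sort(people.begin(), people.end(),
            [](const Person& a, const Person& b) { return a.age < b.age; });
}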

The point of confusion may come from the fact that once you vectorize scalar expressions (which you probably want to do for performance reasons), their interfaces start looking very similar (if not identical) to those of some special relational operators (namely filter, project and scalar aggregate). I claim that the current kernel executor classes are relational operators in disguise: project (a.k.a. compute scalar) and reduce (a.k.a. scalar aggregate).

**Relational operators inside the current group by**

Interestingly, the group by in this PR can itself be treated as a DAG of multiple relational operators with pipelines of some kind connecting them. The input batch is first processed by the code that assigns an integer group id to every row based on the key columns. Its output is then processed by zero, one or more group-by aggregate operators, each updating aggregate-related accumulators in its internal arrays. At the end of the input, the output array of the group-id-mapping component is concatenated with the output arrays of the individual group-by aggregate operators to produce the output collection of arrays. We can treat group-id mapping and the group-by aggregates as separate relational operators, or we can choose to treat them as the internals of a single hash group by operator. Even the hash computation for the hash table lookup (part of group-id mapping) can be treated as a separate processing step that uses a projection operator.

When we talk about relational operators and their connections, we can talk at different levels of granularity. Sometimes it is simpler to treat a building block made of multiple operators with fixed, hard-coded connections between them as a single operator; sometimes treating the smaller blocks separately brings more opportunities for code reuse.
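
For illustration, the decomposition above could be spelled out roughly like this (a hypothetical sketch: GroupedAggregator and the Consume/Finalize signatures are assumptions made for illustration, not the code in this patch):

// Hypothetical sketch of the group-by DAG described above: group-id mapping
// feeds zero or more grouped aggregates, which are finalized and concatenated
// with the unique keys at the end of input.
Result<std::vector<Datum>> NaiveGroupBy(compute::internal::Grouper* grouper,
                                        std::vector<GroupedAggregator*> aggregates,
                                        const std::vector<ExecBatch>& key_batches,
                                        const std::vector<ExecBatch>& arg_batches) {
  for (size_t i = 0; i < key_batches.size(); ++i) {
    // Group-id mapping: assign an integer group id to every row of keys.
    ARROW_ASSIGN_OR_RAISE(Datum group_ids, grouper->Consume(key_batches[i]));
    // Grouped aggregates: update per-group accumulators.
    for (auto* agg : aggregates) {
      RETURN_NOT_OK(agg->Consume(arg_batches[i], group_ids, grouper->num_groups()));
    }
  }
  // Finalize: one output column per aggregate, then the unique key columns.
  std::vector<Datum> out;
  for (auto* agg : aggregates) {
    ARROW_ASSIGN_OR_RAISE(Datum column, agg->Finalize());
    out.push_back(std::move(column));
  }
  ARROW_ASSIGN_OR_RAISE(ExecBatch uniques, grouper->GetUniques());
  out.insert(out.end(), uniques.values.begin(), uniques.values.end());
  return out;
}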

**3. Push and pull**

My personal feeling is that the pull model was good in the early query execution engines, based on processing of a single row at a time and using virtual function calls to switch between relational operators within the query. In my experience, the push model is easier to work with in both modern worlds of query execution: JIT compiled query processing and vectorized query processing.

It may seem abstract right now which model is better, push or pull, but I believe that once you have to actually solve a concrete problem it becomes more obvious which one you prefer.

From my experience, in a vectorized execution model, it is easy to adapt existing push-based implementation to support pull-based interface. I am guessing that the same would be true the other way around.

Also, intuitively, when we think about consuming input, we think: pull (e.g. scanning in-memory data or a file), and when we think about producing output, we think: push (e.g. sending results of the computation over the network).

I would probably recommend: at the lower levels, use whatever model results in simpler, more readable, more natural code; at the higher levels, adapt all interfaces to the push model.

**4. When streaming output is not desired**

It may be a bit forward looking, but in some cases it is beneficial to give one relational operator direct access to internal row storage of another relational operator instead of always assuming that the operators stream their output. Streaming output should always be supported, but in addition to that, the direct access option may be useful.

It’s not unusual to request the output of group by to be sorted on group by key columns. In that case the sort operator, which needs to accumulate all of the input in the buffer before sorting, could work directly on the group by buffers without converting internal row storage of group by to batches and then batches back to internal row storage of sort.

Similarly, window functions and quantiles may require random access to the entire set of input rows (within a group / partition of the data). In that case, again, they may want to work on sort's internal row storage rather than having it streamed out and re-accumulated in their own internal buffers.

**5. Summary**

I think that what we are talking about here has a goal that overlaps with group by but is wider in scope and somewhat separate: a) refactoring the code to introduce the concepts of relational and scalar operators, and b) laying the groundwork for supporting pipelining.

@bkietz bkietz force-pushed the groupby1 branch 2 times, most recently from b09fba2 to 38d3eea on March 17, 2021 21:43
@bkietz bkietz marked this pull request as ready for review March 17, 2021 21:44
@bkietz (Member Author) commented Mar 17, 2021

@pitrou PTAL

@nealrichardson (Member) left a comment

All of the issues I previously identified in #9621 (comment) have been resolved

@pitrou (Member) left a comment

Very nice. Here are a bunch of comments.


namespace internal {

/// Internal use only: streaming group identifier.
Member

If it's internal, why is it exposed in api_aggregate.h? I would expect another header, e.g. compute/group_by_internal.h.

Member Author

These are made available for testing from R, which could not access an _internal header (since it wouldn't be installed)

Member

Hmm, fair enough. But what's the point of calling those from R?

Member

Testing purposes: we found a bunch of issues in earlier iterations by experimenting with this in R.

Member Author

It allowed @nealrichardson to explore the hash aggregate kernels and expose a number of issues. We'll probably remove GroupBy altogether in ARROW-12010

/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;

/// Get the current number of groups.
Member

"groups" or "keys"? Vocabulary should probably be consistent accross docstrings.

Member Author

The number of key(column)s is fixed throughout the lifetime of the Grouper. The number of groups is incremented each time a unique row of keys is encountered.

Member

Hmm, I see. But the docstring above talks about "current unique keys"...

Member Author

I'll include a definition of keys, unique keys, and groups in the doccomment for Grouper
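
For example, the Grouper doccomment could define the vocabulary roughly like this (an illustrative sketch, not the wording that was ultimately committed):

/// \brief Consumes batches of keys and yields batches of group ids.
///
/// Vocabulary:
/// - "key": the tuple of values in a single row of the key columns
/// - "unique key": a key distinct from all keys seen before it
/// - "group": the set of input rows sharing a key; each group is assigned
///   an integer group id in order of first appearance of its unique key
class ARROW_EXPORT Grouper {
  // ...
};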

@bkietz (Member Author) commented Mar 23, 2021

+1, merging
