Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement top/bottom-k aggregations in sort-based groupby #15272

Draft
wants to merge 7 commits into
base: branch-24.06
Choose a base branch
from

Conversation

wence-
Copy link
Contributor

@wence- wence- commented Mar 11, 2024

Description

Provide an implementation of top_k/bottom_k in groupby-aggregations. Since there isn't currently an implementation of a segmented quickselect in thrust/cub, we do this by sorting the group segments and then extracting the pieces.

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@github-actions github-actions bot added libcudf Affects libcudf (C++/CUDA) code. Python Affects Python cuDF API. CMake CMake build issue labels Mar 11, 2024
@wence- wence- added improvement Improvement / enhancement to an existing function non-breaking Non-breaking change labels Mar 11, 2024
table_view{{values}},
group_offsets,
{order},
{null_order::BEFORE},
Copy link
Contributor Author

@wence- wence- Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should expose this choice in the aggregation construction.

Comment on lines +88 to +96
auto ordered_values = cudf::detail::segmented_sort_by_key(table_view{{values}},
table_view{{values}},
group_offsets,
{order},
{null_order::BEFORE},
stream,
mr);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relevant CCCL feature requests here NVIDIA/cccl#931 and NVIDIA/cccl#685

size_type num_groups,
size_type k)
{
// warp per group writes output indices
Copy link
Contributor Author

@wence- wence- Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumption: k is typically small.

Comment on lines +43 to +48
__global__ void compute_topk_indices(cudf::device_span<size_type const> group_offsets,
cudf::device_span<size_type const> index_offsets,
cudf::device_span<size_type> indices,
size_type num_groups,
size_type k)
{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible/plausible that this could be done instead with a morally similar approach to that of the list-based segmented_gather and instead of constructing indices, one could construct a boolean mask... Let me try that..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a go at thrust-based approaches, in a standalone test case, either generating a boolean mask or else the indices via a hand-rolled kernel, it seems that the hand-rolled one comes out ahead (sometimes quite a lot, generally with low numbers of groups). I need to do a bit more profiling with different distributions of groups and plumb things through a bit more though.

code
#include <cuda/functional>
#include <functional>
#include <iostream>
#include <nvtx3/nvtx3.hpp>
#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
#include <thrust/random.h>
#include <thrust/scan.h>
#include <thrust/transform_scan.h>

using size_type = int;
using thread_index_type = long;

using vec = thrust::device_vector<size_type>;
size_type constexpr warp_size{32};
__global__ void kernel(size_type const *group_offsets,
                       size_type const *index_offsets, size_type *indices,
                       size_type num_groups, size_type k) {
  for (thread_index_type idx = threadIdx.x + blockDim.x * blockIdx.x;
       idx < warp_size * num_groups; idx += gridDim.x * blockDim.x) {
    auto const group{idx / warp_size};
    auto const lane{idx % warp_size};
    if (group >= num_groups)
      break;
    auto const off1 = group_offsets[group + 1];
    auto const off0 = group_offsets[group];
    auto const k_loc = min(off1 - off0, k);
    auto const out_offset = index_offsets[group];
    for (size_type off = lane; off < k_loc; off += warp_size) {
      indices[out_offset + off] = off0 + off;
    }
  }
}
template <typename Vector> void print(const std::string &s, const Vector &v) {
  typedef typename Vector::value_type T;

  std::cout << s;
  thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, " "));
  std::cout << std::endl;
}

void hand_kernel(vec group_offsets, vec group_sizes, vec &indices,
                 size_type num_groups, size_type k) {
  nvtx3::scoped_range r{"hand_kernel"};
  thrust::device_vector<size_type> index_offsets(group_offsets.size(), 0);

  thrust::transform_inclusive_scan(
      group_sizes.begin(), group_sizes.end(), index_offsets.begin() + 1,
      cuda::proclaim_return_type<size_type>(
          [k] __device__(size_type s) -> size_type { return min(k, s); }),
      thrust::plus<size_type>());

  auto num_indices = index_offsets[group_sizes.size()];
  indices = thrust::device_vector<size_type>(num_indices, -1);
  std::cout << num_groups << " " << group_offsets[group_sizes.size()] << " "
            << num_indices << std::endl;

  auto const threads_per_block{128};
  auto const num_blocks{((warp_size * num_groups) / threads_per_block) +
                        (((warp_size * num_groups) % threads_per_block) != 0)};
  kernel<<<num_blocks, threads_per_block, 0>>>(
      group_offsets.data().get(), index_offsets.data().get(),
      indices.data().get(), num_groups, k);
}

void thrust_seq_upper_bound(vec group_offsets, thrust::device_vector<bool> mask,
                            size_type k) {
  nvtx3::scoped_range r{"thrust_seq_upper_bound"};
  auto goff_begin = group_offsets.begin();
  auto goff_end = group_offsets.end();
  auto index_transformer = cuda::proclaim_return_type<bool>(
      [goff_begin, goff_end, k] __device__(size_type index) -> bool {
        auto gstart =
            thrust::upper_bound(thrust::seq, goff_begin, goff_end, index);
        auto group_index = index - *(gstart - 1);
        return group_index < k;
      });
  auto count = thrust::make_counting_iterator(size_type{0});
  thrust::transform(count, count + mask.size(), mask.begin(),
                    index_transformer);
}

void thrust_par_scatter(vec group_offsets, thrust::device_vector<bool> mask,
                        size_type num_groups, size_type k) {
  nvtx3::scoped_range r{"thrust_par_scatter"};
  thrust::device_vector<size_type> upper_bound(mask.size(), 0);
  auto count = thrust::make_counting_iterator(size_type{0});
  // Precondition: no empty groups
  thrust::scatter(count, count + num_groups, group_offsets.begin(),
                  upper_bound.begin());
  thrust::inclusive_scan(upper_bound.begin(), upper_bound.end(),
                         upper_bound.begin(), thrust::maximum<size_type>());
  auto goff_begin = group_offsets.begin();
  auto fn = cuda::proclaim_return_type<bool>(
      [goff_begin, k] __device__(auto pair) -> bool {
        auto index = thrust::get<0>(pair);
        auto off = thrust::get<1>(pair);
        auto group_index = index - *(goff_begin + off);
        return group_index < k;
      });
  auto zipit = thrust::make_zip_iterator(count, upper_bound.begin());
  thrust::transform(zipit, zipit + mask.size(), mask.begin(), fn);
}

int main(int argc, char **argv) {
  size_type num_groups{32 << 10};
  size_type k{5};
  size_type max_group_size{4000};
  if (argc >= 2) {
    num_groups = std::atoi(argv[1]);
  }
  if (argc >= 3) {
    k = std::atoi(argv[2]);
  }
  if (argc == 4) {
    max_group_size = std::atoi(argv[3]);
  }
  std::cout << "num_groups: " << num_groups << std::endl;
  std::cout << "k: " << k << std::endl;
  std::cout << "max_group_size: " << max_group_size << std::endl;

  // Prepare data
  thrust::default_random_engine rng(1337);
  thrust::uniform_int_distribution<int> dist(1, max_group_size);
  thrust::host_vector<int> h_group_sizes(num_groups);
  thrust::generate(h_group_sizes.begin(), h_group_sizes.end(),
                   cuda::proclaim_return_type<size_type>(
                       [&] __host__() { return dist(rng); }));
  thrust::device_vector<size_type> group_sizes = h_group_sizes;
  thrust::device_vector<size_type> group_offsets(group_sizes.size() + 1, 0);
  thrust::inclusive_scan(group_sizes.begin(), group_sizes.end(),
                         group_offsets.begin() + 1, thrust::plus<size_type>());
  auto num_rows = group_offsets[group_sizes.size()];

  thrust::device_vector<bool> scatter_mask(num_rows, false);
  thrust_par_scatter(group_offsets, scatter_mask, num_groups, k);

  thrust::device_vector<bool> seq_mask(num_rows, false);
  thrust_seq_upper_bound(group_offsets, seq_mask, k);

  thrust::device_vector<size_type> indices;
  hand_kernel(group_offsets, group_sizes, indices, num_groups, k);
  return 0;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be sure to compare this with Thrust's nosync execution policy, and use streams -- that can reduce overhead. I will try to look at this more closely later, to make sure it's using an optimal algorithmic approach -- or at least something like what I had in mind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated code, with use of rmm::exec_policy_nosync and a memory pool (to avoid allocation costs). Hand kernel is still superior. It is data dependent, but in the typical case of $k \ll \text{group\_size}$ it is a lot faster. I also changed to use an (approximately) log-normal distribution of group sizes, rather than a uniform distribution.

code
#include <cmath>
#include <cuda/functional>
#include <functional>
#include <iostream>
#include <memory>
#include <nvtx3/nvtx3.hpp>
#include <optional>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/device_vector.hpp>
#include <rmm/exec_policy.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>
#include <thrust/binary_search.h>
#include <thrust/host_vector.h>
#include <thrust/random.h>
#include <thrust/scan.h>
#include <thrust/transform_scan.h>

using size_type = int;
using thread_index_type = long;

using vec = rmm::device_vector<size_type>;
size_type constexpr warp_size{32};
__global__ void kernel(size_type const *group_offsets,
                       size_type const *index_offsets, size_type *indices,
                       size_type num_groups, size_type k) {
  for (thread_index_type idx = threadIdx.x + blockDim.x * blockIdx.x;
       idx < warp_size * num_groups; idx += gridDim.x * blockDim.x) {
    auto const group{idx / warp_size};
    auto const lane{idx % warp_size};
    if (group >= num_groups)
      break;
    auto const off1 = group_offsets[group + 1];
    auto const off0 = group_offsets[group];
    auto const k_loc = min(off1 - off0, k);
    auto const out_offset = index_offsets[group];
    for (size_type off = lane; off < k_loc; off += warp_size) {
      indices[out_offset + off] = off0 + off;
    }
  }
}
template <typename Vector> void print(const std::string &s, const Vector &v) {
  typedef typename Vector::value_type T;

  std::cout << s;
  thrust::copy(v.begin(), v.end(), std::ostream_iterator<T>(std::cout, " "));
  std::cout << std::endl;
}

std::unique_ptr<rmm::device_uvector<size_type>>
hand_kernel(rmm::device_vector<size_type> group_offsets,
            rmm::device_vector<size_type> group_sizes, size_type num_groups,
            size_type k, rmm::cuda_stream_view stream) {
  nvtx3::scoped_range r{"hand_kernel"};
  rmm::device_uvector<size_type> index_offsets{group_offsets.size(), stream};
  thrust::transform_inclusive_scan(
      rmm::exec_policy_nosync(stream), group_sizes.begin(), group_sizes.end(),
      index_offsets.begin() + 1,
      cuda::proclaim_return_type<size_type>(
          [k] __device__(size_type s) -> size_type { return min(k, s); }),
      thrust::plus<size_type>());
  auto num_indices = index_offsets.back_element(stream);
  auto indices =
      std::make_unique<rmm::device_uvector<size_type>>(num_indices, stream);
  auto const threads_per_block{128};
  auto const num_blocks{((warp_size * num_groups) / threads_per_block) +
                        (((warp_size * num_groups) % threads_per_block) != 0)};
  kernel<<<num_blocks, threads_per_block, 0, stream.value()>>>(
      group_offsets.data().get(), index_offsets.data(), indices->data(),
      num_groups, k);
  return std::move(indices);
}

std::unique_ptr<rmm::device_uvector<bool>>
thrust_seq_upper_bound(rmm::device_vector<size_type> group_offsets,
                       size_type num_rows, size_type num_groups, size_type k,
                       rmm::cuda_stream_view stream) {
  nvtx3::scoped_range r{"thrust_seq_upper_bound"};
  auto goff_begin = group_offsets.begin();
  auto goff_end = group_offsets.end();
  auto index_transformer = cuda::proclaim_return_type<bool>(
      [goff_begin, goff_end, k] __device__(size_type index) -> bool {
        auto gstart =
            thrust::upper_bound(thrust::seq, goff_begin, goff_end, index);
        auto group_index = index - *(gstart - 1);
        return group_index < k;
      });
  auto count = thrust::make_counting_iterator(size_type{0});
  auto mask = std::make_unique<rmm::device_uvector<bool>>(num_rows, stream);
  thrust::transform(rmm::exec_policy_nosync(stream), count, count + num_rows,
                    mask->begin(), index_transformer);
  return mask;
}

std::unique_ptr<rmm::device_uvector<bool>>
thrust_par_scatter(rmm::device_vector<size_type> group_offsets,
                   size_type num_rows, size_type num_groups, size_type k,
                   rmm::cuda_stream_view stream) {
  nvtx3::scoped_range r{"thrust_par_scatter"};
  rmm::device_uvector<size_type> upper_bound(num_rows, stream);
  thrust::uninitialized_fill_n(rmm::exec_policy_nosync(stream),
                               upper_bound.begin(), num_rows, 0);
  auto count = thrust::make_counting_iterator(size_type{0});
  // Precondition: no empty groups
  thrust::scatter(rmm::exec_policy_nosync(stream), count, count + num_groups,
                  group_offsets.begin(), upper_bound.begin());
  thrust::inclusive_scan(rmm::exec_policy_nosync(stream), upper_bound.begin(),
                         upper_bound.end(), upper_bound.begin(),
                         thrust::maximum<size_type>());
  auto goff_begin = group_offsets.begin();
  auto fn = cuda::proclaim_return_type<bool>(
      [goff_begin, k] __device__(auto pair) -> bool {
        auto index = thrust::get<0>(pair);
        auto off = thrust::get<1>(pair);
        auto group_index = index - *(goff_begin + off);
        return group_index < k;
      });
  auto zipit = thrust::make_zip_iterator(count, upper_bound.begin());
  auto mask = std::make_unique<rmm::device_uvector<bool>>(num_rows, stream);
  thrust::transform(rmm::exec_policy_nosync(stream), zipit, zipit + num_rows,
                    mask->begin(), fn);
  return mask;
}

int main(int argc, char **argv) {
  size_type num_groups{32 << 10};
  size_type k{5};
  size_type median_group_size{2000};
  if (argc >= 2) {
    num_groups = std::atoi(argv[1]);
  }
  if (argc >= 3) {
    k = std::atoi(argv[2]);
  }
  if (argc == 4) {
    median_group_size = std::atoi(argv[3]);
  }
  std::cout << "num_groups: " << num_groups << std::endl;
  std::cout << "k: " << k << std::endl;
  std::cout << "median_group_size: " << median_group_size << std::endl;

  auto stream = rmm::cuda_stream_view{};
  auto policy = rmm::exec_policy{stream};
  auto upstream = rmm::mr::cuda_memory_resource{};
  auto constexpr initial_size = 16UL << 30UL;
  auto mr = rmm::mr::pool_memory_resource(&upstream, initial_size);
  rmm::mr::set_current_device_resource(&mr);
  // Prepare data
  thrust::default_random_engine rng(1337);
  // We'll make a log-normal distribution of group sizes
  // with parameters mu and sigma and standard deviation sigma
  // aiming for median group size as specified.
  //
  thrust::normal_distribution<double> dist(0, 1);
  thrust::host_vector<int> h_group_sizes(num_groups);
  thrust::generate(h_group_sizes.begin(), h_group_sizes.end(),
                   cuda::proclaim_return_type<size_type>([&] __host__() {
                     auto Z = dist(rng);
                     auto mu = std::log(static_cast<double>(median_group_size));
                     auto sigma = 2;
                     return std::max(1, static_cast<size_type>(std::exp(mu + sigma * Z)));
                   }));
  auto minsize = thrust::reduce(h_group_sizes.begin(), h_group_sizes.end(),
                                std::numeric_limits<size_type>::max(),
                                thrust::minimum<size_type>());
  auto maxsize = thrust::reduce(h_group_sizes.begin(), h_group_sizes.end(),
                                std::numeric_limits<size_type>::min(),
                                thrust::maximum<size_type>());
  std::cout << "Min size: " << minsize << std::endl;
  std::cout << "Max size: " << maxsize << std::endl;
  rmm::device_vector<size_type> group_sizes(num_groups, 0);
  group_sizes = h_group_sizes;
  rmm::device_vector<size_type> group_offsets(group_sizes.size() + 1, 0);
  thrust::inclusive_scan(group_sizes.begin(), group_sizes.end(),
                         group_offsets.begin() + 1, thrust::plus<size_type>());
  auto num_rows = group_offsets[group_sizes.size()];

  std::cout << "Num input rows: " << num_rows << std::endl;
  stream.synchronize();
  auto scatter_mask =
      thrust_par_scatter(group_offsets, num_rows, num_groups, k, stream);
  auto seq_mask =
      thrust_seq_upper_bound(group_offsets, num_rows, num_groups, k, stream);
  auto indices = hand_kernel(group_offsets, group_sizes, num_groups, k, stream);
  std::cout << "Num output rows: " << indices->size() << std::endl;
  return 0;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is really a special case of constructing a gather map for a segmented_slice. Since, for now, we don't have a segmented_nth_element (or, for the full frame, nth_element) on device. The segmented_slice would more generally be useful, but has a (probably) broader range of performance characteristics that may mean a different implementation is superior.

However, probably we want to implement (in the copying API to match cudf::slice):

(for columns and tables)

segmented_slice(table_view const &values, column_view const &offsets,
                size_type start, 
                size_type end, 
                size_type stride, # maybe?
                stream, mr);

Which returns a table (respectively column) obtained by extracting the requested slice from each segment. With the convention that a slice larger than the segment just returns the full segment and an out of range slice returns an empty segment.

Copy link
Contributor

@bdice bdice left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing comments (forgot to submit review yesterday).

cpp/include/cudf/aggregation.hpp Outdated Show resolved Hide resolved
* is returned.
*
* @param k Number of values to return.
* @param order What order should the groups be sorted in.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this how you control "top k" vs. "bottom k"? Maybe we can phrase this more clearly. Something like this (edits welcome).

Suggested change
* @param order What order should the groups be sorted in.
* @param order Controls the sorting order of the group, i.e. whether to return the greatest or least k elements.

cpp/src/groupby/sort/aggregate.cpp Show resolved Hide resolved
cpp/src/groupby/sort/group_reductions.hpp Outdated Show resolved Hide resolved
@@ -936,6 +936,20 @@ def nth(self, n):
del self.obj._data["__groupbynth_order__"]
return result

@_cudf_nvtx_annotate
def topk(self, k, bottom=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that pandas calls this nlargest / nsmallest, but only exposes it for SeriesGroupBy and not DataFrameGroupBy. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.core.groupby.SeriesGroupBy.nlargest.html#pandas.core.groupby.SeriesGroupBy.nlargest

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Their implementation is a bit more involved, since it has tie-breaking rules that necessitate (if keep != "first") introspecting the data rather than just the group size. So I can fill out nlargest/smallest for keep="first" and raise otherwise.

Comment on lines +43 to +48
__global__ void compute_topk_indices(cudf::device_span<size_type const> group_offsets,
cudf::device_span<size_type const> index_offsets,
cudf::device_span<size_type> indices,
size_type num_groups,
size_type k)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be sure to compare this with Thrust's nosync execution policy, and use streams -- that can reduce overhead. I will try to look at this more closely later, to make sure it's using an optimal algorithmic approach -- or at least something like what I had in mind.

}

template std::unique_ptr<aggregation> make_top_k_aggregation(size_type k, order order);
template std::unique_ptr<groupby_aggregation> make_top_k_aggregation(size_type k, order order);
/// Factory to create a LAG aggregation
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Factory to create a LAG aggregation
/// Factory to create a LAG aggregation

wence- added 7 commits March 20, 2024 15:45
We do this in a sort-based groupby by computing the segmented sorted
order of the requested aggregated column, and then determining the
first min(k, group_size) entries of each group and gathering those.
@wence- wence- changed the base branch from branch-24.04 to branch-24.06 March 20, 2024 15:47
@wence- wence- force-pushed the wence/fea/groupby-topk branch from 579012c to 5c3e749 Compare March 20, 2024 15:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CMake CMake build issue improvement Improvement / enhancement to an existing function libcudf Affects libcudf (C++/CUDA) code. non-breaking Non-breaking change Python Affects Python cuDF API.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants