Skip to content

Commit

Permalink
Merge branch 'branch-24.10' into fea-json_spark_validation
Browse files Browse the repository at this point in the history
  • Loading branch information
karthikeyann authored Aug 26, 2024
2 parents a225ce0 + d15d470 commit 7a2a451
Show file tree
Hide file tree
Showing 22 changed files with 639 additions and 581 deletions.
191 changes: 96 additions & 95 deletions cpp/src/join/mixed_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

#include "join_common_utils.cuh"
#include "join_common_utils.hpp"
#include "mixed_join_kernels.cuh"
#include "mixed_join_kernel.hpp"
#include "mixed_join_size_kernel.hpp"

#include <cudf/ast/detail/expression_parser.hpp>
#include <cudf/ast/expressions.hpp>
Expand Down Expand Up @@ -178,9 +179,6 @@ mixed_join(
join_size = output_size_data->first;
matches_per_row_span = output_size_data->second;
} else {
// Allocate storage for the counter used to get the size of the join output
rmm::device_scalar<std::size_t> size(0, stream, mr);

matches_per_row =
rmm::device_uvector<size_type>{static_cast<std::size_t>(outer_num_rows), stream, mr};
// Note that the view goes out of scope after this else statement, but the
Expand All @@ -190,37 +188,38 @@ mixed_join(
matches_per_row_span = cudf::device_span<size_type const>{
matches_per_row->begin(), static_cast<std::size_t>(outer_num_rows)};
if (has_nulls) {
compute_mixed_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
size.data(),
mutable_matches_per_row_span);
join_size = launch_compute_mixed_join_output_size<true>(*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
mutable_matches_per_row_span,
config,
shmem_size_per_block,
stream,
mr);
} else {
compute_mixed_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
size.data(),
mutable_matches_per_row_span);
join_size = launch_compute_mixed_join_output_size<false>(*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
mutable_matches_per_row_span,
config,
shmem_size_per_block,
stream,
mr);
}
join_size = size.value(stream);
}

// The initial early exit clauses guarantee that we will not reach this point
Expand Down Expand Up @@ -249,37 +248,39 @@ mixed_join(
auto const& join_output_r = right_indices->data();

if (has_nulls) {
mixed_join<DEFAULT_JOIN_BLOCK_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
join_output_l,
join_output_r,
parser.device_expression_data,
join_result_offsets.data(),
swap_tables);
launch_mixed_join<true>(*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
join_output_l,
join_output_r,
parser.device_expression_data,
join_result_offsets.data(),
swap_tables,
config,
shmem_size_per_block,
stream);
} else {
mixed_join<DEFAULT_JOIN_BLOCK_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
join_output_l,
join_output_r,
parser.device_expression_data,
join_result_offsets.data(),
swap_tables);
launch_mixed_join<false>(*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
kernel_join_type,
hash_table_view,
join_output_l,
join_output_r,
parser.device_expression_data,
join_result_offsets.data(),
swap_tables,
config,
shmem_size_per_block,
stream);
}

auto join_indices = std::pair(std::move(left_indices), std::move(right_indices));
Expand Down Expand Up @@ -423,9 +424,6 @@ compute_mixed_join_output_size(table_view const& left_equality,
detail::grid_1d const config(outer_num_rows, DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

// Allocate storage for the counter used to get the size of the join output
rmm::device_scalar<std::size_t> size(0, stream, mr);

auto const preprocessed_probe =
experimental::row::equality::preprocessed_table::create(probe, stream);
auto const row_hash = cudf::experimental::row::hash::row_hasher{preprocessed_probe};
Expand All @@ -436,39 +434,42 @@ compute_mixed_join_output_size(table_view const& left_equality,

// Determine number of output rows without actually building the output to simply
// find what the size of the output will be.
std::size_t size = 0;
if (has_nulls) {
compute_mixed_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
size.data(),
matches_per_row_span);
size = launch_compute_mixed_join_output_size<true>(*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
matches_per_row_span,
config,
shmem_size_per_block,
stream,
mr);
} else {
compute_mixed_join_output_size<DEFAULT_JOIN_BLOCK_SIZE, false>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
size.data(),
matches_per_row_span);
size = launch_compute_mixed_join_output_size<false>(*left_conditional_view,
*right_conditional_view,
*probe_view,
*build_view,
hash_probe,
equality_probe,
join_type,
hash_table_view,
parser.device_expression_data,
swap_tables,
matches_per_row_span,
config,
shmem_size_per_block,
stream,
mr);
}

return {size.value(stream), std::move(matches_per_row)};
return {size, std::move(matches_per_row)};
}

} // namespace detail
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/join/mixed_join_kernel.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,11 +15,12 @@
*/

#include "mixed_join_kernel.cuh"
#include "mixed_join_kernel.hpp"

namespace cudf {
namespace detail {

template __global__ void mixed_join<DEFAULT_JOIN_BLOCK_SIZE, false>(
template void launch_mixed_join<false>(
table_device_view left_table,
table_device_view right_table,
table_device_view probe,
Expand All @@ -32,7 +33,10 @@ template __global__ void mixed_join<DEFAULT_JOIN_BLOCK_SIZE, false>(
size_type* join_output_r,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const* join_result_offsets,
bool const swap_tables);
bool const swap_tables,
detail::grid_1d const config,
int64_t shmem_size_per_block,
rmm::cuda_stream_view stream);

} // namespace detail

Expand Down
64 changes: 50 additions & 14 deletions cpp/src/join/mixed_join_kernel.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "join_common_utils.cuh"
#include "join_common_utils.hpp"
#include "mixed_join_common_utils.cuh"
#include "mixed_join_kernel.hpp"

#include <cudf/ast/detail/expression_evaluator.cuh>
#include <cudf/ast/detail/expression_parser.hpp>
Expand All @@ -39,20 +40,20 @@ namespace cg = cooperative_groups;
#pragma GCC diagnostic ignored "-Wattributes"

template <cudf::size_type block_size, bool has_nulls>
CUDF_HIDDEN __launch_bounds__(block_size) __global__
void mixed_join(table_device_view left_table,
table_device_view right_table,
table_device_view probe,
table_device_view build,
row_hash const hash_probe,
row_equality const equality_probe,
join_kind const join_type,
cudf::detail::mixed_multimap_type::device_view hash_table_view,
size_type* join_output_l,
size_type* join_output_r,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const* join_result_offsets,
bool const swap_tables)
CUDF_KERNEL void __launch_bounds__(block_size)
mixed_join(table_device_view left_table,
table_device_view right_table,
table_device_view probe,
table_device_view build,
row_hash const hash_probe,
row_equality const equality_probe,
join_kind const join_type,
cudf::detail::mixed_multimap_type::device_view hash_table_view,
size_type* join_output_l,
size_type* join_output_r,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const* join_result_offsets,
bool const swap_tables)
{
// Normally the casting of a shared memory array is used to create multiple
// arrays of different types from the shared memory buffer, but here it is
Expand Down Expand Up @@ -111,6 +112,41 @@ CUDF_HIDDEN __launch_bounds__(block_size) __global__
}
}

template <bool has_nulls>
void launch_mixed_join(table_device_view left_table,
table_device_view right_table,
table_device_view probe,
table_device_view build,
row_hash const hash_probe,
row_equality const equality_probe,
join_kind const join_type,
cudf::detail::mixed_multimap_type::device_view hash_table_view,
size_type* join_output_l,
size_type* join_output_r,
cudf::ast::detail::expression_device_view device_expression_data,
cudf::size_type const* join_result_offsets,
bool const swap_tables,
detail::grid_1d const config,
int64_t shmem_size_per_block,
rmm::cuda_stream_view stream)
{
mixed_join<DEFAULT_JOIN_BLOCK_SIZE, true>
<<<config.num_blocks, config.num_threads_per_block, shmem_size_per_block, stream.value()>>>(
left_table,
right_table,
probe,
build,
hash_probe,
equality_probe,
join_type,
hash_table_view,
join_output_l,
join_output_r,
device_expression_data,
join_result_offsets,
swap_tables);
}

} // namespace detail

} // namespace cudf
Loading

0 comments on commit 7a2a451

Please sign in to comment.