Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] JSON tree algorithms refactor II: Constructing device JSON column #16205

Draft
wants to merge 40 commits into
base: branch-24.10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1ec9617
added csr data struct
shrshi Jun 11, 2024
022d7ce
formatting
shrshi Jun 11, 2024
382633f
added test
shrshi Jun 25, 2024
1823854
formatting
shrshi Jun 25, 2024
4a7e2a5
Merge branch 'branch-24.08' into json-tree-refactor-ii
shrshi Jun 25, 2024
8d5ddfb
Merge branch 'branch-24.08' into json-tree-refactor
shrshi Jun 26, 2024
84a7749
fixing csr construction
shrshi Jun 28, 2024
810c389
moving the csr algorithms
shrshi Jun 28, 2024
6a1a415
formatting
shrshi Jun 28, 2024
d3468ba
Merge branch 'branch-24.08' into json-tree-refactor-ii
shrshi Jun 28, 2024
85c197d
Merge branch 'branch-24.08' into json-tree-refactor
shrshi Jun 28, 2024
996c6dd
Merge branch 'json-tree-refactor' of github.com:shrshi/cudf into json…
shrshi Jun 28, 2024
3675140
ignoring leaf nodes with non-leaf siblings
shrshi Jul 5, 2024
389df50
formatting
shrshi Jul 6, 2024
4bba629
moving to experimental namespace
shrshi Jul 15, 2024
25530f6
Merge branch 'branch-24.08' into json-tree-refactor
shrshi Jul 15, 2024
df9e65b
formatting
shrshi Jul 15, 2024
d1588c8
removed node properties from csr struct - will be introduced in stage…
shrshi Jul 15, 2024
b809703
partial commit before merge
shrshi Jul 16, 2024
c576370
added two validation checks for column tree
shrshi Jul 17, 2024
b04cebc
formatting
shrshi Jul 17, 2024
b804209
partial work commit
shrshi Jul 21, 2024
b8e8c07
formatting
shrshi Jul 22, 2024
7e1a756
merging branch 24.08 into current branch
shrshi Jul 24, 2024
5541b93
partial commit
shrshi Jul 24, 2024
1490ce9
Merge branch 'branch-24.10' into json-tree-refactor
shrshi Jul 24, 2024
d05e670
better csr construction
shrshi Jul 30, 2024
1ce88be
formatting
shrshi Jul 30, 2024
d6d724c
exec policy is no sync
shrshi Jul 30, 2024
2622d6b
fix copyright year
shrshi Jul 30, 2024
9498372
fixing max row offsets
shrshi Jul 31, 2024
4339b0a
formatting
shrshi Jul 31, 2024
e61288b
Merge branch 'branch-24.10' into json-tree-refactor
shrshi Jul 31, 2024
9b6b7ff
struct docs
shrshi Jul 31, 2024
53db174
Merge branch 'json-tree-refactor' of github.com:shrshi/cudf into json…
shrshi Jul 31, 2024
85608eb
cudf exports!
shrshi Jul 31, 2024
3c21e04
merge after 15979 update
shrshi Aug 1, 2024
3900ee3
refactoring after the csr updates
shrshi Aug 2, 2024
3949cda
minor fixes
shrshi Aug 2, 2024
4d88fe5
formatting
shrshi Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
better csr construction
  • Loading branch information
shrshi committed Jul 30, 2024
commit d05e670916cf4ece469c2c76c8eedcafd93b9a65
199 changes: 78 additions & 121 deletions cpp/src/io/json/json_column_csr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,15 @@
#include <thrust/scan.h>
#include <thrust/sort.h>
#include <thrust/transform.h>
#include <thrust/transform_scan.h>
#include <thrust/unique.h>

#include <cub/device/device_segmented_reduce.cuh>

namespace cudf::io::json::experimental::detail {

using row_offset_t = size_type;

/**
 * @brief Column tree in CSR adjacency form before validation checks are applied.
 *
 * Holds the adjacency (rowidx/colidx) of the reduced column tree together with
 * per-column aggregates produced by the node-tree reduction.
 */
struct unvalidated_column_tree {
// CSR row offsets: neighbors of column i live in colidx[rowidx[i], rowidx[i+1])
rmm::device_uvector<NodeIndexT> rowidx;
// Flattened neighbor (adjacent column) indices
rmm::device_uvector<NodeIndexT> colidx;
// Maximum row offset observed per column (used later for list-size propagation)
rmm::device_uvector<row_offset_t> max_row_offsets;
// Reduced node category per column (struct/list/string/value/field/error)
rmm::device_uvector<NodeT> column_categories;
};

struct level_ordering {
device_span<TreeDepthT> node_levels;
device_span<NodeIndexT> col_ids;
Expand All @@ -71,6 +67,24 @@ struct level_ordering {
}
};

/**
 * @brief Functor mapping a parent node id to its column id in the remapped
 * (level-ordered) column id space.
 *
 * The sentinel parent id of the root is passed through unchanged.
 */
struct parent_nodeids_to_colids {
device_span<NodeIndexT> col_ids;
device_span<NodeIndexT> rev_mapped_col_ids;
__device__ NodeIndexT operator()(NodeIndexT parent_node_id)
{
  if (parent_node_id == parent_node_sentinel) { return parent_node_sentinel; }
  auto const parent_col_id = col_ids[parent_node_id];
  return rev_mapped_col_ids[parent_col_id];
}
};

/**
 * @brief Debug helper: copies a device span to the host and prints it to stdout.
 *
 * @tparam T Element type; must be streamable to std::ostream
 * @param d_vec Device data to print
 * @param name Label printed before the values
 * @param stream CUDA stream on which the device-to-host copy is ordered
 */
template <typename T>
void print(device_span<T const> d_vec, std::string name, rmm::cuda_stream_view stream) {
// Use the _sync variant: make_std_vector_async returns before the copy has
// completed, so reading h_vec below without a stream synchronize is a race.
auto h_vec = cudf::detail::make_std_vector_sync(d_vec, stream);
std::cout << name << " = ";
for(auto const& e : h_vec) {
  std::cout << e << " ";
}
std::cout << std::endl;
}

/**
* @brief Reduces node tree representation to column tree CSR representation.
*
Expand All @@ -85,10 +99,10 @@ struct level_ordering {
* @return A tuple of column tree representation of JSON string, column ids of columns, and
* max row offsets of columns
*/
unvalidated_column_tree reduce_to_column_tree_csr(
std::tuple<csr, column_tree_properties> reduce_to_column_tree(
tree_meta_t& tree,
device_span<NodeIndexT> col_ids,
device_span<size_type> row_offsets,
device_span<row_offset_t> row_offsets,
bool is_array_of_arrays,
NodeIndexT const row_array_parent_col_id,
rmm::cuda_stream_view stream)
Expand All @@ -109,13 +123,19 @@ unvalidated_column_tree reduce_to_column_tree_csr(
thrust::unique_count(rmm::exec_policy_nosync(stream), level_ordered_col_ids.begin(), level_ordered_col_ids.end());
rmm::device_uvector<NodeIndexT> level_ordered_unique_node_ids(num_columns, stream);
rmm::device_uvector<NodeIndexT> mapped_col_ids(num_columns, stream);
rmm::device_uvector<NodeIndexT> rev_mapped_col_ids(num_columns, stream);
thrust::unique_by_key_copy(rmm::exec_policy_nosync(stream), level_ordered_col_ids.begin(), level_ordered_node_ids.end(), level_ordered_node_ids.begin(), mapped_col_ids.begin(), level_ordered_unique_node_ids.begin());
auto rev_mapped_col_ids_it = thrust::make_permutation_iterator(thrust::make_counting_iterator(0), mapped_col_ids.begin());
auto *dev_num_levels_ptr = thrust::max_element(rmm::exec_policy(stream), tree.node_levels.begin(), tree.node_levels.end());

rmm::device_uvector<NodeIndexT> mapped_col_ids_copy(num_columns, stream);
thrust::copy(rmm::exec_policy(stream), mapped_col_ids.begin(), mapped_col_ids.end(), mapped_col_ids_copy.begin());
thrust::sequence(rmm::exec_policy(stream), rev_mapped_col_ids.begin(), rev_mapped_col_ids.end());
thrust::sort_by_key(rmm::exec_policy(stream), mapped_col_ids_copy.begin(), mapped_col_ids_copy.end(), rev_mapped_col_ids.begin());

// 2. maximum number of rows per column: computed with reduce_by_key {col_id}, {row_offset}, max.
// 3. category for each column node by aggregating all nodes in node tree corresponding to same column:
// reduce_by_key {col_id}, {node_categories} - custom opp (*+v=*, v+v=v, *+#=E)
rmm::device_uvector<size_type> max_row_offsets(num_columns, stream);
rmm::device_uvector<row_offset_t> max_row_offsets(num_columns, stream);
rmm::device_uvector<NodeT> column_categories(num_columns, stream);
auto ordered_row_offsets =
thrust::make_permutation_iterator(row_offsets.begin(), level_ordered_node_ids.begin());
Expand All @@ -133,37 +153,34 @@ unvalidated_column_tree reduce_to_column_tree_csr(
auto type_a = thrust::get<1>(a);
auto type_b = thrust::get<1>(b);

NodeT max_offset;
NodeT ctg;
auto is_a_leaf = (type_a == NC_VAL || type_a == NC_STR);
auto is_b_leaf = (type_b == NC_VAL || type_b == NC_STR);
// (v+v=v, *+*=*, *+v=*, *+#=E, NESTED+VAL=NESTED)
// *+*=*, v+v=v
if (type_a == type_b) {
max_offset = type_a;
ctg = type_a;
} else if (is_a_leaf) {
// *+v=*, N+V=N
// STRUCT/LIST + STR/VAL = STRUCT/LIST, STR/VAL + FN = ERR, STR/VAL + STR = STR
max_offset = type_b == NC_FN ? NC_ERR : (is_b_leaf ? NC_STR : type_b);
ctg = (type_b == NC_FN ? NC_ERR : (is_b_leaf ? NC_STR : type_b));
} else if (is_b_leaf) {
max_offset = type_a == NC_FN ? NC_ERR : (is_a_leaf ? NC_STR : type_a);
ctg = (type_a == NC_FN ? NC_ERR : (is_a_leaf ? NC_STR : type_a));
}
// *+#=E
max_offset = NC_ERR;
else ctg = NC_ERR;

thrust::maximum<size_type> row_offset_op;
return thrust::make_tuple(row_offset_op(row_offset_a, row_offset_b), max_offset);
return thrust::make_tuple(row_offset_op(row_offset_a, row_offset_b), ctg);
});

// 4. construct parent_col_ids using permutation iterator
rmm::device_uvector<NodeIndexT> parent_col_ids(num_columns, stream);
thrust::transform_output_iterator parent_col_ids_it(parent_col_ids.begin(), parent_nodeids_to_colids{col_ids, rev_mapped_col_ids});
thrust::copy_n(
rmm::exec_policy(stream),
thrust::make_permutation_iterator(tree.parent_node_ids.begin(), level_ordered_unique_node_ids.begin()),
num_columns,
thrust::make_transform_output_iterator(parent_col_ids.begin(),
[col_ids = col_ids.begin(), rev_mapped_col_ids_it] __device__(auto parent_node_id) -> NodeIndexT {
return parent_node_id == parent_node_sentinel ? parent_node_sentinel : rev_mapped_col_ids_it[col_ids[parent_node_id]];
}));
parent_col_ids_it);

/*
5. CSR construction:
Expand All @@ -176,7 +193,6 @@ unvalidated_column_tree reduce_to_column_tree_csr(

rmm::device_uvector<NodeIndexT> rowidx(num_columns + 1, stream);
thrust::fill(rmm::exec_policy(stream), rowidx.begin(), rowidx.end(), 0);

// Note that the first element of csr_parent_col_ids is -1 (parent_node_sentinel)
// children adjacency
auto num_non_leaf_columns = thrust::unique_count(
Expand All @@ -188,19 +204,18 @@ unvalidated_column_tree reduce_to_column_tree_csr(
thrust::make_discard_iterator(),
rowidx.begin() + 1,
thrust::equal_to<TreeDepthT>());
thrust::inclusive_scan(
rmm::exec_policy(stream), rowidx.begin() + 1, rowidx.end(), rowidx.begin() + 1);
// We are discarding the parent of the root node. Add the parent adjacency. Since we have already
// performed the scan, we use a counting iterator to add
thrust::transform(rmm::exec_policy(stream),
rowidx.begin() + 2,
rowidx.end(),
thrust::make_counting_iterator(1),
rowidx.begin() + 2,
thrust::plus<NodeIndexT>());
thrust::transform_inclusive_scan(rmm::exec_policy(stream),
thrust::make_zip_iterator(thrust::make_counting_iterator(1), rowidx.begin() + 1),
thrust::make_zip_iterator(thrust::make_counting_iterator(1) + num_columns, rowidx.end()),
rowidx.begin() + 1,
cuda::proclaim_return_type<NodeIndexT>([] __device__(auto a) {
auto n = thrust::get<0>(a);
auto idx = thrust::get<1>(a);
return n == 1 ? idx : idx + 1;
}),
thrust::plus<NodeIndexT>{});

rmm::device_uvector<NodeIndexT> colidx((num_columns - 1) * 2, stream);

// Skip the parent of root node
thrust::scatter(rmm::exec_policy(stream),
parent_col_ids.begin() + 1,
Expand Down Expand Up @@ -232,102 +247,44 @@ unvalidated_column_tree reduce_to_column_tree_csr(
map.begin(),
colidx.begin());

// condition is true if parent is not a list, or sentinel/root
// Special case to return true if parent is a list and is_array_of_arrays is true
auto is_non_list_parent = [column_categories = column_categories.begin(),
is_array_of_arrays,
row_array_parent_col_id] __device__(auto parent_col_id) -> bool {
return !(parent_col_id == parent_node_sentinel ||
column_categories[parent_col_id] == NC_LIST &&
(!is_array_of_arrays || parent_col_id != row_array_parent_col_id));
};
// Mixed types in List children go to different columns,
// so all immediate children of list column should have same max_row_offsets.
// create list's children max_row_offsets array. (initialize to zero)
// atomicMax on children max_row_offsets array.
// gather the max_row_offsets from children row offset array.
{
rmm::device_uvector<NodeIndexT> list_parents_children_max_row_offsets(num_columns, stream);
thrust::fill(rmm::exec_policy(stream),
list_parents_children_max_row_offsets.begin(),
list_parents_children_max_row_offsets.end(),
0);
auto list_nodes = thrust::make_permutation_iterator
auto max_row_offsets_it = thrust::make_permutation_iterator(max_row_offsets.begin(), colidx.begin());
rmm::device_uvector<row_offset_t> max_children_max_row_offsets(num_columns, stream);
size_t temp_storage_bytes = 0;
cub::DeviceSegmentedReduce::Max(nullptr, temp_storage_bytes, max_row_offsets_it, max_children_max_row_offsets.begin(), num_columns, rowidx.begin(), rowidx.begin() + 1, stream.value());
rmm::device_buffer d_temp_storage(temp_storage_bytes, stream);
cub::DeviceSegmentedReduce::Max(d_temp_storage.data(), temp_storage_bytes, max_row_offsets_it, max_children_max_row_offsets.begin(), num_columns, rowidx.begin(), rowidx.begin() + 1, stream.value());

thrust::for_each(rmm::exec_policy(stream),
csr_unique_col_ids.begin(),
csr_unique_col_ids.end(),
[csr_column_categories = csr_column_categories.begin(),
csr_parent_col_ids = csr_parent_col_ids.begin(),
csr_max_row_offsets = csr_max_row_offsets.begin(),
list_parents_children_max_row_offsets =
list_parents_children_max_row_offsets.begin()] __device__(auto col_id) {
auto csr_parent_col_id = csr_parent_col_ids[col_id];
if (csr_parent_col_id != parent_node_sentinel and
csr_column_categories[csr_parent_col_id] == node_t::NC_LIST) {
cuda::atomic_ref<NodeIndexT, cuda::thread_scope_device> ref{
*(list_parents_children_max_row_offsets + csr_parent_col_id)};
ref.fetch_max(csr_max_row_offsets[col_id],
cuda::std::memory_order_relaxed);
}
});
thrust::gather_if(
rmm::exec_policy(stream),
csr_parent_col_ids.begin(),
csr_parent_col_ids.end(),
csr_parent_col_ids.begin(),
list_parents_children_max_row_offsets.begin(),
csr_max_row_offsets.begin(),
[csr_column_categories = csr_column_categories.begin()] __device__(size_type parent_col_id) {
return parent_col_id != parent_node_sentinel and
csr_column_categories[parent_col_id] == node_t::NC_LIST;
});
rmm::device_uvector<NodeIndexT> list_ancestors(num_columns, stream);
thrust::for_each_n(rmm::exec_policy(stream), thrust::make_counting_iterator(0), num_columns,
[rowidx = rowidx.begin(),
colidx = colidx.begin(),
column_categories = column_categories.begin(),
dev_num_levels_ptr,
list_ancestors = list_ancestors.begin()] __device__(NodeIndexT node) {
auto num_levels = *dev_num_levels_ptr;
list_ancestors[node] = node;
for(int level = 0; level < num_levels; level++) {
if(list_ancestors[node] > 0)
list_ancestors[node] = colidx[rowidx[list_ancestors[node]]];
else list_ancestors[node] = -1;
if(list_ancestors[node] == -1 || column_categories[list_ancestors[node]] == NC_LIST) break;
}
});
thrust::gather_if(rmm::exec_policy(stream), list_ancestors.begin(), list_ancestors.end(), list_ancestors.begin(),
max_children_max_row_offsets.begin(), max_row_offsets.begin(),
[] __device__(auto ancestor) {
return ancestor != -1;
});
}

// copy lists' max_row_offsets to children.
// all structs should have same size.
thrust::transform_if(
rmm::exec_policy(stream),
csr_unique_col_ids.begin(),
csr_unique_col_ids.end(),
csr_max_row_offsets.begin(),
[csr_column_categories = csr_column_categories.begin(),
is_non_list_parent,
csr_parent_col_ids = csr_parent_col_ids.begin(),
csr_max_row_offsets = csr_max_row_offsets.begin()] __device__(size_type col_id) {
auto parent_col_id = csr_parent_col_ids[col_id];
// condition is true if parent is not a list, or sentinel/root
while (is_non_list_parent(parent_col_id)) {
col_id = parent_col_id;
parent_col_id = csr_parent_col_ids[parent_col_id];
}
return csr_max_row_offsets[col_id];
},
[csr_column_categories = csr_column_categories.begin(),
is_non_list_parent,
parent_col_ids = csr_parent_col_ids.begin()] __device__(size_type col_id) {
auto parent_col_id = parent_col_ids[col_id];
// condition is true if parent is not a list, or sentinel/root
return is_non_list_parent(parent_col_id);
});

// For Struct and List (to avoid copying entire strings when mixed type as string is enabled)
thrust::transform_if(
rmm::exec_policy(stream),
csr_col_range_begin.begin(),
csr_col_range_begin.end(),
csr_column_categories.begin(),
csr_col_range_end.begin(),
[] __device__(auto i) { return i + 1; },
[] __device__(NodeT type) { return type == NC_STRUCT || type == NC_LIST; });

return std::tuple{column_tree_csr{std::move(rowidx),
std::move(colidx),
std::move(csr_unique_col_ids),
std::move(csr_column_categories),
std::move(csr_col_range_begin),
std::move(csr_col_range_end)},
std::move(csr_max_row_offsets)};
return std::tuple{csr{std::move(rowidx), std::move(colidx)},
column_tree_properties{std::move(column_categories), std::move(max_row_offsets), std::move(mapped_col_ids)}};
}

} // namespace cudf::io::json::experimental::detail
19 changes: 16 additions & 3 deletions cpp/src/io/json/nested_json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@ struct device_json_column {
};

namespace experimental {
/**
 * @brief Sparse graph adjacency matrix stored in Compressed Sparse Row (CSR) format.
 *
 * Holds only the adjacency structure (no node properties): the neighbors of
 * node i are colidx[rowidx[i], rowidx[i+1]).
 */
struct csr {
// Per-node offsets into colidx; size is (number of nodes + 1)
rmm::device_uvector<NodeIndexT> rowidx;
// Flattened neighbor indices for all nodes
rmm::device_uvector<NodeIndexT> colidx;
};

/**
 * @brief Per-column properties of the column tree, parallel to the CSR adjacency.
 */
struct column_tree_properties {
// Reduced node category of each column (e.g. struct/list/string/value)
rmm::device_uvector<NodeT> categories;
// Maximum row offset observed for each column
rmm::device_uvector<size_type> max_row_offsets;
// Original column id of each remapped (level-ordered) column id
rmm::device_uvector<NodeIndexT> mapped_ids;
};

/*
* @brief Unvalidated column tree stored in Compressed Sparse Row (CSR) format. The device json column
* subtree - the subgraph that conforms to column tree properties - is extracted and further processed
Expand All @@ -195,6 +209,7 @@ namespace experimental {
*/
struct column_tree {
// position of nnzs
csr adjacency;
rmm::device_uvector<NodeIndexT> rowidx;
rmm::device_uvector<NodeIndexT> colidx;
// device_json_column properties
Expand Down Expand Up @@ -223,11 +238,9 @@ namespace detail {
* in each column
*/

std::tuple<column_tree_csr, rmm::device_uvector<size_type>> reduce_to_column_tree_csr(
std::tuple<csr, column_tree_properties> reduce_to_column_tree(
tree_meta_t& tree,
device_span<NodeIndexT> original_col_ids,
device_span<NodeIndexT> sorted_col_ids,
device_span<NodeIndexT> ordered_node_ids,
device_span<size_type> row_offsets,
bool is_array_of_arrays,
NodeIndexT const row_array_parent_col_id,
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp)
ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp)
ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp)
ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu)
ConfigureTest(JSON_TREE_CSR io/json/json_tree_csr.cu)
ConfigureTest(
DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp
GPUS 1
Expand Down
Loading