From 117eff6bf1eb8a46c597fd8f9e76a22fa363f03a Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 24 Apr 2024 12:26:09 -0700 Subject: [PATCH 1/7] Add BYTE_STREAM_SPLIT support to Parquet (#15311) Closes #15226. Part of #13501. Adds support for reading and writing `BYTE_STREAM_SPLIT` encoded Parquet data. Includes a "microkernel" version like those introduced by #15159. Authors: - Ed Seidl (https://github.com/etseidl) - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/15311 --- cpp/include/cudf/io/types.hpp | 1 + cpp/src/io/parquet/decode_fixed.cu | 229 ++++++++++++++++++++++- cpp/src/io/parquet/page_data.cu | 198 +++++++++++++++++++- cpp/src/io/parquet/page_data.cuh | 76 ++++++++ cpp/src/io/parquet/page_decode.cuh | 1 + cpp/src/io/parquet/page_delta_decode.cu | 6 +- cpp/src/io/parquet/page_enc.cu | 137 ++++++++------ cpp/src/io/parquet/page_hdr.cu | 26 ++- cpp/src/io/parquet/page_string_decode.cu | 16 +- cpp/src/io/parquet/parquet_gpu.hpp | 84 +++++++-- cpp/src/io/parquet/reader_impl.cpp | 22 +++ cpp/src/io/parquet/writer_impl.cu | 18 +- cpp/tests/io/parquet_common.cpp | 1 + cpp/tests/io/parquet_writer_test.cpp | 159 +++++++++++++++- 14 files changed, 876 insertions(+), 98 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 65d4a4417f0..b3dea0ab280 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -113,6 +113,7 @@ enum class column_encoding { ///< valid for BYTE_ARRAY columns) DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for ///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns) + BYTE_STREAM_SPLIT, ///< Use BYTE_STREAM_SPLIT encoding (valid for all fixed width types) // ORC encodings: DIRECT, ///< Use DIRECT encoding DIRECT_V2, ///< Use DIRECT_V2 encoding diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 945a7dcb4c6..f3332a23992 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -225,6 +225,96 @@ __device__ inline void gpuDecodeValues( } } +template +__device__ inline void gpuDecodeSplitValues(page_state_s* s, + state_buf* const sb, + int start, + int end) +{ + using cudf::detail::warp_size; + constexpr int num_warps = decode_block_size / warp_size; + constexpr int max_batch_size = num_warps * warp_size; + + auto const t = threadIdx.x; + + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + int const dtype = s->col.physical_type; + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + + // decode values + int pos = start; + while (pos < end) { + int const batch_size = min(max_batch_size, end - pos); + + int const target_pos = pos + batch_size; + int const src_pos = pos + t; + + // the position in the output column/buffer + int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; + + // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values + // before first_row) in the flat hierarchy case. + if (src_pos < target_pos && dst_pos >= 0) { + // nesting level that is storing actual leaf values + int const leaf_level_index = s->col.max_nesting_depth - 1; + + uint32_t dtype_len = s->dtype_len; + uint8_t const* src = s->data_start + src_pos; + uint8_t* dst = + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; + + // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader + if (is_decimal) { + switch (dtype) { + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case FIXED_LEN_BYTE_ARRAY: + if (s->dtype_len_in <= sizeof(int32_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(int64_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(__int128_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); + break; + } + // unsupported decimal precision + [[fallthrough]]; + + default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputByteStreamSplit(dst, src, num_values); + // zero out most significant bytes + memset(dst + 4, 0, 4); + } else if (s->ts_scale) { + gpuOutputSplitInt64Timestamp( + reinterpret_cast(dst), src, num_values, s->ts_scale); + } else { + gpuOutputByteStreamSplit(dst, src, num_values); + } + } else if (dtype_len == 4) { + gpuOutputByteStreamSplit(dst, src, num_values); + } else { + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } + + pos += batch_size; + } +} + // is the page marked nullable or not __device__ inline bool is_nullable(page_state_s* s) { @@ -495,6 +585,123 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (t == 0 and s->error != 0) { set_error(s->error, error_code); } } +/** + * @brief Kernel for computing fixed width non dictionary column data stored in the pages + * + * This function will write the page data and the page data's validity to the + * output specified in the page's column chunk. If necessary, additional + * conversion will be performed to translate from the Parquet datatype to + * desired output datatype. + * + * @param pages List of pages + * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered + */ +template +CUDF_KERNEL void __launch_bounds__(decode_block_size) + gpuDecodeSplitPageDataFlat(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) +{ + __shared__ __align__(16) page_state_s state_g; + __shared__ __align__(16) page_state_buffers_s // unused in this kernel + state_buffers; + + page_state_s* const s = &state_g; + auto* const sb = &state_buffers; + int const page_idx = blockIdx.x; + int const t = threadIdx.x; + PageInfo* pp = &pages[page_idx]; + + if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT))) { + return; + } + + // must come after the kernel mask check + [[maybe_unused]] null_count_back_copier _{s, t}; + + if (!setupLocalPageInfo(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT}, + page_processing_stage::DECODE)) { + return; + } + + // the level stream decoders + __shared__ rle_run def_runs[rle_run_buffer_size]; + rle_stream def_decoder{def_runs}; + + // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. + if (s->num_rows == 0) { return; } + + bool const nullable = is_nullable(s); + bool const nullable_with_nulls = nullable && has_nulls(s); + + // initialize the stream decoders (requires values computed in setupLocalPageInfo) + level_t* const def = reinterpret_cast(pp->lvl_decode_buf[level_type::DEFINITION]); + if (nullable_with_nulls) { + def_decoder.init(s->col.level_bits[level_type::DEFINITION], + s->abs_lvl_start[level_type::DEFINITION], + s->abs_lvl_end[level_type::DEFINITION], + def, + s->page.num_input_values); + } + __syncthreads(); + + // We use two counters in the loop below: processed_count and valid_count. + // - processed_count: number of rows out of num_input_values that we have decoded so far. + // the definition stream returns the number of total rows it has processed in each call + // to decode_next and we accumulate in process_count. + // - valid_count: number of non-null rows we have decoded so far. In each iteration of the + // loop below, we look at the number of valid items (which could be all for non-nullable), + // and valid_count is that running count. + int processed_count = 0; + int valid_count = 0; + // the core loop. decode batches of level stream data using rle_stream objects + // and pass the results to gpuDecodeValues + while (s->error == 0 && processed_count < s->page.num_input_values) { + int next_valid_count; + + // only need to process definition levels if this is a nullable column + if (nullable) { + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); + } else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + } + + next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( + processed_count, s, sb, def, t, nullable_with_nulls); + } + // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip + // this function call entirely since all it will ever generate is a mapping of (i -> i) for + // nz_idx. gpuDecodeValues would be the only work that happens. + else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( + processed_count, s, sb, nullptr, t, false); + } + __syncthreads(); + + // decode the values themselves + gpuDecodeSplitValues(s, sb, valid_count, next_valid_count); + __syncthreads(); + + valid_count = next_valid_count; + } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } +} + } // anonymous namespace void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, @@ -528,7 +735,7 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa // dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block // 1 full warp, and 1 warp of 1 thread dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per pags => # blocks + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { gpuDecodePageDataFixedDict<<>>( @@ -539,4 +746,24 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa } } +void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) +{ + dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks + + if (level_type_size == 1) { + gpuDecodeSplitPageDataFlat<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodeSplitPageDataFlat<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 62ce5b9f9a5..7207173b82f 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -28,6 +28,177 @@ namespace { constexpr int decode_block_size = 128; constexpr int rolling_buf_size = decode_block_size * 2; +/** + * @brief Kernel for computing the BYTE_STREAM_SPLIT column data stored in the pages + * + * This is basically the PLAIN decoder, but with a pared down set of supported data + * types, and using output functions that piece together the individual streams. + * Supported physical types include INT32, INT64, FLOAT, DOUBLE and FIXED_LEN_BYTE_ARRAY. + * The latter is currently only used for large decimals. The Parquet specification also + * has FLOAT16 and UUID types that are currently not supported. FIXED_LEN_BYTE_ARRAY data + * that lacks a `LogicalType` annotation will be handled by the string decoder. + * + * @param pages List of pages + * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered + */ +template +CUDF_KERNEL void __launch_bounds__(decode_block_size) + gpuDecodeSplitPageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) +{ + using cudf::detail::warp_size; + __shared__ __align__(16) page_state_s state_g; + __shared__ __align__(16) + page_state_buffers_s + state_buffers; + + page_state_s* const s = &state_g; + auto* const sb = &state_buffers; + int page_idx = blockIdx.x; + int t = threadIdx.x; + [[maybe_unused]] null_count_back_copier _{s, t}; + + if (!setupLocalPageInfo(s, + &pages[page_idx], + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT}, + page_processing_stage::DECODE)) { + return; + } + + bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; + + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + auto const out_thread0 = warp_size; + + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + + __shared__ level_t rep[rolling_buf_size]; // circular buffer of repetition level values + __shared__ level_t def[rolling_buf_size]; // circular buffer of definition level values + + // skipped_leaf_values will always be 0 for flat hierarchies. + uint32_t skipped_leaf_values = s->page.skipped_leaf_values; + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + int target_pos; + int src_pos = s->src_pos; + + if (t < out_thread0) { + target_pos = min(src_pos + 2 * (decode_block_size - out_thread0), + s->nz_count + (decode_block_size - out_thread0)); + } else { + target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0); + } + // this needs to be here to prevent warp 1 modifying src_pos before all threads have read it + __syncthreads(); + + if (t < warp_size) { + // decode repetition and definition levels. + // - update validity vectors + // - updates offsets (for nested columns) + // - produces non-NULL value indices in s->nz_idx for subsequent decoding + gpuDecodeLevels(s, sb, target_pos, rep, def, t); + } else { + // WARP1..WARP3: Decode values + int const dtype = s->col.physical_type; + src_pos += t - out_thread0; + + // the position in the output column/buffer + int dst_pos = sb->nz_idx[rolling_index(src_pos)]; + + // for the flat hierarchy case we will be reading from the beginning of the value stream, + // regardless of the value of first_row. so adjust our destination offset accordingly. + // example: + // - user has passed skip_rows = 2, so our first_row to output is 2 + // - the row values we get from nz_idx will be + // 0, 1, 2, 3, 4 .... + // - by shifting these values by first_row, the sequence becomes + // -2, -1, 0, 1, 2 ... + // - so we will end up ignoring the first two input rows, and input rows 2..n will + // get written to the output starting at position 0. + // + if (!has_repetition) { dst_pos -= s->first_row; } + + // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values + // before first_row) in the flat hierarchy case. + if (src_pos < target_pos && dst_pos >= 0) { + // src_pos represents the logical row position we want to read from. But in the case of + // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. + uint32_t val_src_pos = src_pos + skipped_leaf_values; + + // nesting level that is storing actual leaf values + int leaf_level_index = s->col.max_nesting_depth - 1; + + uint32_t dtype_len = s->dtype_len; + uint8_t const* src = s->data_start + val_src_pos; + uint8_t* dst = + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; + + // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader + if (is_decimal) { + switch (dtype) { + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case FIXED_LEN_BYTE_ARRAY: + if (s->dtype_len_in <= sizeof(int32_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(int64_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(__int128_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); + break; + } + // unsupported decimal precision + [[fallthrough]]; + + default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputByteStreamSplit(dst, src, num_values); + // zero out most significant bytes + memset(dst + 4, 0, 4); + } else if (s->ts_scale) { + gpuOutputSplitInt64Timestamp( + reinterpret_cast(dst), src, num_values, s->ts_scale); + } else { + gpuOutputByteStreamSplit(dst, src, num_values); + } + } else if (dtype_len == 4) { + gpuOutputByteStreamSplit(dst, src, num_values); + } else { + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } + + if (t == out_thread0) { s->src_pos = target_pos; } + } + __syncthreads(); + } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } +} + /** * @brief Kernel for computing the column data stored in the pages * @@ -145,7 +316,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... + // -2, -1, 0, 1, 2 ... // - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // @@ -267,4 +438,29 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span pages, } } +/** + * @copydoc cudf::io::parquet::detail::DecodePageData + */ +void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) +{ + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); + + dim3 dim_block(decode_block_size, 1); + dim3 dim_grid(pages.size(), 1); // 1 threadblock per page + + if (level_type_size == 1) { + gpuDecodeSplitPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodeSplitPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index df8d801d66c..f182747650e 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -396,4 +396,80 @@ inline __device__ void gpuOutputGeneric( } } } + +/** + * Output a BYTE_STREAM_SPLIT value of type `T`. + * + * Data is encoded as N == sizeof(T) streams of length M, forming an NxM sized matrix. + * Rows are streams, columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + */ +template +__device__ inline void gpuOutputByteStreamSplit(uint8_t* dst, uint8_t const* src, size_type stride) +{ + for (int i = 0; i < sizeof(T); i++) { + dst[i] = src[i * stride]; + } +} + +/** + * Output a 64-bit BYTE_STREAM_SPLIT encoded timestamp. + * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + * @param ts_scale timestamp scale + */ +inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, + uint8_t const* src, + size_type stride, + int32_t ts_scale) +{ + gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); + if (ts_scale < 0) { + // round towards negative infinity + int sign = (*dst < 0); + *dst = ((*dst + sign) / -ts_scale) + sign; + } else { + *dst = *dst * ts_scale; + } +} + +/** + * Output a BYTE_STREAM_SPLIT encoded decimal as an integer type. + * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + * @param dtype_len_in length of the `FIXED_LEN_BYTE_ARRAY` used to represent the decimal + */ +template +__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, + uint8_t const* src, + size_type stride, + uint32_t dtype_len_in) +{ + T unscaled = 0; + // fixed_len_byte_array decimals are big endian + for (unsigned int i = 0; i < dtype_len_in; i++) { + unscaled = (unscaled << 8) | src[i * stride]; + } + // Shift the unscaled value up and back down when it isn't all 8 bytes, + // which sign extend the value for correctly representing negative numbers. + if (dtype_len_in < sizeof(T)) { + unscaled <<= (sizeof(T) - dtype_len_in) * 8; + unscaled >>= (sizeof(T) - dtype_len_in) * 8; + } + *dst = unscaled; +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 83bf7fb0d73..0c139fced24 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -1316,6 +1316,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } break; case Encoding::PLAIN: + case Encoding::BYTE_STREAM_SPLIT: s->dict_size = static_cast(end - cur); s->dict_val = 0; if (s->col.physical_type == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; } diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 7c0092c6185..da1bbaebd73 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -315,7 +315,7 @@ CUDF_KERNEL void __launch_bounds__(96) using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -440,7 +440,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) using cudf::detail::warp_size; __shared__ __align__(16) delta_byte_array_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -605,7 +605,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; __shared__ __align__(8) uint8_t const* page_string_data; __shared__ size_t string_offset; diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 2db6dc4270d..227f13db60e 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -15,6 +15,7 @@ */ #include "delta_enc.cuh" +#include "io/parquet/parquet_gpu.hpp" #include "io/utilities/block_utils.cuh" #include "page_string_utils.cuh" #include "parquet_gpu.cuh" @@ -238,8 +239,10 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) Encoding __device__ determine_encoding(PageType page_type, Type physical_type, bool use_dictionary, - bool write_v2_headers) + bool write_v2_headers, + bool is_split_stream) { + if (is_split_stream) { return Encoding::BYTE_STREAM_SPLIT; } // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and // data pages (actual encoding is identical). @@ -514,6 +517,7 @@ __device__ encode_kernel_mask data_encoding_for_col(EncColumnChunk const* chunk, case column_encoding::DELTA_BINARY_PACKED: return encode_kernel_mask::DELTA_BINARY; case column_encoding::DELTA_LENGTH_BYTE_ARRAY: return encode_kernel_mask::DELTA_LENGTH_BA; case column_encoding::DELTA_BYTE_ARRAY: return encode_kernel_mask::DELTA_BYTE_ARRAY; + case column_encoding::BYTE_STREAM_SPLIT: return encode_kernel_mask::BYTE_STREAM_SPLIT; } } @@ -1608,6 +1612,19 @@ __device__ void finish_page_encode(state_buf* s, } } +// Encode a fixed-width data type int `dst`. `dst` points to the first byte +// of the result. `stride` is 1 for PLAIN encoding and num_values for +// BYTE_STREAM_SPLIT. +template +__device__ inline void encode_value(uint8_t* dst, T src, size_type stride) +{ + T v = src; + for (int i = 0; i < sizeof(T); i++) { + dst[i * stride] = v; + v >>= 8; + } +} + // PLAIN page data encoder // blockDim(128, 1, 1) template @@ -1616,7 +1633,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) device_span> comp_in, device_span> comp_out, device_span comp_results, - bool write_v2_headers) + bool write_v2_headers, + bool is_split_stream) { __shared__ __align__(8) page_enc_state_s<0> state_g; using block_scan = cub::BlockScan; @@ -1636,7 +1654,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } __syncthreads(); - if (BitAnd(s->page.kernel_mask, encode_kernel_mask::PLAIN) == 0) { return; } + auto const allowed_mask = + is_split_stream ? encode_kernel_mask::BYTE_STREAM_SPLIT : encode_kernel_mask::PLAIN; + if (BitAnd(s->page.kernel_mask, allowed_mask) == 0) { return; } // Encode data values __syncthreads(); @@ -1650,18 +1670,20 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) }(); if (t == 0) { - uint8_t* dst = s->cur; - s->rle_run = 0; - s->rle_pos = 0; - s->rle_numvals = 0; - s->rle_out = dst; - s->page.encoding = - determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers, is_split_stream); s->page_start_val = row_to_value_idx(s->page.start_row, s->col); s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); } __syncthreads(); + auto const stride = is_split_stream ? s->page.num_valid : 1; + for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, block_size); uint32_t len, pos; @@ -1708,6 +1730,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) uint32_t total_len = 0; block_scan(scan_storage).ExclusiveSum(len, pos, total_len); __syncthreads(); + + // if BYTE_STREAM_SPLIT, then translate byte positions to indexes + if (is_split_stream) { + pos /= dtype_len_out; + total_len /= dtype_len_out; + } + if (t == 0) { s->cur = dst + total_len; } if (is_valid) { switch (physical_type) { @@ -1725,13 +1754,11 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } }(); - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; + encode_value(dst + pos, v, stride); } break; + case DOUBLE: case INT64: { - int64_t v = s->col.leaf_column->element(val_idx); + auto v = s->col.leaf_column->element(val_idx); int32_t ts_scale = s->col.ts_scale; if (ts_scale != 0) { if (ts_scale < 0) { @@ -1740,16 +1767,10 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) v *= ts_scale; } } - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; + encode_value(dst + pos, v, stride); } break; case INT96: { + // only PLAIN encoding is supported int64_t v = s->col.leaf_column->element(val_idx); int32_t ts_scale = s->col.ts_scale; if (ts_scale != 0) { @@ -1776,27 +1797,14 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) }(); // the 12 bytes of fixed length data. - v = last_day_nanos.count(); - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; - uint32_t w = julian_days.count(); - dst[pos + 8] = w; - dst[pos + 9] = w >> 8; - dst[pos + 10] = w >> 16; - dst[pos + 11] = w >> 24; + v = last_day_nanos.count(); + encode_value(dst + pos, v, 1); + uint32_t w = julian_days.count(); + encode_value(dst + pos + 8, w, 1); } break; - case DOUBLE: { - auto v = s->col.leaf_column->element(val_idx); - memcpy(dst + pos, &v, 8); - } break; case BYTE_ARRAY: { + // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, column_device_view const* leaf_column, uint32_t const val_idx) -> void const* { @@ -1810,11 +1818,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) default: CUDF_UNREACHABLE("invalid type id for byte array writing!"); } }(type_id, s->col.leaf_column, val_idx); - uint32_t v = len - 4; // string length - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; + uint32_t v = len - 4; // string length + encode_value(dst + pos, v, 1); if (v != 0) memcpy(dst + pos + 4, bytes, v); } break; case FIXED_LEN_BYTE_ARRAY: { @@ -1822,10 +1827,16 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // When using FIXED_LEN_BYTE_ARRAY for decimals, the rep is encoded in big-endian auto const v = s->col.leaf_column->element(val_idx).value(); auto const v_char_ptr = reinterpret_cast(&v); - thrust::copy(thrust::seq, - thrust::make_reverse_iterator(v_char_ptr + sizeof(v)), - thrust::make_reverse_iterator(v_char_ptr), - dst + pos); + if (is_split_stream) { + for (int i = dtype_len_out - 1; i >= 0; i--, pos += stride) { + dst[pos] = v_char_ptr[i]; + } + } else { + thrust::copy(thrust::seq, + thrust::make_reverse_iterator(v_char_ptr + sizeof(v)), + thrust::make_reverse_iterator(v_char_ptr), + dst + pos); + } } } break; } @@ -1833,6 +1844,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) __syncthreads(); } + // for BYTE_STREAM_SPLIT, s->cur now points to the end of the first stream. + // need it to point to the end of the Nth stream. + if (is_split_stream and t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } finish_page_encode( s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } @@ -1883,13 +1897,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) ? s->ck.dict_rle_bits : -1; if (t == 0) { - uint8_t* dst = s->cur; - s->rle_run = 0; - s->rle_pos = 0; - s->rle_numvals = 0; - s->rle_out = dst; - s->page.encoding = - determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers, false); if (dict_bits >= 0 && physical_type != BOOLEAN) { dst[0] = dict_bits; s->rle_out = dst + 1; @@ -3417,7 +3431,14 @@ void EncodePages(device_span pages, gpuEncodePageLevels<<>>( pages, write_v2_headers, encode_kernel_mask::PLAIN); gpuEncodePages<<>>( - pages, comp_in, comp_out, comp_results, write_v2_headers); + pages, comp_in, comp_out, comp_results, write_v2_headers, false); + } + if (BitAnd(kernel_mask, encode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { + auto const strm = streams[s_idx++]; + gpuEncodePageLevels<<>>( + pages, write_v2_headers, encode_kernel_mask::BYTE_STREAM_SPLIT); + gpuEncodePages<<>>( + pages, comp_in, comp_out, comp_results, write_v2_headers, true); } if (BitAnd(kernel_mask, encode_kernel_mask::DELTA_BINARY) != 0) { auto const strm = streams[s_idx++]; diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 07e03460ecb..6c6afde29e4 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -166,13 +166,7 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, ColumnChunkDesc const& chunk) { if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; } - if (!is_string_col(chunk) && !is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { - if (page.encoding == Encoding::PLAIN) { - return decode_kernel_mask::FIXED_WIDTH_NO_DICT; - } else if (page.encoding == Encoding::PLAIN_DICTIONARY) { - return decode_kernel_mask::FIXED_WIDTH_DICT; - } - } + if (page.encoding == Encoding::DELTA_BINARY_PACKED) { return decode_kernel_mask::DELTA_BINARY; } else if (page.encoding == Encoding::DELTA_BYTE_ARRAY) { @@ -180,10 +174,26 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (page.encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { return decode_kernel_mask::DELTA_LENGTH_BA; } else if (is_string_col(chunk)) { + // check for string before byte_stream_split so FLBA will go to the right kernel return decode_kernel_mask::STRING; } - // non-string, non-delta + if (!is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { + if (page.encoding == Encoding::PLAIN) { + return decode_kernel_mask::FIXED_WIDTH_NO_DICT; + } else if (page.encoding == Encoding::PLAIN_DICTIONARY || + page.encoding == Encoding::RLE_DICTIONARY) { + return decode_kernel_mask::FIXED_WIDTH_DICT; + } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT; + } + } + + if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return decode_kernel_mask::BYTE_STREAM_SPLIT; + } + + // non-string, non-delta, non-split_stream return decode_kernel_mask::GENERAL; } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 6f96d4dd1cf..5ba813f518f 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1039,7 +1039,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... + // -2, -1, 0, 1, 2 ... // - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // @@ -1062,7 +1062,19 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // choose a character parallel string copy when the average string is longer than a warp auto const use_char_ll = warp_total / warp_size >= warp_size; - if (use_char_ll) { + if (s->page.encoding == Encoding::BYTE_STREAM_SPLIT) { + if (src_pos + i < target_pos && dst_pos >= 0) { + auto const stride = s->page.str_bytes / s->dtype_len_in; + auto offptr = + reinterpret_cast(nesting_info_base[leaf_level_index].data_out) + dst_pos; + *offptr = len; + auto str_ptr = nesting_info_base[leaf_level_index].string_out + offset; + for (int ii = 0; ii < s->dtype_len_in; ii++) { + str_ptr[ii] = s->data_start[src_pos + i + ii * stride]; + } + } + __syncwarp(); + } else if (use_char_ll) { __shared__ __align__(8) uint8_t const* pointers[warp_size]; __shared__ __align__(4) size_type offsets[warp_size]; __shared__ __align__(4) int dsts[warp_size]; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index b165c60b2cf..c06fb63acda 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -54,7 +54,13 @@ constexpr int LEVEL_DECODE_BUF_SIZE = 2048; template constexpr int rolling_index(int index) { - return index % rolling_size; + // Cannot divide by 0. But `rolling_size` will be 0 for unused arrays, so this case will never + // actual be executed. + if constexpr (rolling_size == 0) { + return index; + } else { + return index % rolling_size; + } } // PARQUET-2261 allows for not writing the level histograms in certain cases. @@ -81,7 +87,8 @@ constexpr bool is_supported_encoding(Encoding enc) case Encoding::RLE_DICTIONARY: case Encoding::DELTA_BINARY_PACKED: case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: return true; + case Encoding::DELTA_BYTE_ARRAY: + case Encoding::BYTE_STREAM_SPLIT: return true; default: return false; } } @@ -199,14 +206,16 @@ enum level_type { * Used to control which decode kernels to run. */ enum class decode_kernel_mask { - NONE = 0, - GENERAL = (1 << 0), // Run catch-all decode kernel - STRING = (1 << 1), // Run decode kernel for string data - DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data - DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data - DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data - FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages - FIXED_WIDTH_DICT = (1 << 6) // Run decode kernel for fixed width dictionary pages + NONE = 0, + GENERAL = (1 << 0), // Run catch-all decode kernel + STRING = (1 << 1), // Run decode kernel for string data + DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data + DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data + DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages + FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages + BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data + BYTE_STREAM_SPLIT_FLAT = (1 << 8), // Same as above but with a flat schema }; // mask representing all the ways in which a string can be encoded @@ -517,11 +526,12 @@ constexpr uint32_t encoding_to_mask(Encoding encoding) * Used to control which encode kernels to run. */ enum class encode_kernel_mask { - PLAIN = (1 << 0), // Run plain encoding kernel - DICTIONARY = (1 << 1), // Run dictionary encoding kernel - DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel - DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel - DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel + PLAIN = (1 << 0), // Run plain encoding kernel + DICTIONARY = (1 << 1), // Run dictionary encoding kernel + DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel + DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel + BYTE_STREAM_SPLIT = (1 << 5), // Run plain encoding kernel, but split streams }; /** @@ -759,6 +769,28 @@ void DecodePageData(cudf::detail::hostdevice_span pages, kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel for reading the BYTE_STREAM_SPLIT column data stored in the pages + * + * The page data will be written to the output pointed to in the page's + * associated column chunk. + * + * @param[in,out] pages All pages to be decoded + * @param[in] chunks All chunks to be decoded + * @param[in] num_rows Total number of rows to read + * @param[in] min_row Minimum number of rows to read + * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures + * @param[in] stream CUDA stream to use + */ +void DecodeSplitPageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for reading the string column data stored in the pages * @@ -891,6 +923,28 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel for reading dictionary fixed width column data stored in the pages + * + * The page data will be written to the output pointed to in the page's + * associated column chunk. + * + * @param[in,out] pages All pages to be decoded + * @param[in] chunks All chunks to be decoded + * @param[in] num_rows Total number of rows to read + * @param[in] min_row Minimum number of rows to read + * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures + * @param[in] stream CUDA stream to use + */ +void DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + std::size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for initializing encoder row group fragments * diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index a524e7c6dcc..b7172f5ba67 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -253,6 +253,28 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row streams[s_idx++]); } + // launch byte stream split decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT) != 0) { + DecodeSplitPageDataFlat(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + error_code.data(), + streams[s_idx++]); + } + + // launch byte stream split decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { + DecodeSplitPageData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + error_code.data(), + streams[s_idx++]); + } + if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0) { DecodePageDataFixed(subpass.pages, pass.chunks, diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 6a8c31fb96b..5509a33f9f0 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -613,8 +613,7 @@ std::vector construct_schema_tree( column_in_metadata const& col_meta) { s.requested_encoding = column_encoding::USE_DEFAULT; - if (schema[parent_idx].name != "list" and - col_meta.get_encoding() != column_encoding::USE_DEFAULT) { + if (s.name != "list" and col_meta.get_encoding() != column_encoding::USE_DEFAULT) { // do some validation switch (col_meta.get_encoding()) { case column_encoding::DELTA_BINARY_PACKED: @@ -659,6 +658,21 @@ std::vector construct_schema_tree( } break; + case column_encoding::BYTE_STREAM_SPLIT: + if (s.type == Type::BYTE_ARRAY) { + CUDF_LOG_WARN( + "BYTE_STREAM_SPLIT encoding is only supported for fixed width columns; the " + "requested encoding will be ignored"); + return; + } + if (s.type == Type::INT96) { + CUDF_LOG_WARN( + "BYTE_STREAM_SPLIT encoding is not supported for INT96 columns; the " + "requested encoding will be ignored"); + return; + } + break; + // supported parquet encodings case column_encoding::PLAIN: case column_encoding::DICTIONARY: break; diff --git a/cpp/tests/io/parquet_common.cpp b/cpp/tests/io/parquet_common.cpp index b64cd230bc6..c1211869bcc 100644 --- a/cpp/tests/io/parquet_common.cpp +++ b/cpp/tests/io/parquet_common.cpp @@ -203,6 +203,7 @@ template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); +template std::vector<__int128_t> random_values<__int128_t>(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 3a3040f0957..a16b3d63177 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -35,7 +35,7 @@ using cudf::test::iterators::no_nulls; template -void test_durations(mask_op_t mask_op) +void test_durations(mask_op_t mask_op, bool use_byte_stream_split) { std::default_random_engine generator; std::uniform_int_distribution distribution_d(0, 30); @@ -67,6 +67,13 @@ void test_durations(mask_op_t mask_op) auto expected = table_view{{durations_d, durations_s, durations_ms, durations_us, durations_ns}}; + if (use_byte_stream_split) { + cudf::io::table_input_metadata expected_metadata(expected); + for (auto& col_meta : expected_metadata.column_metadata) { + col_meta.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + } + auto filepath = temp_env->get_temp_filepath("Durations.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected); @@ -91,10 +98,10 @@ void test_durations(mask_op_t mask_op) TEST_F(ParquetWriterTest, Durations) { - test_durations([](auto i) { return true; }); - test_durations([](auto i) { return (i % 2) != 0; }); - test_durations([](auto i) { return (i % 3) != 0; }); - test_durations([](auto i) { return false; }); + test_durations([](auto i) { return true; }, false); + test_durations([](auto i) { return (i % 2) != 0; }, false); + test_durations([](auto i) { return (i % 3) != 0; }, false); + test_durations([](auto i) { return false; }, false); } TEST_F(ParquetWriterTest, MultiIndex) @@ -1593,6 +1600,7 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) using cudf::io::column_encoding; using cudf::io::parquet::detail::Encoding; constexpr int num_rows = 500; + std::mt19937 engine{31337}; auto const ones = thrust::make_constant_iterator(1); auto const col = @@ -1602,6 +1610,9 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) auto const string_col = cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls()); + // throw in a list to make sure encoding selection works there too + auto list_col = make_parquet_list_col(engine, num_rows, 5, true); + auto const table = table_view({col, col, col, @@ -1613,7 +1624,8 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) string_col, string_col, string_col, - string_col}); + string_col, + *list_col}); cudf::io::table_input_metadata table_metadata(table); @@ -1635,10 +1647,17 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) set_meta(10, "string_db", column_encoding::DELTA_BINARY_PACKED); table_metadata.column_metadata[11].set_name("string_none"); - for (auto& col_meta : table_metadata.column_metadata) { - col_meta.set_nullability(false); + for (int i = 0; i < 12; i++) { + table_metadata.column_metadata[i].set_nullability(false); } + // handle list column separately + table_metadata.column_metadata[12].set_name("int32_list").set_nullability(true); + table_metadata.column_metadata[12] + .child(1) + .set_encoding(column_encoding::DELTA_BINARY_PACKED) + .set_nullability(true); + auto const filepath = temp_env->get_temp_filepath("UserRequestedEncodings.parquet"); cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table) @@ -1683,6 +1702,12 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) expect_enc(10, Encoding::PLAIN_DICTIONARY); // no request, should use dictionary expect_enc(11, Encoding::PLAIN_DICTIONARY); + // int list requested delta_binary_packed. it's has level data, so have to search for a match. + auto const encodings = fmd.row_groups[0].columns[12].meta_data.encodings; + auto const has_delta = std::any_of(encodings.begin(), encodings.end(), [](Encoding enc) { + return enc == Encoding::DELTA_BINARY_PACKED; + }); + EXPECT_TRUE(has_delta); } TEST_F(ParquetWriterTest, Decimal128DeltaByteArray) @@ -1743,6 +1768,95 @@ TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_F(ParquetWriterTest, ByteStreamSplit) +{ + constexpr auto num_rows = 100; + std::mt19937 engine{31337}; + auto col0_data = random_values(num_rows); + auto col1_data = random_values(num_rows); + auto col2_data = random_values(num_rows); + auto col3_data = random_values(num_rows); + + column_wrapper col0{col0_data.begin(), col0_data.end(), no_nulls()}; + column_wrapper col1{col1_data.begin(), col1_data.end(), no_nulls()}; + column_wrapper col2{col2_data.begin(), col2_data.end(), no_nulls()}; + column_wrapper col3{col3_data.begin(), col3_data.end(), no_nulls()}; + + // throw in a list to make sure both decoders are working + auto col4 = make_parquet_list_col(engine, num_rows, 5, true); + + auto expected = table_view{{col0, col1, col2, col3, *col4}}; + + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s"); + expected_metadata.column_metadata[1].set_name("int64s"); + expected_metadata.column_metadata[2].set_name("floats"); + expected_metadata.column_metadata[3].set_name("doubles"); + expected_metadata.column_metadata[4].set_name("int32list"); + auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT; + for (int i = 0; i <= 3; i++) { + expected_metadata.column_metadata[i].set_encoding(encoding); + } + + expected_metadata.column_metadata[4].child(1).set_encoding(encoding); + + auto const filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + +TEST_F(ParquetWriterTest, DecimalByteStreamSplit) +{ + constexpr cudf::size_type num_rows = 100; + auto seq_col0 = random_values(num_rows); + auto seq_col1 = random_values(num_rows); + auto seq_col2 = random_values<__int128_t>(num_rows); + + auto col0 = cudf::test::fixed_point_column_wrapper{ + seq_col0.begin(), seq_col0.end(), no_nulls(), numeric::scale_type{-5}}; + auto col1 = cudf::test::fixed_point_column_wrapper{ + seq_col1.begin(), seq_col1.end(), no_nulls(), numeric::scale_type{-9}}; + auto col2 = cudf::test::fixed_point_column_wrapper<__int128_t>{ + seq_col1.begin(), seq_col1.end(), no_nulls(), numeric::scale_type{-11}}; + + auto expected = table_view({col0, col1, col2}); + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s").set_decimal_precision(7); + expected_metadata.column_metadata[1].set_name("int64s").set_decimal_precision(11); + expected_metadata.column_metadata[2].set_name("int128s").set_decimal_precision(22); + for (auto& col_meta : expected_metadata.column_metadata) { + col_meta.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + + auto const filepath = temp_env->get_temp_filepath("DecimalByteStreamSplit.parquet"); + cudf::io::parquet_writer_options args = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(args); + + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(read_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + +TEST_F(ParquetWriterTest, DurationByteStreamSplit) +{ + test_durations([](auto i) { return true; }, true); + test_durations([](auto i) { return (i % 2) != 0; }, true); + test_durations([](auto i) { return (i % 3) != 0; }, true); + test_durations([](auto i) { return false; }, true); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template @@ -1926,6 +2040,35 @@ TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampsByteStreamSplit) +{ + srand(42); + auto sequence = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return ((std::rand() / 10000) * 1000); }); + + constexpr auto num_rows = 100; + column_wrapper col( + sequence, sequence + num_rows, no_nulls()); + + auto expected = table_view{{col}}; + + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + + auto filepath = temp_env->get_temp_filepath("TimestampsByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .timestamp_type(this->type()); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + ////////////////////////////// // writer stress tests From 2eb71b28d9607e3dfa5b891cbc40ce53a5d27bc6 Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Wed, 24 Apr 2024 16:05:34 -0400 Subject: [PATCH 2/7] Large strings gtest fixture and utilities (#15513) Creates the base class and utilities for testing APIs to produce large strings. The main purpose of the fixture is to enable the large strings environment variable(s) and to setup large test data that can be reused by multiple tests. Authors: - David Wendt (https://github.com/davidwendt) Approvers: - MithunR (https://github.com/mythrocks) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/15513 --- cpp/include/cudf_test/testing_main.hpp | 37 ++++-- cpp/tests/CMakeLists.txt | 9 ++ cpp/tests/copying/concatenate_tests.cpp | 43 ------ cpp/tests/large_strings/concatenate_tests.cpp | 65 ++++++++++ .../large_strings/large_strings_fixture.cpp | 122 ++++++++++++++++++ .../large_strings/large_strings_fixture.hpp | 49 +++++++ cpp/tests/large_strings/merge_tests.cpp | 79 ++++++++++++ cpp/tests/merge/merge_string_test.cpp | 57 -------- 8 files changed, 351 insertions(+), 110 deletions(-) create mode 100644 cpp/tests/large_strings/concatenate_tests.cpp create mode 100644 cpp/tests/large_strings/large_strings_fixture.cpp create mode 100644 cpp/tests/large_strings/large_strings_fixture.hpp create mode 100644 cpp/tests/large_strings/merge_tests.cpp diff --git a/cpp/include/cudf_test/testing_main.hpp b/cpp/include/cudf_test/testing_main.hpp index ecac761f7cb..66b831b917f 100644 --- a/cpp/include/cudf_test/testing_main.hpp +++ b/cpp/include/cudf_test/testing_main.hpp @@ -145,6 +145,25 @@ inline auto parse_cudf_test_opts(int argc, char** argv) } } +/** + * @brief Sets up stream mode memory resource adaptor + * + * The resource adaptor is only set as the current device resource if the + * stream mode is enabled. + * + * The caller must keep the return object alive for the life of the test runs. + * + * @param cmd_opts Command line options returned by parse_cudf_test_opts + * @return Memory resource adaptor + */ +inline auto make_memory_resource_adaptor(cxxopts::ParseResult const& cmd_opts) +{ + auto const rmm_mode = cmd_opts["rmm_mode"].as(); + auto resource = cudf::test::create_memory_resource(rmm_mode); + rmm::mr::set_current_device_resource(resource.get()); + return resource; +} + /** * @brief Sets up stream mode memory resource adaptor * @@ -181,14 +200,12 @@ inline auto make_stream_mode_adaptor(cxxopts::ParseResult const& cmd_opts) * function parses the command line to customize test behavior, like the * allocation mode used for creating the default memory resource. */ -#define CUDF_TEST_PROGRAM_MAIN() \ - int main(int argc, char** argv) \ - { \ - ::testing::InitGoogleTest(&argc, argv); \ - auto const cmd_opts = parse_cudf_test_opts(argc, argv); \ - auto const rmm_mode = cmd_opts["rmm_mode"].as(); \ - auto resource = cudf::test::create_memory_resource(rmm_mode); \ - rmm::mr::set_current_device_resource(resource.get()); \ - auto adaptor = make_stream_mode_adaptor(cmd_opts); \ - return RUN_ALL_TESTS(); \ +#define CUDF_TEST_PROGRAM_MAIN() \ + int main(int argc, char** argv) \ + { \ + ::testing::InitGoogleTest(&argc, argv); \ + auto const cmd_opts = parse_cudf_test_opts(argc, argv); \ + [[maybe_unused]] auto mr = make_memory_resource_adaptor(cmd_opts); \ + [[maybe_unused]] auto adaptor = make_stream_mode_adaptor(cmd_opts); \ + return RUN_ALL_TESTS(); \ } diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index f59e675e1d5..6c56d82007a 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -568,6 +568,15 @@ ConfigureTest( strings/urls_tests.cpp ) +# ################################################################################################## +# * large strings test ---------------------------------------------------------------------------- +ConfigureTest( + LARGE_STRINGS_TEST large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp + large_strings/concatenate_tests.cpp + GPUS 1 + PERCENT 100 +) + # ################################################################################################## # * json path test -------------------------------------------------------------------------------- ConfigureTest(JSON_PATH_TEST json/json_tests.cpp) diff --git a/cpp/tests/copying/concatenate_tests.cpp b/cpp/tests/copying/concatenate_tests.cpp index 3e2e332936e..c2d1e1d9f4f 100644 --- a/cpp/tests/copying/concatenate_tests.cpp +++ b/cpp/tests/copying/concatenate_tests.cpp @@ -197,49 +197,6 @@ TEST_F(StringColumnTest, ConcatenateTooLarge) EXPECT_THROW(cudf::concatenate(input_cols), std::overflow_error); } -TEST_F(StringColumnTest, ConcatenateLargeStrings) -{ - CUDF_TEST_ENABLE_LARGE_STRINGS(); - auto itr = thrust::constant_iterator( - "abcdefghijklmnopqrstuvwxyABCDEFGHIJKLMNOPQRSTUVWXY"); // 50 bytes - auto input = cudf::test::strings_column_wrapper(itr, itr + 5'000'000); // 250MB - auto view = cudf::column_view(input); - std::vector input_cols; - std::vector splits; - int const multiplier = 10; - for (int i = 0; i < multiplier; ++i) { // 2500MB > 2GB - input_cols.push_back(view); - splits.push_back(view.size() * (i + 1)); - } - splits.pop_back(); // remove last entry - auto result = cudf::concatenate(input_cols); - auto sv = cudf::strings_column_view(result->view()); - EXPECT_EQ(sv.size(), view.size() * multiplier); - EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); - - // verify results in sections - auto sliced = cudf::split(result->view(), splits); - for (auto c : sliced) { - CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); - } - - // also test with large strings column as input - { - input_cols.clear(); - input_cols.push_back(input); // regular column - input_cols.push_back(result->view()); // large column - result = cudf::concatenate(input_cols); - sv = cudf::strings_column_view(result->view()); - EXPECT_EQ(sv.size(), view.size() * (multiplier + 1)); - EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); - splits.push_back(view.size() * multiplier); - sliced = cudf::split(result->view(), splits); - for (auto c : sliced) { - CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); - } - } -} - struct TableTest : public cudf::test::BaseFixture {}; TEST_F(TableTest, ConcatenateTables) diff --git a/cpp/tests/large_strings/concatenate_tests.cpp b/cpp/tests/large_strings/concatenate_tests.cpp new file mode 100644 index 00000000000..aa445bf761b --- /dev/null +++ b/cpp/tests/large_strings/concatenate_tests.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "large_strings_fixture.hpp" + +#include + +#include +#include +#include + +#include + +struct ConcatenateTest : public cudf::test::StringsLargeTest {}; + +TEST_F(ConcatenateTest, ConcatenateVertical) +{ + auto input = this->long_column(); + auto view = cudf::column_view(input); + std::vector input_cols; + std::vector splits; + int const multiplier = 10; + for (int i = 0; i < multiplier; ++i) { // 2500MB > 2GB + input_cols.push_back(view); + splits.push_back(view.size() * (i + 1)); + } + splits.pop_back(); // remove last entry + auto result = cudf::concatenate(input_cols); + auto sv = cudf::strings_column_view(result->view()); + EXPECT_EQ(sv.size(), view.size() * multiplier); + EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); + + // verify results in sections + auto sliced = cudf::split(result->view(), splits); + for (auto c : sliced) { + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); + } + + // also test with large strings column as input + input_cols.clear(); + input_cols.push_back(input); // regular column + input_cols.push_back(result->view()); // large column + result = cudf::concatenate(input_cols); + sv = cudf::strings_column_view(result->view()); + EXPECT_EQ(sv.size(), view.size() * (multiplier + 1)); + EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); + splits.push_back(view.size() * multiplier); + sliced = cudf::split(result->view(), splits); + for (auto c : sliced) { + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); + } +} diff --git a/cpp/tests/large_strings/large_strings_fixture.cpp b/cpp/tests/large_strings/large_strings_fixture.cpp new file mode 100644 index 00000000000..59e0cd43d05 --- /dev/null +++ b/cpp/tests/large_strings/large_strings_fixture.cpp @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "large_strings_fixture.hpp" + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace cudf::test { +class LargeStringsData { + public: + using DataPointer = std::unique_ptr; + + virtual ~LargeStringsData() {} + + void add_table(std::string_view name, std::unique_ptr&& data) + { + _data[std::string(name)] = std::move(data); + } + + cudf::table_view get_table(std::string_view name) const + { + std::string key{name}; + return _data.find(key) != _data.end() ? _data.at(key)->view() : cudf::table_view{}; + } + + void add_column(std::string_view name, std::unique_ptr&& data) + { + std::vector> cols; + cols.emplace_back(std::move(data)); + _data[std::string(name)] = std::make_unique(std::move(cols)); + } + + cudf::column_view get_column(std::string_view name) const + { + std::string key{name}; + return _data.find(key) != _data.end() ? _data.at(key)->view().column(0) : cudf::column_view{}; + } + + bool has_key(std::string_view name) const { return _data.find(std::string(name)) != _data.end(); } + + protected: + std::map _data; +}; + +cudf::column_view StringsLargeTest::wide_column() +{ + std::string name{"wide1"}; + if (!g_ls_data->has_key(name)) { + auto input = + cudf::test::strings_column_wrapper({"the quick brown fox jumps over the lazy dog", + "the fat cat lays next to the other accénted cat", + "a slow moving turtlé cannot catch the bird", + "which can be composéd together to form a more complete", + "The result does not include the value in the sum in"}); + auto counts = cudf::test::fixed_width_column_wrapper({8, 8, 8, 8, 8}); + auto result = cudf::strings::repeat_strings(cudf::strings_column_view(input), counts); + g_ls_data->add_column(name, std::move(result)); + } + return g_ls_data->get_column(name); +} + +cudf::column_view StringsLargeTest::long_column() +{ + std::string name("long1"); + if (!g_ls_data->has_key(name)) { + auto itr = thrust::constant_iterator( + "abcdefghijklmnopqrstuvwxyABCDEFGHIJKLMNOPQRSTUVWXY"); // 50 bytes + auto input = cudf::test::strings_column_wrapper(itr, itr + 5'000'000); // 250MB + g_ls_data->add_column(name, input.release()); + } + return g_ls_data->get_column(name); +} + +std::unique_ptr StringsLargeTest::get_ls_data() +{ + CUDF_EXPECTS(g_ls_data == nullptr, "invalid call to get_ls_data"); + auto lsd_data = std::make_unique(); + g_ls_data = lsd_data.get(); + return lsd_data; +} + +LargeStringsData* StringsLargeTest::g_ls_data = nullptr; +} // namespace cudf::test + +int main(int argc, char** argv) +{ + ::testing::InitGoogleTest(&argc, argv); + auto const cmd_opts = parse_cudf_test_opts(argc, argv); + // hardcoding the CUDA memory resource to keep from exceeding the pool + auto mr = cudf::test::make_cuda(); + rmm::mr::set_current_device_resource(mr.get()); + auto adaptor = make_stream_mode_adaptor(cmd_opts); + + // create object to automatically be destroyed at the end of main() + auto lsd = cudf::test::StringsLargeTest::get_ls_data(); + + return RUN_ALL_TESTS(); +} diff --git a/cpp/tests/large_strings/large_strings_fixture.hpp b/cpp/tests/large_strings/large_strings_fixture.hpp new file mode 100644 index 00000000000..8827b65f1ce --- /dev/null +++ b/cpp/tests/large_strings/large_strings_fixture.hpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +namespace cudf::test { +class LargeStringsData; + +/** + * @brief Fixture for creating large strings tests + * + * Stores tests strings columns for reuse by specific tests. + * Creating the test input only once helps speed up the overall tests. + * + * Also automatically enables appropriate large strings environment variables. + */ +struct StringsLargeTest : public cudf::test::BaseFixture { + /** + * @brief Returns a column of long strings + */ + cudf::column_view wide_column(); + + /** + * @brief Returns a long column of strings + */ + cudf::column_view long_column(); + + large_strings_enabler g_ls_enabler; + static LargeStringsData* g_ls_data; + + static std::unique_ptr get_ls_data(); +}; +} // namespace cudf::test diff --git a/cpp/tests/large_strings/merge_tests.cpp b/cpp/tests/large_strings/merge_tests.cpp new file mode 100644 index 00000000000..afe6e424371 --- /dev/null +++ b/cpp/tests/large_strings/merge_tests.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "large_strings_fixture.hpp" + +#include + +#include +#include +#include + +#include + +struct MergeTest : public cudf::test::StringsLargeTest {}; + +TEST_F(MergeTest, MergeLargeStrings) +{ + auto const input = this->long_column(); + auto input_views = std::vector(); + auto const view = cudf::table_view({input}); + std::vector splits; + int const multiplier = 10; + for (int i = 0; i < multiplier; ++i) { // 2500MB > 2GB + input_views.push_back(view); + splits.push_back(view.num_rows() * (i + 1)); + } + splits.pop_back(); // remove last entry + auto const column_order = std::vector{cudf::order::ASCENDING}; + auto const null_precedence = std::vector{cudf::null_order::AFTER}; + + auto result = cudf::merge(input_views, {0}, column_order, null_precedence); + auto sv = cudf::strings_column_view(result->view().column(0)); + EXPECT_EQ(sv.size(), view.num_rows() * multiplier); + EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); + + auto sliced = cudf::split(sv.parent(), splits); + for (auto c : sliced) { + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); + } + + // also test with large strings column as input + input_views.clear(); + input_views.push_back(view); // regular column + input_views.push_back(result->view()); // large column + result = cudf::merge(input_views, {0}, column_order, null_precedence); + sv = cudf::strings_column_view(result->view().column(0)); + EXPECT_EQ(sv.size(), view.num_rows() * (multiplier + 1)); + EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); + splits.push_back(view.num_rows() * multiplier); + sliced = cudf::split(sv.parent(), splits); + for (auto c : sliced) { + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); + } + + // also check merge still returns 32-bit offsets for regular columns + input_views.clear(); + input_views.push_back(view); + input_views.push_back(view); + result = cudf::merge(input_views, {0}, column_order, null_precedence); + sv = cudf::strings_column_view(result->view().column(0)); + EXPECT_EQ(sv.size(), view.num_rows() * 2); + EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT32}); + sliced = cudf::split(sv.parent(), {view.num_rows()}); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(sliced[0], input); + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(sliced[1], input); +} diff --git a/cpp/tests/merge/merge_string_test.cpp b/cpp/tests/merge/merge_string_test.cpp index d7368d31944..28179a7341c 100644 --- a/cpp/tests/merge/merge_string_test.cpp +++ b/cpp/tests/merge/merge_string_test.cpp @@ -411,60 +411,3 @@ TYPED_TEST(MergeStringTest, Merge2StringKeyNullColumns) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_column_view2, output_column_view2); CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_column_view3, output_column_view3); } - -class MergeLargeStringsTest : public cudf::test::BaseFixture {}; - -TEST_F(MergeLargeStringsTest, MergeLargeStrings) -{ - CUDF_TEST_ENABLE_LARGE_STRINGS(); - auto itr = thrust::constant_iterator( - "abcdefghijklmnopqrstuvwxyABCDEFGHIJKLMNOPQRSTUVWXY"); // 50 bytes - auto const input = cudf::test::strings_column_wrapper(itr, itr + 5'000'000); // 250MB - auto input_views = std::vector(); - auto const view = cudf::table_view({input}); - std::vector splits; - int const multiplier = 10; - for (int i = 0; i < multiplier; ++i) { // 2500MB > 2GB - input_views.push_back(view); - splits.push_back(view.num_rows() * (i + 1)); - } - splits.pop_back(); // remove last entry - auto const column_order = std::vector{cudf::order::ASCENDING}; - auto const null_precedence = std::vector{cudf::null_order::AFTER}; - - auto result = cudf::merge(input_views, {0}, column_order, null_precedence); - auto sv = cudf::strings_column_view(result->view().column(0)); - EXPECT_EQ(sv.size(), view.num_rows() * multiplier); - EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); - - auto sliced = cudf::split(sv.parent(), splits); - for (auto c : sliced) { - CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); - } - - // also test with large strings column as input - input_views.clear(); - input_views.push_back(view); // regular column - input_views.push_back(result->view()); // large column - result = cudf::merge(input_views, {0}, column_order, null_precedence); - sv = cudf::strings_column_view(result->view().column(0)); - EXPECT_EQ(sv.size(), view.num_rows() * (multiplier + 1)); - EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); - splits.push_back(view.num_rows() * multiplier); - sliced = cudf::split(sv.parent(), splits); - for (auto c : sliced) { - CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, input); - } - - // also check merge still returns 32-bit offsets for regular columns - input_views.clear(); - input_views.push_back(view); - input_views.push_back(view); - result = cudf::merge(input_views, {0}, column_order, null_precedence); - sv = cudf::strings_column_view(result->view().column(0)); - EXPECT_EQ(sv.size(), view.num_rows() * 2); - EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT32}); - sliced = cudf::split(sv.parent(), {view.num_rows()}); - CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(sliced[0], input); - CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(sliced[1], input); -} From 8b4dc91fbee585e0f03cccc2b60ce7b68baa9a5f Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 24 Apr 2024 10:53:36 -1000 Subject: [PATCH 3/7] Replace RangeIndex._start/_stop/_step with _range (#15576) The `._start/_stop/_step` attributes are wholly redundant with the similar attributes on a `range` object, so replacing with those attributes where needed Authors: - Matthew Roeschke (https://github.com/mroeschke) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/15576 --- python/cudf/cudf/core/index.py | 128 +++++++++++---------------- python/cudf/cudf/tests/test_index.py | 2 +- 2 files changed, 55 insertions(+), 75 deletions(-) diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 6f08b1d83b3..e457e818129 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -36,7 +36,6 @@ is_integer, is_list_like, is_scalar, - is_signed_integer_dtype, ) from cudf.core._base_index import BaseIndex from cudf.core._compat import PANDAS_LT_300 @@ -149,6 +148,15 @@ def _index_from_data(data: MutableMapping, name: Any = no_default): return index_class_type._from_data(data, name) +def validate_range_arg(arg, arg_name: Literal["start", "stop", "step"]) -> int: + """Validate start/stop/step argument in RangeIndex.__init__""" + if not is_integer(arg): + raise TypeError( + f"{arg_name} must be an integer, not {type(arg).__name__}" + ) + return int(arg) + + class RangeIndex(BaseIndex, BinaryOperand): """ Immutable Index implementing a monotonic integer range. @@ -197,44 +205,29 @@ class RangeIndex(BaseIndex, BinaryOperand): def __init__( self, start, stop=None, step=1, dtype=None, copy=False, name=None ): - if step == 0: - raise ValueError("Step must not be zero.") if not cudf.api.types.is_hashable(name): raise ValueError("Name must be a hashable value.") - if dtype is not None and not is_signed_integer_dtype(dtype): + self._name = name + if dtype is not None and cudf.dtype(dtype).kind != "i": raise ValueError(f"{dtype=} must be a signed integer type") if isinstance(start, range): - therange = start - start = therange.start - stop = therange.stop - step = therange.step - if stop is None: - start, stop = 0, start - if not is_integer(start): - raise TypeError( - f"start must be an integer, not {type(start).__name__}" - ) - self._start = int(start) - if not is_integer(stop): - raise TypeError( - f"stop must be an integer, not {type(stop).__name__}" - ) - self._stop = int(stop) - if step is not None: - if not is_integer(step): - raise TypeError( - f"step must be an integer, not {type(step).__name__}" - ) - self._step = int(step) + self._range = start else: - self._step = 1 - self._index = None - self._name = name - self._range = range(self._start, self._stop, self._step) - # _end is the actual last element of RangeIndex, - # whereas _stop is an upper bound. - self._end = self._start + self._step * (len(self._range) - 1) + if stop is None: + start, stop = 0, start + start = validate_range_arg(start, "start") + stop = validate_range_arg(stop, "stop") + if step is not None: + step = validate_range_arg(step, "step") + else: + step = 1 + try: + self._range = range(start, stop, step) + except ValueError as err: + if step == 0: + raise ValueError("Step must not be zero.") from err + raise def _copy_type_metadata( self, other: RangeIndex, *, override_dtypes=None @@ -251,9 +244,9 @@ def searchsorted( na_position: Literal["first", "last"] = "last", ): assert (len(self) <= 1) or ( - ascending == (self._step > 0) + ascending == (self.step > 0) ), "Invalid ascending flag" - return search_range(value, self.as_range, side=side) + return search_range(value, self._range, side=side) @property # type: ignore @_cudf_nvtx_annotate @@ -271,7 +264,7 @@ def start(self): """ The value of the `start` parameter (0 if this was not supplied). """ - return self._start + return self._range.start @property # type: ignore @_cudf_nvtx_annotate @@ -279,7 +272,7 @@ def stop(self): """ The value of the stop parameter. """ - return self._stop + return self._range.stop @property # type: ignore @_cudf_nvtx_annotate @@ -287,7 +280,7 @@ def step(self): """ The value of the step parameter. """ - return self._step + return self._range.step @property # type: ignore @_cudf_nvtx_annotate @@ -368,9 +361,7 @@ def copy(self, name=None, deep=False): name = self.name if name is None else name - return RangeIndex( - start=self._start, stop=self._stop, step=self._step, name=name - ) + return RangeIndex(self._range, name=name) @_cudf_nvtx_annotate def astype(self, dtype, copy: bool = True): @@ -389,8 +380,8 @@ def duplicated(self, keep="first"): @_cudf_nvtx_annotate def __repr__(self): return ( - f"{self.__class__.__name__}(start={self._start}, stop={self._stop}" - f", step={self._step}" + f"{self.__class__.__name__}(start={self.start}, stop={self.stop}" + f", step={self.step}" + ( f", name={pd.io.formats.printing.default_pprint(self.name)}" if self.name is not None @@ -401,16 +392,16 @@ def __repr__(self): @_cudf_nvtx_annotate def __len__(self): - return len(range(self._start, self._stop, self._step)) + return len(self._range) @_cudf_nvtx_annotate def __getitem__(self, index): if isinstance(index, slice): sl_start, sl_stop, sl_step = index.indices(len(self)) - lo = self._start + sl_start * self._step - hi = self._start + sl_stop * self._step - st = self._step * sl_step + lo = self.start + sl_start * self.step + hi = self.start + sl_stop * self.step + st = self.step * sl_step return RangeIndex(start=lo, stop=hi, step=st, name=self._name) elif isinstance(index, Number): @@ -419,18 +410,13 @@ def __getitem__(self, index): index += len_self if not (0 <= index < len_self): raise IndexError("Index out of bounds") - return self._start + index * self._step + return self.start + index * self.step return self._as_int_index()[index] @_cudf_nvtx_annotate def equals(self, other): if isinstance(other, RangeIndex): - if (self._start, self._stop, self._step) == ( - other._start, - other._stop, - other._step, - ): - return True + return self._range == other._range return self._as_int_index().equals(other) @_cudf_nvtx_annotate @@ -442,9 +428,9 @@ def serialize(self): # We don't need to store the GPU buffer for RangeIndexes # cuDF only needs to store start/stop and rehydrate # during de-serialization - header["index_column"]["start"] = self._start - header["index_column"]["stop"] = self._stop - header["index_column"]["step"] = self._step + header["index_column"]["start"] = self.start + header["index_column"]["stop"] = self.stop + header["index_column"]["step"] = self.step frames = [] header["name"] = pickle.dumps(self.name) @@ -484,9 +470,9 @@ def to_pandas( elif arrow_type: raise NotImplementedError(f"{arrow_type=} is not implemented.") return pd.RangeIndex( - start=self._start, - stop=self._stop, - step=self._step, + start=self.start, + stop=self.stop, + step=self.step, dtype=self.dtype, name=self.name, ) @@ -495,19 +481,15 @@ def to_pandas( def is_unique(self): return True - @cached_property - def as_range(self): - return range(self._start, self._stop, self._step) - @cached_property # type: ignore @_cudf_nvtx_annotate def is_monotonic_increasing(self): - return self._step > 0 or len(self) <= 1 + return self.step > 0 or len(self) <= 1 @cached_property # type: ignore @_cudf_nvtx_annotate def is_monotonic_decreasing(self): - return self._step < 0 or len(self) <= 1 + return self.step < 0 or len(self) <= 1 @_cudf_nvtx_annotate def memory_usage(self, deep=False): @@ -590,12 +572,12 @@ def get_indexer(self, target, limit=None, method=None, tolerance=None): def get_loc(self, key): if not is_scalar(key): raise TypeError("Should be a scalar-like") - idx = (key - self._start) / self._step - idx_int_upper_bound = (self._stop - self._start) // self._step + idx = (key - self.start) / self.step + idx_int_upper_bound = (self.stop - self.start) // self.step if idx > idx_int_upper_bound or idx < 0: raise KeyError(key) - idx_int = (key - self._start) // self._step + idx_int = (key - self.start) // self.step if idx_int != idx: raise KeyError(key) return idx_int @@ -607,9 +589,9 @@ def _union(self, other, sort=None): # following notation: *_o -> other, *_s -> self, # and *_r -> result start_s, step_s = self.start, self.step - end_s = self._end + end_s = self.start + self.step * (len(self) - 1) start_o, step_o = other.start, other.step - end_o = other._end + end_o = other.start + other.step * (len(other) - 1) if self.step < 0: start_s, step_s, end_s = end_s, -step_s, start_s if other.step < 0: @@ -854,9 +836,7 @@ def argsort( raise ValueError(f"invalid na_position: {na_position}") indices = cupy.arange(0, len(self)) - if (ascending and self._step < 0) or ( - not ascending and self._step > 0 - ): + if (ascending and self.step < 0) or (not ascending and self.step > 0): indices = indices[::-1] return indices diff --git a/python/cudf/cudf/tests/test_index.py b/python/cudf/cudf/tests/test_index.py index ebbca57bd40..08a7a9148dd 100644 --- a/python/cudf/cudf/tests/test_index.py +++ b/python/cudf/cudf/tests/test_index.py @@ -1606,7 +1606,7 @@ def test_rangeindex_name_not_hashable(): def test_index_rangeindex_search_range(): # step > 0 ridx = RangeIndex(-13, 17, 4) - ri = ridx.as_range + ri = ridx._range for i in range(len(ridx)): assert i == search_range(ridx[i], ri, side="left") assert i + 1 == search_range(ridx[i], ri, side="right") From 70a5b2bda500fe46cd14860b4e2ca0109893c434 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 24 Apr 2024 13:40:03 -1000 Subject: [PATCH 4/7] Don't materialize column during RangeIndex methods (#15582) Additionally implements some methods that are defined on `BaseIndex` that were not implemented on `RangeIndex` and adds some typing Authors: - Matthew Roeschke (https://github.com/mroeschke) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/15582 --- python/cudf/cudf/core/_base_index.py | 10 ++- python/cudf/cudf/core/index.py | 108 +++++++++++++++++---------- python/cudf/cudf/tests/test_index.py | 23 ++++++ 3 files changed, 100 insertions(+), 41 deletions(-) diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index de44f392eef..b5630ff9a54 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -517,7 +517,7 @@ def where(self, cond, other=None, inplace=False): """ raise NotImplementedError - def factorize(self, sort=False, na_sentinel=None, use_na_sentinel=None): + def factorize(self, sort: bool = False, use_na_sentinel: bool = True): raise NotImplementedError def union(self, other, sort=None): @@ -2061,7 +2061,13 @@ def dropna(self, how="any"): one null value. "all" drops only rows containing *all* null values. """ - + if how not in {"any", "all"}: + raise ValueError(f"{how=} must be 'any' or 'all'") + try: + if not self.hasnans: + return self.copy() + except NotImplementedError: + pass # This is to be consistent with IndexedFrame.dropna to handle nans # as nulls by default data_columns = [ diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index e457e818129..6c0acdc5fb0 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -21,6 +21,7 @@ import cupy import numpy as np import pandas as pd +import pyarrow as pa from typing_extensions import Self import cudf @@ -248,6 +249,15 @@ def searchsorted( ), "Invalid ascending flag" return search_range(value, self._range, side=side) + def factorize(self, sort: bool = False, use_na_sentinel: bool = True): + if sort and self.step < 0: + codes = cupy.arange(len(self) - 1, -1, -1) + uniques = self[::-1] + else: + codes = cupy.arange(len(self), dtype=np.intp) + uniques = self + return codes, uniques + @property # type: ignore @_cudf_nvtx_annotate def name(self): @@ -260,7 +270,7 @@ def name(self, value): @property # type: ignore @_cudf_nvtx_annotate - def start(self): + def start(self) -> int: """ The value of the `start` parameter (0 if this was not supplied). """ @@ -268,7 +278,7 @@ def start(self): @property # type: ignore @_cudf_nvtx_annotate - def stop(self): + def stop(self) -> int: """ The value of the stop parameter. """ @@ -276,7 +286,7 @@ def stop(self): @property # type: ignore @_cudf_nvtx_annotate - def step(self): + def step(self) -> int: """ The value of the step parameter. """ @@ -284,7 +294,7 @@ def step(self): @property # type: ignore @_cudf_nvtx_annotate - def _num_rows(self): + def _num_rows(self) -> int: return len(self) @cached_property # type: ignore @@ -295,33 +305,33 @@ def _values(self): else: return column.column_empty(0, masked=False, dtype=self.dtype) - def _clean_nulls_from_index(self): + def _clean_nulls_from_index(self) -> Self: return self - def _is_numeric(self): + def _is_numeric(self) -> bool: return True - def _is_boolean(self): + def _is_boolean(self) -> bool: return False - def _is_integer(self): + def _is_integer(self) -> bool: return True - def _is_floating(self): + def _is_floating(self) -> bool: return False - def _is_object(self): + def _is_object(self) -> bool: return False - def _is_categorical(self): + def _is_categorical(self) -> bool: return False - def _is_interval(self): + def _is_interval(self) -> bool: return False @property # type: ignore @_cudf_nvtx_annotate - def hasnans(self): + def hasnans(self) -> bool: return False @property # type: ignore @@ -369,12 +379,15 @@ def astype(self, dtype, copy: bool = True): return self return self._as_int_index().astype(dtype, copy=copy) + def fillna(self, value, downcast=None): + return self.copy() + @_cudf_nvtx_annotate def drop_duplicates(self, keep="first"): return self @_cudf_nvtx_annotate - def duplicated(self, keep="first"): + def duplicated(self, keep="first") -> cupy.ndarray: return cupy.zeros(len(self), dtype=bool) @_cudf_nvtx_annotate @@ -390,6 +403,11 @@ def __repr__(self): + ")" ) + @property + @_cudf_nvtx_annotate + def size(self) -> int: + return len(self) + @_cudf_nvtx_annotate def __len__(self): return len(self._range) @@ -478,12 +496,12 @@ def to_pandas( ) @property - def is_unique(self): + def is_unique(self) -> bool: return True @cached_property # type: ignore @_cudf_nvtx_annotate - def is_monotonic_increasing(self): + def is_monotonic_increasing(self) -> bool: return self.step > 0 or len(self) <= 1 @cached_property # type: ignore @@ -492,7 +510,7 @@ def is_monotonic_decreasing(self): return self.step < 0 or len(self) <= 1 @_cudf_nvtx_annotate - def memory_usage(self, deep=False): + def memory_usage(self, deep: bool = False) -> int: if deep: warnings.warn( "The deep parameter is ignored and is only included " @@ -500,7 +518,7 @@ def memory_usage(self, deep=False): ) return 0 - def unique(self): + def unique(self) -> Self: # RangeIndex always has unique values return self @@ -823,34 +841,37 @@ def _columns(self): @property # type: ignore @_cudf_nvtx_annotate - def values_host(self): - return self.to_pandas().values + def values_host(self) -> np.ndarray: + return np.arange(start=self.start, stop=self.stop, step=self.step) @_cudf_nvtx_annotate def argsort( self, ascending=True, na_position="last", - ): + ) -> cupy.ndarray: if na_position not in {"first", "last"}: raise ValueError(f"invalid na_position: {na_position}") - - indices = cupy.arange(0, len(self)) if (ascending and self.step < 0) or (not ascending and self.step > 0): - indices = indices[::-1] - return indices + return cupy.arange(len(self) - 1, -1, -1) + else: + return cupy.arange(len(self)) @_cudf_nvtx_annotate def where(self, cond, other=None, inplace=False): return self._as_int_index().where(cond, other, inplace) @_cudf_nvtx_annotate - def to_numpy(self): + def to_numpy(self) -> np.ndarray: return self.values_host @_cudf_nvtx_annotate - def to_arrow(self): - return self._as_int_index().to_arrow() + def to_cupy(self) -> cupy.ndarray: + return self.values + + @_cudf_nvtx_annotate + def to_arrow(self) -> pa.Array: + return pa.array(self._range, type=pa.from_numpy_dtype(self.dtype)) def __array__(self, dtype=None): raise TypeError( @@ -861,17 +882,17 @@ def __array__(self, dtype=None): ) @_cudf_nvtx_annotate - def nunique(self): + def nunique(self) -> int: return len(self) @_cudf_nvtx_annotate - def isna(self): + def isna(self) -> cupy.ndarray: return cupy.zeros(len(self), dtype=bool) isnull = isna @_cudf_nvtx_annotate - def notna(self): + def notna(self) -> cupy.ndarray: return cupy.ones(len(self), dtype=bool) notnull = isna @@ -895,12 +916,15 @@ def max(self): return self._minmax("max") @property - def values(self): + def values(self) -> cupy.ndarray: return cupy.arange(self.start, self.stop, self.step) - def any(self): + def any(self) -> bool: return any(self._range) + def all(self) -> bool: + return 0 not in self._range + def append(self, other): result = self._as_int_index().append(other) return self._try_reconstruct_range_index(result) @@ -926,14 +950,20 @@ def isin(self, values): return self._values.isin(values).values - def __neg__(self): - return -self._as_int_index() + def __pos__(self) -> Self: + return self.copy() - def __pos__(self): - return +self._as_int_index() + def __neg__(self) -> Self: + rng = range(-self.start, -self.stop, -self.step) + return type(self)(rng, name=self.name) - def __abs__(self): - return abs(self._as_int_index()) + def __abs__(self) -> Self | Index: + if len(self) == 0 or self.min() >= 0: + return self.copy() + elif self.max() <= 0: + return -self + else: + return abs(self._as_int_index()) @_warn_no_dask_cudf def __dask_tokenize__(self): diff --git a/python/cudf/cudf/tests/test_index.py b/python/cudf/cudf/tests/test_index.py index 08a7a9148dd..c7875b81440 100644 --- a/python/cudf/cudf/tests/test_index.py +++ b/python/cudf/cudf/tests/test_index.py @@ -3176,3 +3176,26 @@ def test_index_to_pandas_arrow_type(scalar): result = idx.to_pandas(arrow_type=True) expected = pd.Index(pd.arrays.ArrowExtensionArray(pa_array)) pd.testing.assert_index_equal(result, expected) + + +@pytest.mark.parametrize("data", [range(-3, 3), range(1, 3), range(0)]) +def test_rangeindex_all(data): + result = cudf.RangeIndex(data).all() + expected = cudf.Index(list(data)).all() + assert result == expected + + +@pytest.mark.parametrize("sort", [True, False]) +@pytest.mark.parametrize("data", [range(2), range(2, -1, -1)]) +def test_rangeindex_factorize(sort, data): + res_codes, res_uniques = cudf.RangeIndex(data).factorize(sort=sort) + exp_codes, exp_uniques = cudf.Index(list(data)).factorize(sort=sort) + assert_eq(res_codes, exp_codes) + assert_eq(res_uniques, exp_uniques) + + +def test_rangeindex_dropna(): + ri = cudf.RangeIndex(range(2)) + result = ri.dropna() + expected = ri.copy() + assert_eq(result, expected) From 4dc9ebbfe5b2a22949c5f24114918e4369d055cd Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Thu, 25 Apr 2024 08:53:11 -0400 Subject: [PATCH 5/7] Improve performance for cudf::strings::count_re (#15578) Improves performance of `cudf::strings::count_re` when pattern starts with a literal character. Although this is a specific use case, the regex code has special logic to help speed up the search in this case. Since the pattern indicates the target must contain this character as the start of the matching sequence, it first does a normal find for the character before continuing matching the remaining pattern. The `find()` function can be inefficient for long strings since it is character based and must resolve the character's byte-position by counting from the beginning of the string. For a function like `count_re()` all occurrences are matched within a target meaning longer target strings can incur expensive counting. The solution included here is to introduce a more efficient `find_char()` utility that accepts a `string_view::const_iterator()` which automatically keeps track of its byte and character positions. This helps minimize byte/character counting in between calls from `count_re()` and other similar functions that make repeated calls for all matches (e.g. `replace_re()` and `split_re()`). Close #15567 Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/15578 --- cpp/benchmarks/string/contains.cpp | 4 ++-- cpp/benchmarks/string/count.cpp | 12 ++++++++---- cpp/src/strings/regex/regex.inl | 19 ++++++++++++++----- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/cpp/benchmarks/string/contains.cpp b/cpp/benchmarks/string/contains.cpp index 6d839c1de64..ae6c8b844c8 100644 --- a/cpp/benchmarks/string/contains.cpp +++ b/cpp/benchmarks/string/contains.cpp @@ -80,7 +80,7 @@ std::unique_ptr build_input_column(cudf::size_type n_rows, } // longer pattern lengths demand more working memory per string -std::string patterns[] = {"^\\d+ [a-z]+", "[A-Z ]+\\d+ +\\d+[A-Z]+\\d+$"}; +std::string patterns[] = {"^\\d+ [a-z]+", "[A-Z ]+\\d+ +\\d+[A-Z]+\\d+$", "5W43"}; static void bench_contains(nvbench::state& state) { @@ -114,4 +114,4 @@ NVBENCH_BENCH(bench_contains) .add_int64_axis("row_width", {32, 64, 128, 256, 512}) .add_int64_axis("num_rows", {32768, 262144, 2097152, 16777216}) .add_int64_axis("hit_rate", {50, 100}) // percentage - .add_int64_axis("pattern", {0, 1}); + .add_int64_axis("pattern", {0, 1, 2}); diff --git a/cpp/benchmarks/string/count.cpp b/cpp/benchmarks/string/count.cpp index a656010dca5..f964bc5d224 100644 --- a/cpp/benchmarks/string/count.cpp +++ b/cpp/benchmarks/string/count.cpp @@ -25,10 +25,13 @@ #include +static std::string patterns[] = {"\\d+", "a"}; + static void bench_count(nvbench::state& state) { - auto const num_rows = static_cast(state.get_int64("num_rows")); - auto const row_width = static_cast(state.get_int64("row_width")); + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const row_width = static_cast(state.get_int64("row_width")); + auto const pattern_index = static_cast(state.get_int64("pattern")); if (static_cast(num_rows) * static_cast(row_width) >= static_cast(std::numeric_limits::max())) { @@ -41,7 +44,7 @@ static void bench_count(nvbench::state& state) create_random_table({cudf::type_id::STRING}, row_count{num_rows}, table_profile); cudf::strings_column_view input(table->view().column(0)); - std::string pattern = "\\d+"; + auto const pattern = patterns[pattern_index]; auto prog = cudf::strings::regex_program::create(pattern); @@ -59,4 +62,5 @@ static void bench_count(nvbench::state& state) NVBENCH_BENCH(bench_count) .set_name("count") .add_int64_axis("row_width", {32, 64, 128, 256, 512, 1024, 2048}) - .add_int64_axis("num_rows", {4096, 32768, 262144, 2097152, 16777216}); + .add_int64_axis("num_rows", {4096, 32768, 262144, 2097152, 16777216}) + .add_int64_axis("pattern", {0, 1}); diff --git a/cpp/src/strings/regex/regex.inl b/cpp/src/strings/regex/regex.inl index ce12dc17aa4..10e06505094 100644 --- a/cpp/src/strings/regex/regex.inl +++ b/cpp/src/strings/regex/regex.inl @@ -217,6 +217,15 @@ __device__ __forceinline__ reprog_device reprog_device::load(reprog_device const : reinterpret_cast(buffer)[0]; } +__device__ __forceinline__ static string_view::const_iterator find_char( + cudf::char_utf8 chr, string_view const d_str, string_view::const_iterator itr) +{ + while (itr.byte_offset() < d_str.size_bytes() && *itr != chr) { + ++itr; + } + return itr; +} + /** * @brief Evaluate a specific string against regex pattern compiled to this instance. * @@ -253,16 +262,16 @@ __device__ __forceinline__ match_result reprog_device::regexec(string_view const case BOL: if (pos == 0) break; if (jnk.startchar != '^') { return thrust::nullopt; } - --pos; + --itr; startchar = static_cast('\n'); case CHAR: { - auto const fidx = dstr.find(startchar, pos); - if (fidx == string_view::npos) { return thrust::nullopt; } - pos = fidx + (jnk.starttype == BOL); + auto const find_itr = find_char(startchar, dstr, itr); + if (find_itr.byte_offset() >= dstr.size_bytes()) { return thrust::nullopt; } + itr = find_itr + (jnk.starttype == BOL); + pos = itr.position(); break; } } - itr += (pos - itr.position()); // faster to increment position } if (((eos < 0) || (pos < eos)) && match == 0) { From 65c2b53602d70f7f50c7dd7544ca0fd07ac8b455 Mon Sep 17 00:00:00 2001 From: David Wendt <45795991+davidwendt@users.noreply.github.com> Date: Thu, 25 Apr 2024 15:12:01 -0400 Subject: [PATCH 6/7] Fix debug warnings/errors in from_arrow_device_test.cpp (#15596) Fixes debug build errors introduced by #15458 These warnings show up in a debug build where warnings become errors. Some of the errors: ``` /cudf/cpp/tests/interop/from_arrow_device_test.cpp:103:27: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetTypeStruct(ArrowSchema*, int64_t)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 103 | ArrowSchemaSetTypeStruct(input_schema.get(), 1); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:105:29: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetTypeDateTime(ArrowSchema*, ArrowType, ArrowTimeUnit, const char*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 105 | ArrowSchemaSetTypeDateTime( /cudf/cpp/tests/interop/from_arrow_device_test.cpp:107:21: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetName(ArrowSchema*, const char*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 107 | ArrowSchemaSetName(input_schema->children[0], "a"); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:110:27: error: ignoring return value of 'ArrowErrorCode cudfArrowArrayInitFromSchema(ArrowArray*, const ArrowSchema*, ArrowError*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 110 | ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:115:26: error: ignoring return value of 'ArrowErrorCode ArrowBufferSetAllocator(ArrowBuffer*, ArrowBufferAllocator)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 115 | ArrowBufferSetAllocator(ArrowArrayBuffer(input_array->children[0], 1), noop_alloc); | ~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /cudf/cpp/tests/interop/from_arrow_device_test.cpp:118:27: error: ignoring return value of 'ArrowErrorCode cudfArrowArrayFinishBuilding(ArrowArray*, ArrowValidationLevel, ArrowError*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 118 | ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr); /cudf/cpp/tests/interop/from_arrow_device_test.cpp: In member function 'virtual void FromArrowDeviceTest_NestedList_Test::TestBody()': /cudf/cpp/tests/interop/from_arrow_device_test.cpp:202:27: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetTypeStruct(ArrowSchema*, int64_t)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 202 | ArrowSchemaSetTypeStruct(input_schema.get(), 1); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:204:26: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaInitFromType(ArrowSchema*, ArrowType)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 204 | ArrowSchemaInitFromType(input_schema->children[0], NANOARROW_TYPE_LIST); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:205:21: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetName(ArrowSchema*, const char*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 205 | ArrowSchemaSetName(input_schema->children[0], "a"); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:208:26: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaInitFromType(ArrowSchema*, ArrowType)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 208 | ArrowSchemaInitFromType(input_schema->children[0]->children[0], NANOARROW_TYPE_LIST); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:209:21: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetName(ArrowSchema*, const char*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 209 | ArrowSchemaSetName(input_schema->children[0]->children[0], "element"); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:212:26: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaInitFromType(ArrowSchema*, ArrowType)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 212 | ArrowSchemaInitFromType(input_schema->children[0]->children[0]->children[0], /cudf/cpp/tests/interop/from_arrow_device_test.cpp:214:21: error: ignoring return value of 'ArrowErrorCode cudfArrowSchemaSetName(ArrowSchema*, const char*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 214 | ArrowSchemaSetName(input_schema->children[0]->children[0]->children[0], "element"); /cudf/cpp/tests/interop/from_arrow_device_test.cpp:226:27: error: ignoring return value of 'ArrowErrorCode cudfArrowArrayFinishBuilding(ArrowArray*, ArrowValidationLevel, ArrowError*)' declared with attribute 'warn_unused_result' [-Werror=unused-result] 226 | ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); /cudf/cpp/tests/interop/from_arrow_device_test.cpp: In member function 'virtual void FromArrowDeviceTest_StructColumn_Test::TestBody()': ``` Closes #15597 Authors: - David Wendt (https://github.com/davidwendt) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/15596 --- cpp/tests/interop/from_arrow_device_test.cpp | 229 +++++++++++-------- cpp/tests/interop/nanoarrow_utils.hpp | 4 +- cpp/tests/interop/to_arrow_device_test.cpp | 7 +- 3 files changed, 135 insertions(+), 105 deletions(-) diff --git a/cpp/tests/interop/from_arrow_device_test.cpp b/cpp/tests/interop/from_arrow_device_test.cpp index 95cbe8057d1..66bd4dd1bfb 100644 --- a/cpp/tests/interop/from_arrow_device_test.cpp +++ b/cpp/tests/interop/from_arrow_device_test.cpp @@ -100,22 +100,26 @@ TEST_F(FromArrowDeviceTest, DateTimeTable) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeDateTime( - input_schema->children[0], NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI, nullptr); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeDateTime( + input_schema->children[0], NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI, nullptr)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = 6; input_array->null_count = 0; input_array->children[0]->length = 6; input_array->children[0]->null_count = 0; - ArrowBufferSetAllocator(ArrowArrayBuffer(input_array->children[0], 1), noop_alloc); + NANOARROW_THROW_NOT_OK( + ArrowBufferSetAllocator(ArrowArrayBuffer(input_array->children[0], 1), noop_alloc)); ArrowArrayBuffer(input_array->children[0], 1)->data = const_cast(cudf::column_view(col).data()); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr); + ArrowArrayBuffer(input_array->children[0], 1)->size_bytes = + sizeof(int64_t) * cudf::column_view(col).size(); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -155,23 +159,27 @@ TYPED_TEST(FromArrowDeviceTestDurationsTest, DurationTable) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeDateTime( - input_schema->children[0], NANOARROW_TYPE_DURATION, time_unit, nullptr); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeDateTime( + input_schema->children[0], NANOARROW_TYPE_DURATION, time_unit, nullptr)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); - auto data_ptr = expected_table_view.column(0).data(); + auto data_ptr = expected_table_view.column(0).data(); + auto data_size = expected_table_view.column(0).size(); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected_table_view.num_rows(); input_array->null_count = 0; input_array->children[0]->length = expected_table_view.num_rows(); input_array->children[0]->null_count = 0; - ArrowBufferSetAllocator(ArrowArrayBuffer(input_array->children[0], 1), noop_alloc); - ArrowArrayBuffer(input_array->children[0], 1)->data = const_cast(data_ptr); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowBufferSetAllocator(ArrowArrayBuffer(input_array->children[0], 1), noop_alloc)); + ArrowArrayBuffer(input_array->children[0], 1)->data = const_cast(data_ptr); + ArrowArrayBuffer(input_array->children[0], 1)->size_bytes = sizeof(T) * data_size; + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -199,19 +207,21 @@ TEST_F(FromArrowDeviceTest, NestedList) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); - ArrowSchemaInitFromType(input_schema->children[0], NANOARROW_TYPE_LIST); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(input_schema->children[0], NANOARROW_TYPE_LIST)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); input_schema->children[0]->flags = ARROW_FLAG_NULLABLE; - ArrowSchemaInitFromType(input_schema->children[0]->children[0], NANOARROW_TYPE_LIST); - ArrowSchemaSetName(input_schema->children[0]->children[0], "element"); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(input_schema->children[0]->children[0], NANOARROW_TYPE_LIST)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0]->children[0], "element")); input_schema->children[0]->children[0]->flags = 0; - ArrowSchemaInitFromType(input_schema->children[0]->children[0]->children[0], - NANOARROW_TYPE_INT64); - ArrowSchemaSetName(input_schema->children[0]->children[0]->children[0], "element"); + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType( + input_schema->children[0]->children[0]->children[0], NANOARROW_TYPE_INT64)); + NANOARROW_THROW_NOT_OK( + ArrowSchemaSetName(input_schema->children[0]->children[0]->children[0], "element")); input_schema->children[0]->children[0]->children[0]->flags = ARROW_FLAG_NULLABLE; nanoarrow::UniqueArray input_array; @@ -223,7 +233,8 @@ TEST_F(FromArrowDeviceTest, NestedList) cudf::lists_column_view nested_view{lview.child()}; populate_list_from_col(top_list->children[0], nested_view); populate_from_col(top_list->children[0]->children[0], nested_view.child()); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -287,47 +298,52 @@ TEST_F(FromArrowDeviceTest, StructColumn) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeStruct(input_schema->children[0], 5); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema->children[0], 5)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); input_schema->children[0]->flags = 0; auto child = input_schema->children[0]; - ArrowSchemaInitFromType(child->children[0], NANOARROW_TYPE_STRING); - ArrowSchemaSetName(child->children[0], "string"); + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(child->children[0], NANOARROW_TYPE_STRING)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[0], "string")); child->children[0]->flags = 0; - ArrowSchemaInitFromType(child->children[1], NANOARROW_TYPE_INT32); - ArrowSchemaSetName(child->children[1], "integral"); + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(child->children[1], NANOARROW_TYPE_INT32)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[1], "integral")); child->children[1]->flags = 0; - ArrowSchemaInitFromType(child->children[2], NANOARROW_TYPE_BOOL); - ArrowSchemaSetName(child->children[2], "bool"); + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(child->children[2], NANOARROW_TYPE_BOOL)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[2], "bool")); child->children[2]->flags = 0; - ArrowSchemaInitFromType(child->children[3], NANOARROW_TYPE_LIST); - ArrowSchemaSetName(child->children[3], "nested_list"); + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(child->children[3], NANOARROW_TYPE_LIST)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[3], "nested_list")); child->children[3]->flags = 0; - ArrowSchemaInitFromType(child->children[3]->children[0], NANOARROW_TYPE_LIST); - ArrowSchemaSetName(child->children[3]->children[0], "element"); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(child->children[3]->children[0], NANOARROW_TYPE_LIST)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[3]->children[0], "element")); child->children[3]->children[0]->flags = 0; - ArrowSchemaInitFromType(child->children[3]->children[0]->children[0], NANOARROW_TYPE_INT64); - ArrowSchemaSetName(child->children[3]->children[0]->children[0], "element"); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(child->children[3]->children[0]->children[0], NANOARROW_TYPE_INT64)); + NANOARROW_THROW_NOT_OK( + ArrowSchemaSetName(child->children[3]->children[0]->children[0], "element")); child->children[3]->children[0]->children[0]->flags = 0; ArrowSchemaInit(child->children[4]); - ArrowSchemaSetTypeStruct(child->children[4], 2); - ArrowSchemaSetName(child->children[4], "struct"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(child->children[4], 2)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[4], "struct")); - ArrowSchemaInitFromType(child->children[4]->children[0], NANOARROW_TYPE_STRING); - ArrowSchemaSetName(child->children[4]->children[0], "string2"); - ArrowSchemaInitFromType(child->children[4]->children[1], NANOARROW_TYPE_INT32); - ArrowSchemaSetName(child->children[4]->children[1], "integral2"); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(child->children[4]->children[0], NANOARROW_TYPE_STRING)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[4]->children[0], "string2")); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(child->children[4]->children[1], NANOARROW_TYPE_INT32)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(child->children[4]->children[1], "integral2")); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected_table_view.num_rows(); @@ -336,7 +352,7 @@ TEST_F(FromArrowDeviceTest, StructColumn) array_a->length = view_a.size(); array_a->null_count = view_a.null_count(); - ArrowBufferSetAllocator(ArrowArrayBuffer(array_a, 0), noop_alloc); + NANOARROW_THROW_NOT_OK(ArrowBufferSetAllocator(ArrowArrayBuffer(array_a, 0), noop_alloc)); ArrowArrayValidityBitmap(array_a)->buffer.data = const_cast(reinterpret_cast(view_a.null_mask())); @@ -354,14 +370,15 @@ TEST_F(FromArrowDeviceTest, StructColumn) array_struct->length = view_struct.size(); array_struct->null_count = view_struct.null_count(); - ArrowBufferSetAllocator(ArrowArrayBuffer(array_struct, 0), noop_alloc); + NANOARROW_THROW_NOT_OK(ArrowBufferSetAllocator(ArrowArrayBuffer(array_struct, 0), noop_alloc)); ArrowArrayValidityBitmap(array_struct)->buffer.data = const_cast(reinterpret_cast(view_struct.null_mask())); populate_from_col(array_struct->children[0], view_struct.child(0)); populate_from_col(array_struct->children[1], view_struct.child(1)); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -406,25 +423,28 @@ TEST_F(FromArrowDeviceTest, DictionaryIndicesType) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 3); - - ArrowSchemaInitFromType(input_schema->children[0], NANOARROW_TYPE_INT8); - ArrowSchemaSetName(input_schema->children[0], "a"); - ArrowSchemaAllocateDictionary(input_schema->children[0]); - ArrowSchemaInitFromType(input_schema->children[0]->dictionary, NANOARROW_TYPE_INT64); - - ArrowSchemaInitFromType(input_schema->children[1], NANOARROW_TYPE_INT16); - ArrowSchemaSetName(input_schema->children[1], "b"); - ArrowSchemaAllocateDictionary(input_schema->children[1]); - ArrowSchemaInitFromType(input_schema->children[1]->dictionary, NANOARROW_TYPE_INT64); - - ArrowSchemaInitFromType(input_schema->children[2], NANOARROW_TYPE_INT64); - ArrowSchemaSetName(input_schema->children[2], "c"); - ArrowSchemaAllocateDictionary(input_schema->children[2]); - ArrowSchemaInitFromType(input_schema->children[2]->dictionary, NANOARROW_TYPE_INT64); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 3)); + + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(input_schema->children[0], NANOARROW_TYPE_INT8)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); + NANOARROW_THROW_NOT_OK(ArrowSchemaAllocateDictionary(input_schema->children[0])); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(input_schema->children[0]->dictionary, NANOARROW_TYPE_INT64)); + + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(input_schema->children[1], NANOARROW_TYPE_INT16)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[1], "b")); + NANOARROW_THROW_NOT_OK(ArrowSchemaAllocateDictionary(input_schema->children[1])); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(input_schema->children[1]->dictionary, NANOARROW_TYPE_INT64)); + + NANOARROW_THROW_NOT_OK(ArrowSchemaInitFromType(input_schema->children[2], NANOARROW_TYPE_INT64)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[2], "c")); + NANOARROW_THROW_NOT_OK(ArrowSchemaAllocateDictionary(input_schema->children[2])); + NANOARROW_THROW_NOT_OK( + ArrowSchemaInitFromType(input_schema->children[2]->dictionary, NANOARROW_TYPE_INT64)); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected_table.num_rows(); input_array->null_count = 0; @@ -446,7 +466,8 @@ TEST_F(FromArrowDeviceTest, DictionaryIndicesType) populate_from_col(input_array->children[2]->dictionary, cudf::dictionary_column_view{expected_table_view.column(2)}.keys()); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -562,20 +583,22 @@ TEST_F(FromArrowDeviceTest, FixedPoint128Table) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeDecimal(input_schema->children[0], - NANOARROW_TYPE_DECIMAL128, - cudf::detail::max_precision<__int128_t>(), - -scale); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeDecimal(input_schema->children[0], + NANOARROW_TYPE_DECIMAL128, + cudf::detail::max_precision<__int128_t>(), + -scale)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected.num_rows(); populate_from_col<__int128_t>(input_array->children[0], expected.column(0)); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -607,20 +630,22 @@ TEST_F(FromArrowDeviceTest, FixedPoint128TableLarge) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeDecimal(input_schema->children[0], - NANOARROW_TYPE_DECIMAL128, - cudf::detail::max_precision<__int128_t>(), - -scale); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeDecimal(input_schema->children[0], + NANOARROW_TYPE_DECIMAL128, + cudf::detail::max_precision<__int128_t>(), + -scale)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected.num_rows(); populate_from_col<__int128_t>(input_array->children[0], expected.column(0)); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -652,20 +677,22 @@ TEST_F(FromArrowDeviceTest, FixedPoint128TableNulls) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeDecimal(input_schema->children[0], - NANOARROW_TYPE_DECIMAL128, - cudf::detail::max_precision<__int128_t>(), - -scale); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeDecimal(input_schema->children[0], + NANOARROW_TYPE_DECIMAL128, + cudf::detail::max_precision<__int128_t>(), + -scale)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected.num_rows(); populate_from_col<__int128_t>(input_array->children[0], expected.column(0)); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); @@ -699,20 +726,22 @@ TEST_F(FromArrowDeviceTest, FixedPoint128TableNullsLarge) nanoarrow::UniqueSchema input_schema; ArrowSchemaInit(input_schema.get()); - ArrowSchemaSetTypeStruct(input_schema.get(), 1); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeStruct(input_schema.get(), 1)); ArrowSchemaInit(input_schema->children[0]); - ArrowSchemaSetTypeDecimal(input_schema->children[0], - NANOARROW_TYPE_DECIMAL128, - cudf::detail::max_precision<__int128_t>(), - -scale); - ArrowSchemaSetName(input_schema->children[0], "a"); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetTypeDecimal(input_schema->children[0], + NANOARROW_TYPE_DECIMAL128, + cudf::detail::max_precision<__int128_t>(), + -scale)); + NANOARROW_THROW_NOT_OK(ArrowSchemaSetName(input_schema->children[0], "a")); nanoarrow::UniqueArray input_array; - ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayInitFromSchema(input_array.get(), input_schema.get(), nullptr)); input_array->length = expected.num_rows(); populate_from_col<__int128_t>(input_array->children[0], expected.column(0)); - ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr); + NANOARROW_THROW_NOT_OK( + ArrowArrayFinishBuilding(input_array.get(), NANOARROW_VALIDATION_LEVEL_NONE, nullptr)); ArrowDeviceArray input_device_array; input_device_array.device_id = rmm::get_current_cuda_device().value(); diff --git a/cpp/tests/interop/nanoarrow_utils.hpp b/cpp/tests/interop/nanoarrow_utils.hpp index b795bafed97..fb5d1060f6f 100644 --- a/cpp/tests/interop/nanoarrow_utils.hpp +++ b/cpp/tests/interop/nanoarrow_utils.hpp @@ -122,13 +122,13 @@ void populate_dict_from_col(ArrowArray* arr, cudf::dictionary_column_view dview) { arr->length = dview.size(); arr->null_count = dview.null_count(); - ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 0), noop_alloc); + NANOARROW_THROW_NOT_OK(ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 0), noop_alloc)); ArrowArrayValidityBitmap(arr)->buffer.size_bytes = cudf::bitmask_allocation_size_bytes(dview.size()); ArrowArrayValidityBitmap(arr)->buffer.data = const_cast(reinterpret_cast(dview.null_mask())); - ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 1), noop_alloc); + NANOARROW_THROW_NOT_OK(ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 1), noop_alloc)); ArrowArrayBuffer(arr, 1)->size_bytes = sizeof(IND_TYPE) * dview.indices().size(); ArrowArrayBuffer(arr, 1)->data = const_cast(dview.indices().data()); diff --git a/cpp/tests/interop/to_arrow_device_test.cpp b/cpp/tests/interop/to_arrow_device_test.cpp index fb346dad538..626aeb53cdd 100644 --- a/cpp/tests/interop/to_arrow_device_test.cpp +++ b/cpp/tests/interop/to_arrow_device_test.cpp @@ -217,7 +217,8 @@ get_nanoarrow_tables(cudf::size_type length) populate_from_col(arrow->children[5]->children[1], struct_view.child(1)); arrow->children[5]->length = struct_view.size(); arrow->children[5]->null_count = struct_view.null_count(); - ArrowBufferSetAllocator(ArrowArrayBuffer(arrow->children[5], 0), noop_alloc); + NANOARROW_THROW_NOT_OK( + ArrowBufferSetAllocator(ArrowArrayBuffer(arrow->children[5], 0), noop_alloc)); ArrowArrayValidityBitmap(arrow->children[5])->buffer.size_bytes = cudf::bitmask_allocation_size_bytes(struct_view.size()); ArrowArrayValidityBitmap(arrow->children[5])->buffer.data = @@ -241,13 +242,13 @@ void populate_list_from_col(ArrowArray* arr, cudf::lists_column_view view) arr->length = view.size(); arr->null_count = view.null_count(); - ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 0), noop_alloc); + NANOARROW_THROW_NOT_OK(ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 0), noop_alloc)); ArrowArrayValidityBitmap(arr)->buffer.size_bytes = cudf::bitmask_allocation_size_bytes(view.size()); ArrowArrayValidityBitmap(arr)->buffer.data = const_cast(reinterpret_cast(view.null_mask())); - ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 1), noop_alloc); + NANOARROW_THROW_NOT_OK(ArrowBufferSetAllocator(ArrowArrayBuffer(arr, 1), noop_alloc)); ArrowArrayBuffer(arr, 1)->size_bytes = sizeof(int32_t) * view.offsets().size(); ArrowArrayBuffer(arr, 1)->data = const_cast(view.offsets().data()); } From c62c5f69ca5036d69188ab8e43ac2ab5276d6cfa Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 26 Apr 2024 04:02:25 -0500 Subject: [PATCH 7/7] Fix a JNI bug in JSON parsing fixup (#15550) When parsing JSON in the current code if no columns can be parsed out of the data, then an empty table is returned. Earlier we put in a work around to this so that we could pass in the number of rows needed and the JSON parsing code would make a table of null values for it. This had some issues with structs and lists which needed an extended way to produce the null scalar. This adds in code to do just that. Authors: - Robert (Bobby) Evans (https://github.com/revans2) - Nghia Truong (https://github.com/ttnghia) Approvers: - Jason Lowe (https://github.com/jlowe) URL: https://github.com/rapidsai/cudf/pull/15550 --- java/src/main/java/ai/rapids/cudf/Schema.java | 28 ++++++++++++++++++- java/src/main/java/ai/rapids/cudf/Table.java | 22 +++++++++++++-- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/Schema.java b/java/src/main/java/ai/rapids/cudf/Schema.java index c8571dd841c..43603386649 100644 --- a/java/src/main/java/ai/rapids/cudf/Schema.java +++ b/java/src/main/java/ai/rapids/cudf/Schema.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * The schema of data to be read in. @@ -221,6 +222,13 @@ public DType[] getChildTypes() { return ret; } + public int getNumChildren() { + if (childSchemas == null) { + return 0; + } + return childSchemas.size(); + } + int[] getFlattenedNumChildren() { flattenIfNeeded(); return flattenedCounts; @@ -243,7 +251,25 @@ public boolean isStructOrHasStructDescendant() { return false; } - public static class Builder { + public HostColumnVector.DataType asHostDataType() { + if (topLevelType == DType.LIST) { + assert(childSchemas != null && childSchemas.size() == 1); + HostColumnVector.DataType element = childSchemas.get(0).asHostDataType(); + return new HostColumnVector.ListType(true, element); + } else if (topLevelType == DType.STRUCT) { + if (childSchemas == null) { + return new HostColumnVector.StructType(true); + } else { + List childTypes = + childSchemas.stream().map(Schema::asHostDataType).collect(Collectors.toList()); + return new HostColumnVector.StructType(true, childTypes); + } + } else { + return new HostColumnVector.BasicType(true, topLevelType); + } + } + + public static class Builder { private final DType topLevelType; private final List names; private final List types; diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 4038b3a40b8..4e737451ed6 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -1220,8 +1220,26 @@ private static Table gatherJSONColumns(Schema schema, TableWithMeta twm, int emp columns[i] = tbl.getColumn(index).incRefCount(); } } else { - try (Scalar s = Scalar.fromNull(types[i])) { - columns[i] = ColumnVector.fromScalar(s, rowCount); + if (types[i] == DType.LIST) { + Schema listSchema = schema.getChild(i); + Schema elementSchema = listSchema.getChild(0); + try (Scalar s = Scalar.listFromNull(elementSchema.asHostDataType())) { + columns[i] = ColumnVector.fromScalar(s, rowCount); + } + } else if (types[i] == DType.STRUCT) { + Schema structSchema = schema.getChild(i); + int numStructChildren = structSchema.getNumChildren(); + DataType[] structChildrenTypes = new DataType[numStructChildren]; + for (int j = 0; j < numStructChildren; j++) { + structChildrenTypes[j] = structSchema.getChild(j).asHostDataType(); + } + try (Scalar s = Scalar.structFromNull(structChildrenTypes)) { + columns[i] = ColumnVector.fromScalar(s, rowCount); + } + } else { + try (Scalar s = Scalar.fromNull(types[i])) { + columns[i] = ColumnVector.fromScalar(s, rowCount); + } } } }