Propagate errors from Parquet reader kernels back to host #14167

Merged: 18 commits, Sep 28, 2023
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_data.cu
@@ -430,10 +430,15 @@ static __device__ void gpuOutputGeneric(
* @param chunks List of column chunks
* @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
* @param error_code Error code to set if an error is encountered
*/
template <int lvl_buf_size, typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodePageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16)
@@ -472,7 +477,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

@@ -596,6 +602,10 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
}
__syncthreads();
}
if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
}

struct mask_tform {
@@ -621,6 +631,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -629,11 +640,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodePageData<rolling_buf_size, uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodePageData<rolling_buf_size, uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodePageData<rolling_buf_size, uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

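The hunks above show the core pattern of this PR: each decode kernel accumulates failures in the per-page, shared-memory page_state_s error word, and once the page is finished, thread 0 folds those bits into a single device-scope int32_t supplied by the host. A minimal sketch of that pattern, assuming only libcu++ (decode_kernel and block_error are illustrative names, not code from this PR):

    #include <cuda/atomic>
    #include <cstdint>

    __global__ void decode_kernel(int32_t* error_code)
    {
      __shared__ int32_t block_error;
      if (threadIdx.x == 0) { block_error = 0; }
      __syncthreads();

      // ... decode work; any thread may record a failure as an OR-ed bit:
      // cuda::atomic_ref<int32_t, cuda::thread_scope_block>{block_error}.fetch_or(bit);

      __syncthreads();
      if (threadIdx.x == 0 && block_error != 0) {
        // Fold this block's bits into the one device-scope word the host owns.
        cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
        ref.fetch_or(block_error, cuda::std::memory_order_relaxed);
      }
    }

Relaxed ordering suffices here: the host only reads the word after synchronizing the stream, and fetch_or guarantees that concurrent blocks cannot clobber each other's bits.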
57 changes: 40 additions & 17 deletions cpp/src/io/parquet/page_decode.cuh
@@ -21,6 +21,7 @@

#include <io/utilities/block_utils.cuh>

#include <cuda/atomic>
#include <cuda/std/tuple>

namespace cudf::io::parquet::gpu {
@@ -69,6 +70,18 @@ struct page_state_s {
PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]{};
// points to either nesting_decode_cache above when possible, or to the global source otherwise
PageNestingDecodeInfo* nesting_info{};

inline __device__ void set_error_code(decode_error err) volatile
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{const_cast<int&>(error)};
ref.fetch_or(static_cast<int32_t>(err), cuda::std::memory_order_relaxed);
}

inline __device__ void reset_error_code() volatile
{
cuda::atomic_ref<int32_t, cuda::thread_scope_block> ref{const_cast<int&>(error)};
ref.store(0, cuda::std::memory_order_release);
}
};

// buffers only used in the decode kernel. separated from page_state_s to keep
@@ -471,7 +484,7 @@ __device__ void gpuDecodeStream(
int32_t value_count = s->lvl_count[lvl];
int32_t batch_coded_count = 0;

while (value_count < target_count && value_count < num_input_values) {
while (s->error == 0 && value_count < target_count && value_count < num_input_values) {
int batch_len;
if (level_run <= 1) {
// Get a new run symbol from the byte stream
@@ -487,7 +500,14 @@
cur++;
}
}
if (cur > end || level_run <= 1) { s->error = 0x10; }
if (cur > end) {
s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN);
break;
}
if (level_run <= 1) {
s->set_error_code(decode_error::INVALID_LEVEL_RUN);
break;
}
sym_len = (int32_t)(cur - cur_def);
__threadfence_block();
}
@@ -496,7 +516,7 @@
level_run = shuffle(level_run);
cur_def += sym_len;
}
if (s->error) { break; }
if (s->error != 0) { break; }

batch_len = min(num_input_values - value_count, 32);
if (level_run & 1) {
@@ -852,7 +872,7 @@ __device__ void gpuDecodeLevels(page_state_s* s,

constexpr int batch_size = 32;
int cur_leaf_count = target_leaf_count;
while (!s->error && s->nz_count < target_leaf_count &&
while (s->error == 0 && s->nz_count < target_leaf_count &&
s->input_value_count < s->num_input_values) {
if (has_repetition) {
gpuDecodeStream<level_t, rolling_buf_size>(rep, s, cur_leaf_count, t, level_type::REPETITION);
@@ -916,7 +936,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
}
s->lvl_start[lvl] = cur;

if (cur > end) { s->error = 2; }
if (cur > end) { s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); }
};

// this is a little redundant. if level_bits == 0, then nothing should be encoded
Expand All @@ -941,8 +961,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
// add back the 4 bytes for the length
len += 4;
} else {
len = 0;
s->error = 2;
len = 0;
s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN);
}
} else if (encoding == Encoding::BIT_PACKED) {
len = (s->page.num_input_values * level_bits + 7) >> 3;
@@ -951,8 +971,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
s->lvl_start[lvl] = cur;
s->abs_lvl_start[lvl] = cur;
} else {
s->error = 3;
len = 0;
len = 0;
s->set_error_code(decode_error::UNSUPPORTED_ENCODING);
}

s->abs_lvl_end[lvl] = start + len;
@@ -1094,7 +1114,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}

if (!t) {
s->error = 0;
s->reset_error_code();

// IMPORTANT : nested schemas can have 0 rows in a page but still have
// values. The case is:
@@ -1152,7 +1172,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
break;
default: // FIXED_LEN_BYTE_ARRAY:
s->dtype_len = dtype_len_out;
s->error |= (s->dtype_len <= 0);
if (s->dtype_len <= 0) { s->set_error_code(decode_error::INVALID_DATA_TYPE); }
break;
}
// Special check for downconversions
Expand Down Expand Up @@ -1268,7 +1288,9 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_run = 0;
s->dict_val = 0;
s->dict_bits = (cur < end) ? *cur++ : 0;
if (s->dict_bits > 32 || !s->dict_base) { s->error = (10 << 8) | s->dict_bits; }
if (s->dict_bits > 32 || !s->dict_base) {
s->set_error_code(decode_error::INVALID_DICT_WIDTH);
}
break;
case Encoding::PLAIN:
s->dict_size = static_cast<int32_t>(end - cur);
@@ -1279,22 +1301,23 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
// first 4 bytes are length of RLE data
int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
if (cur + len > end) { s->error = 2; }
if (cur + len > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); }
s->dict_run = 0;
} break;
case Encoding::DELTA_BINARY_PACKED:
// nothing to do, just don't error
break;
default:
s->error = 1; // Unsupported encoding
default: {
s->set_error_code(decode_error::UNSUPPORTED_ENCODING);
break;
}
}
if (cur > end) { s->error = 1; }
if (cur > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); }
s->lvl_end = cur;
s->data_start = cur;
s->data_end = end;
} else {
s->error = 1;
s->set_error_code(decode_error::EMPTY_PAGE);
}

s->lvl_count[level_type::REPETITION] = 0;
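set_error_code stores decode_error values with fetch_or, so the error kinds must be distinct bit flags that can coexist in one int32_t. The enumerator names below are exactly the ones used in this diff; the specific bit values are an assumption for illustration (the real definitions live in a header outside this diff):

    enum class decode_error : int32_t {
      DATA_STREAM_OVERRUN  = 0x1,   // ran past the end of a page's data stream
      LEVEL_STREAM_OVERRUN = 0x2,   // ran past the end of the rep/def level stream
      UNSUPPORTED_ENCODING = 0x4,   // page uses an encoding the kernels cannot decode
      INVALID_LEVEL_RUN    = 0x8,   // malformed RLE run in the level stream
      INVALID_DATA_TYPE    = 0x10,  // bad or unsupported data type length
      EMPTY_PAGE           = 0x20,  // page contains no bytes to decode
      INVALID_DICT_WIDTH   = 0x40,  // dictionary bit width > 32, or no dictionary page
    };

Note the two atomic scopes in play: set_error_code uses cuda::thread_scope_block because it only races with threads of the same block writing the shared-memory error field, while the final write-back in each kernel uses cuda::thread_scope_device because every block in the grid targets the same global word.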
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -32,8 +32,12 @@ namespace {
// with V2 page headers; see https://www.mail-archive.com/dev@parquet.apache.org/msg11826.html).
// this kernel only needs 96 threads (3 warps)(for now).
template <typename level_t>
__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(96)
gpuDecodeDeltaBinary(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
@@ -79,7 +83,8 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
// that has a value we need.
if (skipped_leaf_values > 0) { db->skip_values(skipped_leaf_values); }

while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
uint32_t target_pos;
uint32_t const src_pos = s->src_pos;

@@ -145,6 +150,11 @@
}
__syncthreads();
}

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
@@ -157,6 +167,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -165,11 +176,11 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeDeltaBinary<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeDeltaBinary<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeDeltaBinary<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

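On the host side, each decode wrapper now takes a pointer to a device-resident int32_t. A sketch of how a caller might wire this up and surface failures; rmm::device_scalar and CUDF_EXPECTS are existing rmm/cudf utilities, but decode_and_check itself is an illustrative helper, not code from this PR:

    #include <rmm/cuda_stream_view.hpp>
    #include <rmm/device_scalar.hpp>
    #include <cudf/utilities/error.hpp>
    #include <cstdint>
    #include <string>

    void decode_and_check(rmm::cuda_stream_view stream /*, pages, chunks, ... */)
    {
      rmm::device_scalar<int32_t> error_code(0, stream);  // zero-initialized on device

      // Launch the decode kernels, handing them the device-side error word, e.g.:
      // DecodePageData(pages, chunks, num_rows, min_row, level_type_size,
      //                error_code.data(), stream);

      // value() copies the word back to the host, synchronizing the stream first.
      auto const err = error_code.value(stream);
      CUDF_EXPECTS(err == 0, "Parquet decode failed, error bits: " + std::to_string(err));
    }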
25 changes: 18 additions & 7 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -582,8 +582,12 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
* @tparam level_t Type used to store decoded repetition and definition levels
*/
template <typename level_t>
__global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(
PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
__global__ void __launch_bounds__(decode_block_size)
gpuDecodeStringPageData(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
int32_t* error_code)
{
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(4) size_type last_offset;
@@ -617,7 +621,8 @@

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
while (s->error == 0 &&
(s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;

Expand Down Expand Up @@ -742,6 +747,11 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData(

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

if (t == 0 and s->error != 0) {
cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
}
}

} // anonymous namespace
Expand Down Expand Up @@ -775,6 +785,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
size_t num_rows,
size_t min_row,
int level_type_size,
int32_t* error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
Expand All @@ -783,11 +794,11 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pa
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeStringPageData<uint8_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeStringPageData<uint16_t>
<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
gpuDecodeStringPageData<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

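Because every kernel accumulates with fetch_or, the value the host reads back is a bitmask that can report several distinct failures, possibly from different pages, in one round trip. A small illustrative helper for turning the mask into a readable message, assuming the decode_error sketch above (not code from this PR):

    #include <cstdint>
    #include <string>

    std::string describe_errors(int32_t mask)
    {
      auto const has = [mask](decode_error e) {
        return (mask & static_cast<int32_t>(e)) != 0;
      };
      std::string msg;
      if (has(decode_error::DATA_STREAM_OVERRUN))  { msg += "data stream overrun; "; }
      if (has(decode_error::LEVEL_STREAM_OVERRUN)) { msg += "level stream overrun; "; }
      if (has(decode_error::UNSUPPORTED_ENCODING)) { msg += "unsupported encoding; "; }
      if (has(decode_error::INVALID_LEVEL_RUN))    { msg += "invalid level run; "; }
      if (has(decode_error::INVALID_DATA_TYPE))    { msg += "invalid data type; "; }
      if (has(decode_error::EMPTY_PAGE))           { msg += "empty page; "; }
      if (has(decode_error::INVALID_DICT_WIDTH))   { msg += "invalid dictionary width; "; }
      return msg.empty() ? "no error" : msg;
    }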