GH-39332: [C++] Explicit error in ExecBatchBuilder when appending var length data exceeds offset limit (int32 max) (#39383)

### Rationale for this change

When appending variable-length data in `ExecBatchBuilder`, the offset can overflow if the batch contains 4GB of data or more. The overflow may then cause a segmentation fault during the subsequent copy of the data contents. For details, please refer to this comment: #39332 (comment).
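
To illustrate the failure mode, here is a standalone sketch (not the builder's actual code) of how a 32-bit offset wraps once the accumulated variable-length data crosses `INT32_MAX`:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Arrow's non-"large" binary/string layouts index values with int32 offsets,
  // so the running end-offset must stay at or below INT32_MAX (2GB - 1B).
  const int64_t value_length = 8 * 1024 * 1024;  // one 8MB value per row
  int64_t true_size = 0;
  for (int row = 0; row < 256; ++row) {
    true_size += value_length;  // reaches 2GB, one byte past INT32_MAX
  }
  // Storing the running size into an int32 offset truncates it to a negative
  // value; a copy that trusts this offset then reads/writes out of bounds.
  int32_t stored_offset = static_cast<int32_t>(true_size);
  std::cout << "true size: " << true_size
            << ", stored offset: " << stored_offset << std::endl;
}
```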

The solution is for the user to switch to the "large" counterpart data type to avoid the overflow, but we still need an explicit error when such an overflow happens.
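
A hedged sketch of that workaround, assuming the standard Arrow C++ builder API (`BuildBigColumn` is a name made up for illustration): the "large" variants carry int64 offsets, so they are not subject to the 2GB per-array limit.

```cpp
#include <arrow/api.h>

#include <memory>
#include <string>

// Build a string column with 64-bit offsets so the total data may exceed 2GB.
arrow::Result<std::shared_ptr<arrow::Array>> BuildBigColumn(const std::string& value,
                                                            int64_t num_repeats) {
  arrow::LargeStringBuilder builder;  // offsets are int64_t instead of int32_t
  for (int64_t i = 0; i < num_repeats; ++i) {
    ARROW_RETURN_NOT_OK(builder.Append(value));
  }
  return builder.Finish();
}
```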

### What changes are included in this PR?

1. Detect the offset overflow when appending data in `ExecBatchBuilder` and raise an explicit error.
2. Change the offset type in `ExecBatchBuilder` from `uint32_t` to `int32_t`, and respect `BinaryBuilder::memory_limit()` (`2GB - 2B`) like the rest of the codebase (see the sketch below).
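
The checked accumulation at the core of change 1 looks roughly like the sketch below. Arrow's real helper is `arrow::internal::AddWithOverflow` from `arrow/util/int_util_overflow.h`; the stand-in here and `LengthsToOffsets` are illustrative only.

```cpp
#include <cstdint>

// Stand-in for arrow::internal::AddWithOverflow: returns true when a + b
// does not fit in int32, writing the (possibly wrapped) result to *out.
inline bool AddWithOverflow(int32_t a, int32_t b, int32_t* out) {
  return __builtin_add_overflow(a, b, out);  // GCC/Clang builtin
}

// Convert per-row lengths into offsets, failing loudly instead of wrapping.
// The real code reports Status::Invalid with the offending element's index.
bool LengthsToOffsets(const int32_t* lengths, int num_rows, int32_t* offsets) {
  int32_t sum = 0;
  for (int i = 0; i < num_rows; ++i) {
    offsets[i] = sum;
    int32_t next = 0;
    if (AddWithOverflow(sum, lengths[i], &next)) {
      return false;  // appending lengths[i] would push the offsets past INT32_MAX
    }
    sum = next;
  }
  offsets[num_rows] = sum;  // total byte length of the variable-length buffer
  return true;
}
```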

### Are these changes tested?

Unit tests included.

### Are there any user-facing changes?

No.

* Closes: #39332

Lead-authored-by: zanmato <zanmato1984@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: Rossi(Ruoxi) Sun <zanmato1984@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Antoine Pitrou <antoine@python.org>
3 people authored Jan 18, 2024
1 parent 1f5dece commit a2aa1c4
Showing 4 changed files with 100 additions and 22 deletions.
47 changes: 27 additions & 20 deletions cpp/src/arrow/compute/light_array.cc
@@ -20,6 +20,8 @@
#include <type_traits>

#include "arrow/util/bitmap_ops.h"
#include "arrow/util/int_util_overflow.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace compute {
@@ -325,11 +327,10 @@ Status ResizableArrayData::ResizeVaryingLengthBuffer() {
column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();

if (!column_metadata.is_fixed_length) {
- int min_new_size = static_cast<int>(reinterpret_cast<const uint32_t*>(
-     buffers_[kFixedLengthBuffer]->data())[num_rows_]);
+ int64_t min_new_size = buffers_[kFixedLengthBuffer]->data_as<int32_t>()[num_rows_];
ARROW_DCHECK(var_len_buf_size_ > 0);
if (var_len_buf_size_ < min_new_size) {
- int new_size = var_len_buf_size_;
+ int64_t new_size = var_len_buf_size_;
while (new_size < min_new_size) {
new_size *= 2;
}
@@ -465,12 +466,11 @@ void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_r

if (!metadata.is_fixed_length) {
const uint8_t* ptr_base = column->buffers[2]->data();
- const uint32_t* offsets =
-     reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+ const int32_t* offsets = column->GetValues<int32_t>(1);
for (int i = 0; i < num_rows; ++i) {
uint16_t row_id = row_ids[i];
const uint8_t* field_ptr = ptr_base + offsets[row_id];
- uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+ int32_t field_length = offsets[row_id + 1] - offsets[row_id];
process_value_fn(i, field_ptr, field_length);
}
} else {
@@ -480,7 +480,7 @@
const uint8_t* field_ptr =
column->buffers[1]->data() +
(column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
- process_value_fn(i, field_ptr, metadata.fixed_length);
+ process_value_fn(i, field_ptr, static_cast<int32_t>(metadata.fixed_length));
}
}
}
@@ -511,30 +511,30 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
break;
case 1:
Visit(source, num_rows_to_append, row_ids,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
target->mutable_data(1)[num_rows_before + i] = *ptr;
});
break;
case 2:
Visit(
source, num_rows_to_append, row_ids,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
*reinterpret_cast<const uint16_t*>(ptr);
});
break;
case 4:
Visit(
source, num_rows_to_append, row_ids,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
*reinterpret_cast<const uint32_t*>(ptr);
});
break;
case 8:
Visit(
source, num_rows_to_append, row_ids,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
*reinterpret_cast<const uint64_t*>(ptr);
});
@@ -544,7 +544,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
num_rows_to_append -
NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
Visit(source, num_rows_to_process, row_ids,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(
target->mutable_data(1) +
static_cast<int64_t>(num_bytes) * (num_rows_before + i));
@@ -558,7 +558,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
if (num_rows_to_append > num_rows_to_process) {
Visit(source, num_rows_to_append - num_rows_to_process,
row_ids + num_rows_to_process,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(
target->mutable_data(1) +
static_cast<int64_t>(num_bytes) *
@@ -575,16 +575,23 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source

// Step 1: calculate target offsets
//
- uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
- uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+ int32_t* offsets = reinterpret_cast<int32_t*>(target->mutable_data(1));
+ int32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
Visit(source, num_rows_to_append, row_ids,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
offsets[num_rows_before + i] = num_bytes;
});
for (int i = 0; i < num_rows_to_append; ++i) {
- uint32_t length = offsets[num_rows_before + i];
+ int32_t length = offsets[num_rows_before + i];
offsets[num_rows_before + i] = sum;
- sum += length;
+ int32_t new_sum_maybe_overflow = 0;
+ if (ARROW_PREDICT_FALSE(
+         arrow::internal::AddWithOverflow(sum, length, &new_sum_maybe_overflow))) {
+   return Status::Invalid("Overflow detected in ExecBatchBuilder when appending ",
+                          num_rows_before + i + 1, "-th element of length ", length,
+                          " bytes to current length ", sum, " bytes");
+ }
+ sum = new_sum_maybe_overflow;
}
offsets[num_rows_before + num_rows_to_append] = sum;

@@ -598,7 +605,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
num_rows_to_append -
NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
Visit(source, num_rows_to_process, row_ids,
[&](int i, const uint8_t* ptr, uint32_t num_bytes) {
[&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
offsets[num_rows_before + i]);
const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
@@ -608,7 +615,7 @@ Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source
}
});
Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
- [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+ [&](int i, const uint8_t* ptr, int32_t num_bytes) {
uint64_t* dst = reinterpret_cast<uint64_t*>(
target->mutable_data(2) +
offsets[num_rows_before + num_rows_to_process + i]);
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/light_array.h
@@ -353,7 +353,7 @@ class ARROW_EXPORT ResizableArrayData {
MemoryPool* pool_;
int num_rows_;
int num_rows_allocated_;
- int var_len_buf_size_;
+ int64_t var_len_buf_size_;
static constexpr int kMaxBuffers = 3;
std::shared_ptr<ResizableBuffer> buffers_[kMaxBuffers];
};
64 changes: 64 additions & 0 deletions cpp/src/arrow/compute/light_array_test.cc
@@ -407,6 +407,70 @@ TEST(ExecBatchBuilder, AppendValuesBeyondLimit) {
ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(ExecBatchBuilder, AppendVarLengthBeyondLimit) {
// GH-39332: check appending variable-length data past 2GB.
if constexpr (sizeof(void*) == 4) {
GTEST_SKIP() << "Test only works on 64-bit platforms";
}

std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
MemoryPool* pool = owned_pool.get();
constexpr auto eight_mb = 8 * 1024 * 1024;
constexpr auto eight_mb_minus_one = eight_mb - 1;
// String of size 8mb to repetitively fill the heading multiple of 8mbs of an array
// of int32_max bytes.
std::string str_8mb(eight_mb, 'a');
// String of size (8mb - 1) to be the last element of an array of int32_max bytes.
std::string str_8mb_minus_1(eight_mb_minus_one, 'b');
std::shared_ptr<Array> values_8mb = ConstantArrayGenerator::String(1, str_8mb);
std::shared_ptr<Array> values_8mb_minus_1 =
ConstantArrayGenerator::String(1, str_8mb_minus_1);

ExecBatch batch_8mb({values_8mb}, 1);
ExecBatch batch_8mb_minus_1({values_8mb_minus_1}, 1);

auto num_rows = std::numeric_limits<int32_t>::max() / eight_mb;
std::vector<uint16_t> body_row_ids(num_rows, 0);
std::vector<uint16_t> tail_row_id(1, 0);

{
// Building an array of (int32_max + 1) = (8mb * num_rows + 8mb) bytes should raise an
// error of overflow.
ExecBatchBuilder builder;
ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(),
/*num_cols=*/1));
std::stringstream ss;
ss << "Invalid: Overflow detected in ExecBatchBuilder when appending " << num_rows + 1
<< "-th element of length " << eight_mb << " bytes to current length "
<< eight_mb * num_rows << " bytes";
ASSERT_RAISES_WITH_MESSAGE(
Invalid, ss.str(),
builder.AppendSelected(pool, batch_8mb, 1, tail_row_id.data(),
/*num_cols=*/1));
}

{
// Building an array of int32_max = (8mb * num_rows + 8mb - 1) bytes should succeed.
ExecBatchBuilder builder;
ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, body_row_ids.data(),
/*num_cols=*/1));
ASSERT_OK(builder.AppendSelected(pool, batch_8mb_minus_1, 1, tail_row_id.data(),
/*num_cols=*/1));
ExecBatch built = builder.Flush();
auto datum = built[0];
ASSERT_TRUE(datum.is_array());
auto array = datum.array_as<StringArray>();
ASSERT_EQ(array->length(), num_rows + 1);
for (int i = 0; i < num_rows; ++i) {
ASSERT_EQ(array->GetString(i), str_8mb);
}
ASSERT_EQ(array->GetString(num_rows), str_8mb_minus_1);
ASSERT_NE(0, pool->bytes_allocated());
}

ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(KeyColumnArray, FromExecBatch) {
ExecBatch batch =
JSONToExecBatch({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
9 changes: 8 additions & 1 deletion cpp/src/arrow/testing/generator.cc
@@ -38,6 +38,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/string.h"

@@ -103,7 +104,13 @@ std::shared_ptr<arrow::Array> ConstantArrayGenerator::Float64(int64_t size,

std::shared_ptr<arrow::Array> ConstantArrayGenerator::String(int64_t size,
std::string value) {
- return ConstantArray<StringType>(size, value);
+ using BuilderType = typename TypeTraits<StringType>::BuilderType;
+ auto type = TypeTraits<StringType>::type_singleton();
+ auto builder_fn = [&](BuilderType* builder) {
+   DCHECK_OK(builder->Append(std::string_view(value.data())));
+ };
+ return ArrayFromBuilderVisitor(type, value.size() * size, size, builder_fn)
+     .ValueOrDie();
}

std::shared_ptr<arrow::Array> ConstantArrayGenerator::Zeroes(
