- 
                Notifications
    You must be signed in to change notification settings 
- Fork 3.9k
GH-45334: [C++][Acero] Fix swiss join overflow issues in row offset calculation for fixed length and null masks #45336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e15d80b
              b752669
              06bcc5e
              2cdd4c2
              7f0ea14
              f2f3535
              9b1e908
              c004237
              18d8188
              ba24a03
              22d6b1e
              8897753
              ff4202b
              5e7f863
              c3b0ee7
              b93af5b
              5c0c857
              efe3c98
              53dc951
              8e97d6b
              c4d3959
              d4c2af3
              65c4003
              8355c68
              f7df7a4
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -477,14 +477,15 @@ void RowArrayMerge::CopyFixedLength(RowTableImpl* target, const RowTableImpl& so | |
| const int64_t* source_rows_permutation) { | ||
| int64_t num_source_rows = source.length(); | ||
|  | ||
| int64_t fixed_length = target->metadata().fixed_length; | ||
| uint32_t fixed_length = target->metadata().fixed_length; | ||
|  | ||
| // Permutation of source rows is optional. Without permutation all that is | ||
| // needed is memcpy. | ||
| // | ||
| if (!source_rows_permutation) { | ||
| memcpy(target->mutable_data(1) + fixed_length * first_target_row_id, source.data(1), | ||
| fixed_length * num_source_rows); | ||
| DCHECK_LE(first_target_row_id, std::numeric_limits<uint32_t>::max()); | ||
| memcpy(target->mutable_fixed_length_rows(static_cast<uint32_t>(first_target_row_id)), | ||
| source.fixed_length_rows(/*row_id=*/0), fixed_length * num_source_rows); | ||
| } else { | ||
| // Row length must be a multiple of 64-bits due to enforced alignment. | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unrelated, but that's an interesting piece of info. Doesn't it blow up memory use if there is a small number of very small columns? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes this is true. But IMHO this is acceptable because we also have other auxiliary data structures to aid the hash join so I wouldn't say this is very bad. | ||
| // Loop for each output row copying a fixed number of 64-bit words. | ||
|  | @@ -494,10 +495,13 @@ void RowArrayMerge::CopyFixedLength(RowTableImpl* target, const RowTableImpl& so | |
| int64_t num_words_per_row = fixed_length / sizeof(uint64_t); | ||
| for (int64_t i = 0; i < num_source_rows; ++i) { | ||
| int64_t source_row_id = source_rows_permutation[i]; | ||
| DCHECK_LE(source_row_id, std::numeric_limits<uint32_t>::max()); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If that's always the case then are we wasting memory and CPU cache by having a 64-bit permutation array? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are. In fact I have another WIP branch trying to clean them up. | ||
| const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>( | ||
| source.data(1) + fixed_length * source_row_id); | ||
| source.fixed_length_rows(static_cast<uint32_t>(source_row_id))); | ||
| int64_t target_row_id = first_target_row_id + i; | ||
| DCHECK_LE(target_row_id, std::numeric_limits<uint32_t>::max()); | ||
| uint64_t* target_row_ptr = reinterpret_cast<uint64_t*>( | ||
| target->mutable_data(1) + fixed_length * (first_target_row_id + i)); | ||
| target->mutable_fixed_length_rows(static_cast<uint32_t>(target_row_id))); | ||
|  | ||
| for (int64_t word = 0; word < num_words_per_row; ++word) { | ||
| target_row_ptr[word] = source_row_ptr[word]; | ||
|  | @@ -529,16 +533,16 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl& | |
|  | ||
| // We can simply memcpy bytes of rows if their order has not changed. | ||
| // | ||
| memcpy(target->mutable_data(2) + target_offsets[first_target_row_id], source.data(2), | ||
| source_offsets[num_source_rows] - source_offsets[0]); | ||
| memcpy(target->mutable_var_length_rows() + target_offsets[first_target_row_id], | ||
| source.var_length_rows(), source_offsets[num_source_rows] - source_offsets[0]); | ||
| } else { | ||
| int64_t target_row_offset = first_target_row_offset; | ||
| uint64_t* target_row_ptr = | ||
| reinterpret_cast<uint64_t*>(target->mutable_data(2) + target_row_offset); | ||
| uint64_t* target_row_ptr = reinterpret_cast<uint64_t*>( | ||
| target->mutable_var_length_rows() + target_row_offset); | ||
| for (int64_t i = 0; i < num_source_rows; ++i) { | ||
| int64_t source_row_id = source_rows_permutation[i]; | ||
| const uint64_t* source_row_ptr = reinterpret_cast<const uint64_t*>( | ||
| source.data(2) + source_offsets[source_row_id]); | ||
| source.var_length_rows() + source_offsets[source_row_id]); | ||
| int64_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id]; | ||
| // Though the row offset is 64-bit, the length of a single row must be 32-bit as | ||
| // required by current row table implementation. | ||
|  | @@ -564,14 +568,18 @@ void RowArrayMerge::CopyNulls(RowTableImpl* target, const RowTableImpl& source, | |
| const int64_t* source_rows_permutation) { | ||
| int64_t num_source_rows = source.length(); | ||
| int num_bytes_per_row = target->metadata().null_masks_bytes_per_row; | ||
| uint8_t* target_nulls = target->null_masks() + num_bytes_per_row * first_target_row_id; | ||
| DCHECK_LE(first_target_row_id, std::numeric_limits<uint32_t>::max()); | ||
| uint8_t* target_nulls = | ||
| target->mutable_null_masks(static_cast<uint32_t>(first_target_row_id)); | ||
| if (!source_rows_permutation) { | ||
| memcpy(target_nulls, source.null_masks(), num_bytes_per_row * num_source_rows); | ||
| memcpy(target_nulls, source.null_masks(/*row_id=*/0), | ||
| num_bytes_per_row * num_source_rows); | ||
| } else { | ||
| for (int64_t i = 0; i < num_source_rows; ++i) { | ||
| for (uint32_t i = 0; i < num_source_rows; ++i) { | ||
| int64_t source_row_id = source_rows_permutation[i]; | ||
| DCHECK_LE(source_row_id, std::numeric_limits<uint32_t>::max()); | ||
| const uint8_t* source_nulls = | ||
| source.null_masks() + num_bytes_per_row * source_row_id; | ||
| source.null_masks(static_cast<uint32_t>(source_row_id)); | ||
| for (int64_t byte = 0; byte < num_bytes_per_row; ++byte) { | ||
| *target_nulls++ = *source_nulls++; | ||
| } | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were these size constraints already implied but not tested for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We assume row id to be `uint32_t` (that means no more than 2^32 rows are allowed) almost everywhere, so this is implied. But there are still some places weirdly and unnecessarily using `int64_t` as row id, here included.