
Conversation

klion26
Member

@klion26 klion26 commented Jul 31, 2025

Which issue does this PR close?

This PR optimizes the logic of ObjectBuilder::finish.

Rationale for this change

This PR optimizes the logic of ObjectBuilder::finish.

What changes are included in this PR?

This PR optimizes ObjectBuilder::finish with a PackedU32Iterator.

Are these changes tested?

This PR is covered by existing tests.

Are there any user-facing changes?

No

@github-actions github-actions bot added the parquet Changes to the parquet crate label Jul 31, 2025
@klion26
Member Author

klion26 commented Jul 31, 2025

@alamb @scovich @viirya Please help review this when you're free. Thanks.

I've tried to benchmark three implementations

  1. the current implementation with PackedU32Iterator
  2. the implementation with append_packed_u32 into a tmp buf, then splicing it into the parent's buffer like we did for ListBuilder::finish
append_packed_u32 into a tmp buf
    let num_fields_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte.
    let header_size = 1 + // header byte (i.e., `object_header`)
        num_fields_size + // num_fields_size
        (num_fields * id_size as usize) + // field IDs
        ((num_fields + 1) * offset_size as usize); // field offsets + data_size
    let mut bytes_to_splice = Vec::with_capacity(header_size + 3);
    // Write header byte
    let header = object_header(is_large, id_size, offset_size);
    bytes_to_splice.push(header);
    append_packed_u32(&mut bytes_to_splice, num_fields as u32, num_fields_size);
    for field_id in self.fields.keys() {
        append_packed_u32(&mut bytes_to_splice, *field_id, id_size as usize);
    }
    for offset in self.fields.values() {
        append_packed_u32(&mut bytes_to_splice, *offset as u32, offset_size as usize);
    }
    append_packed_u32(&mut bytes_to_splice, data_size as u32, offset_size as usize);
    let starting_offset = self.parent_value_offset_base;
    // Shift existing data to make room for the header
    let buffer = parent_buffer.inner_mut();
    buffer.splice(starting_offset..starting_offset, bytes_to_splice);
  3. the implementation using PackedU32Iterator and extend, then splicing the result into the parent's buffer
PackedU32Iterator with extend
    let num_fields_bytes = num_fields.to_le_bytes();
    let num_elements_bytes = num_fields_bytes.iter().take(num_fields_size).copied();
    let fields = PackedU32Iterator::new(
        id_size as usize,
        self.fields.keys().map(|id| id.to_le_bytes()),
    );
    );
    let offsets = PackedU32Iterator::new(
        offset_size as usize,
        self.fields
            .values()
            .map(|offset| (*offset as u32).to_le_bytes()),
    );
    let data_size_bytes = (data_size as u32).to_le_bytes();
    let data_size_bytes_iter = data_size_bytes.iter().take(offset_size as usize).copied();
    let header = object_header(is_large, id_size, offset_size);
    let mut bytes_to_splice = vec![header];
    bytes_to_splice.extend(num_elements_bytes);
    bytes_to_splice.extend(fields);
    bytes_to_splice.extend(offsets);
    bytes_to_splice.extend(data_size_bytes_iter);
    let starting_offset = self.parent_value_offset_base;
    // Shift existing data to make room for the header
    let buffer = parent_buffer.inner_mut();
    buffer.splice(starting_offset..starting_offset, bytes_to_splice);

The results show that the first one wins (but not by much).

The results were generated by:

  1. Create three branches with the code
  2. Execute the benchmark command one by one on the three branches and on main
  3. Execute critcmp to generate the results

PackedU32Iterator

group                                                                7978_optimize_header_generatation_with_iterator    main
-----                                                                -----------------------------------------------    ----
batch_json_string_to_variant json_list 8k string                     1.00     28.4±1.05ms        ? ?/sec                1.03     29.1±0.97ms        ? ?/sec
batch_json_string_to_variant random_json(2633 bytes per document)    1.00    324.6±8.35ms        ? ?/sec                1.01    327.6±8.18ms        ? ?/sec
batch_json_string_to_variant repeated_struct 8k string               1.00     11.0±0.40ms        ? ?/sec                1.01     11.2±0.34ms        ? ?/sec
variant_get_primitive                                                1.00  1105.8±33.81µs        ? ?/sec                1.01  1121.0±43.28µs        ? ?/sec

append_packed_u32 into a tmp buf

group                                                                7978_optimize_header_generation_logic_object_builder    main
-----                                                                ----------------------------------------------------    ----
batch_json_string_to_variant json_list 8k string                     1.00     28.5±1.09ms        ? ?/sec                     1.02     29.1±0.97ms        ? ?/sec
batch_json_string_to_variant random_json(2633 bytes per document)    1.00    328.9±8.20ms        ? ?/sec                     1.00    327.6±8.18ms        ? ?/sec
batch_json_string_to_variant repeated_struct 8k string               1.00     11.3±0.31ms        ? ?/sec                     1.00     11.2±0.34ms        ? ?/sec
variant_get_primitive                                                1.00  1106.9±40.65µs        ? ?/sec                     1.01  1121.0±43.28µs        ? ?/sec

PackedU32Iterator with extend

group                                                                7978_optimizer_header_generation_with_extend    main
-----                                                                --------------------------------------------    ----
batch_json_string_to_variant json_list 8k string                     1.04     30.2±1.82ms        ? ?/sec             1.00     29.1±0.97ms        ? ?/sec
batch_json_string_to_variant random_json(2633 bytes per document)    1.06    345.8±9.52ms        ? ?/sec             1.00    327.6±8.18ms        ? ?/sec
batch_json_string_to_variant repeated_struct 8k string               1.12     12.5±0.77ms        ? ?/sec             1.00     11.2±0.34ms        ? ?/sec
variant_get_primitive                                                1.00  1123.7±50.11µs        ? ?/sec             1.00  1121.0±43.28µs        ? ?/sec

From the conversation on previous PRs [1][2], I think we prefer:

  1. using an iterator (or a tmp buf) with splice, so that we won't miscalculate the size we splice into the parent's buffer
  2. avoiding tmp buf creation

Given this and the benchmarks, I have some questions:

  1. For the miscalculated-header problem, can we use unit tests to cover the header size calculation logic?
  2. For the tmp buf creation problem: from the PR Remove redundant is_err checks in Variant tests #7897, using a tmp buf with append_packed_u32 gives better performance, so the tmp buffer creation cost may or may not be the bottleneck, depending on the workload. If so, do we need to add more benchmarks? (And do we need to bench different header metadata, such as is_large being true?)
  3. The variant_kernel benchmark runs batch_json_string_to_variant random_json({} bytes per document) twice; is the first run for warmup?

Contributor

@alamb alamb left a comment


Thanks @klion26 -- I agree this appears somewhat complicated for what it is doing and doesn't really make a huge difference

I left some thoughts on specialization

Let me know what you think

cc @scovich

(if is_large { 4 } else { 1 }) + // num_fields
(num_fields * id_size as usize) + // field IDs
((num_fields + 1) * offset_size as usize); // field offsets + data_size
let num_fileds_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte.
Contributor


This variable name seems to have a typo:

Suggested change
let num_fileds_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte.
let num_fields_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte.

Contributor


also: I personally find the comment unhelpful -- it just restates the code.

/// An iterator that yields the bytes of a packed u32 iterator.
/// Will yield the first `packed_bytes` bytes of each item in the iterator.
struct PackedU32Iterator<T: Iterator<Item = [u8; 4]>> {
packed_bytes: usize,
Contributor


I think to get this really fast we will need to use generics based on the packed_bytes size -- so we would end up with different versions of the code for 1,2 and 4 byte offsets

We could try to make this particular iterator generic, but it might get a bit messy.

I was thinking maybe we can somehow structure the code so there is a function like this that writes the header for a certain offset size:

fn write_header<const SIZE: usize>(dst: &mut Vec<u8>, ...) {
  ...
}

Then we would basically have a switch like this to instantiate the appropriate versions (can probably avoid the panic)

match int_size(max_id as usize) {
  1 => write_header::<1>(dst, ...),
  2 => write_header::<2>(dst, ...),
  4 => write_header::<4>(dst, ...),
  _ => panic!("unsupported size")
}
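As a hedged illustration of this const-generic dispatch idea (all names here are hypothetical, not the crate's actual API), a self-contained sketch:

```rust
// Hypothetical sketch of the const-generic idea above: one monomorphized
// writer per packing width, selected once by a match at the call site.
fn write_packed<const SIZE: usize>(dst: &mut Vec<u8>, values: &[u32]) {
    for v in values {
        // Keep only the first SIZE little-endian bytes of each value.
        dst.extend_from_slice(&v.to_le_bytes()[..SIZE]);
    }
}

fn write_packed_dyn(dst: &mut Vec<u8>, values: &[u32], size: usize) {
    match size {
        1 => write_packed::<1>(dst, values),
        2 => write_packed::<2>(dst, values),
        4 => write_packed::<4>(dst, values),
        _ => panic!("unsupported size"),
    }
}

fn main() {
    let mut buf = Vec::new();
    write_packed_dyn(&mut buf, &[1, 258], 2);
    assert_eq!(buf, vec![1, 0, 2, 1]); // 1 -> [1, 0], 258 -> [2, 1]
}
```

The branch on the width happens once per header rather than once per entry, which is the point of the specialization.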

Contributor


Even just specializing the PackedU32Iterator with a const N: usize parameter would probably give most of the benefit, for this iterator-based approach?

However -- the temp buf append+truncate is fundamentally faster on a per-entry basis, because of how much simpler it is at machine code level. And it will run at exactly the same speed for all packing sizes, because there are no branches or even conditional moves based on the packing size (**). The only downside is the up-front cost of allocating said temp buffer first, which has to amortize across all entries.

(**) If the compiler were a bit smarter, it could even eliminate the bound checks in the loop's push calls (based on the result of the with_capacity that preceded it), and also eliminate the bounds checks in the loop's truncate calls (based on the fact that the arg passed to truncate is never larger than the vec size).
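For reference, the append+truncate shape being described might look like this minimal sketch (an assumed form of append_packed_u32, not necessarily the crate's actual code):

```rust
// Assumed sketch: always write all 4 LE bytes unconditionally, then truncate
// back so only `nbytes` of them are kept. The write itself is branchless with
// respect to the packing width, which is what makes it fast per entry.
fn append_packed_u32(buf: &mut Vec<u8>, value: u32, nbytes: usize) {
    let start = buf.len();
    buf.extend_from_slice(&value.to_le_bytes()); // unconditional 4-byte write
    buf.truncate(start + nbytes);                // keep only the packed prefix
}

fn main() {
    let mut buf = Vec::with_capacity(16);
    append_packed_u32(&mut buf, 0x0102, 2);
    append_packed_u32(&mut buf, 7, 1);
    assert_eq!(buf, vec![0x02, 0x01, 0x07]);
}
```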

Contributor


I haven't looked into the variant implementation or this PR in detail, but does this iterator need a custom implementation or can it be an anonymous impl Iterator<Item=u8>? It looks like the implementation is basically iter.flat_map(u32::to_le_bytes). The benefit of that is that if the underlying iterator has a trusted length then the flat-mapped iterator is still trusted, and extending a slice from a TrustedLen iterator is much more efficient.
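For illustration, the flat_map shape being discussed could be sketched as follows; the `take(nbytes)` with a dynamic width is exactly what the later follow-up identifies as defeating the trusted-length optimization:

```rust
// Sketch of an anonymous `impl Iterator<Item = u8>` in place of a custom
// iterator type: flat_map each u32 into its LE bytes, truncated to a
// runtime-chosen width. With the fixed 4-byte case (no `take`), the
// flat-mapped iterator keeps an exact size hint.
fn main() {
    let ids: Vec<u32> = vec![1, 258];
    let nbytes = 2; // dynamic packing width
    let packed: Vec<u8> = ids
        .iter()
        .flat_map(|v| v.to_le_bytes().into_iter().take(nbytes))
        .collect();
    assert_eq!(packed, vec![1, 0, 2, 1]);
}
```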

Member Author


@jhorstmann IIUC, the benefit of this iterator is size_hint; Rust can use the size_hint to do better, as it knows the size of the iterator.

Contributor


However -- the temp buf append+truncate is fundamentally faster on a per-entry basis, because of how much simpler it is at machine code level. And it will run at exactly the same speed for all packing sizes, because there are no branches or even conditional moves based on the packing size (**). The only downside is the up-front cost of allocating said temp buffer first, which has to amortize across all entries.

Yes I agree with this analysis.

This is why my opinion still remains that the way to avoid the allocation is to calculate the header size and shift the bytes first so we write directly to the target. I realize that is trickier code, but as @klion26 has said somewhere else I think we could write some good assertions and tests to avoid problems

When I next get a chance to work on Variant related things with time, I want to focus on unsticking the shredding implementation. Once that is done, I may return here to try and restructure things

One thought I had was to encapsulate the logic / templating in a

struct ListHeaderWriter<'a> {
  offsets: &'a [usize],
}

impl ListHeaderWriter<'_> {
  fn header_size<const OFFSET_SIZE: usize>(&self) -> usize {
    // put the header size calculation here
  }
  // write the header into dst starting at start_offset
  fn write<const OFFSET_SIZE: usize>(&self, dst: &mut Vec<u8>, start_offset: usize) {
   ...
  }
}

Contributor


Seems you are right: FlatMap would only work for constant-length arrays, not dynamic slices whose length depends on packed_bytes. And the TrustedLen optimization also cannot kick in, since the base iterator comes from a crate outside the standard library.

@scovich
Contributor

scovich commented Jul 31, 2025

I've tried to benchmark three implementations

Thanks for taking the time to do this!

The results show that the first one wins (but not by much).

Technically, the first two are equivalent because each result's mean is comfortably inside the other's confidence window. While it may be suggestive that the temp buf approach has a higher mean in all four results, any real difference is dwarfed by noise:

group                    PackedU32Iterator    append into tmp buf
json_list                28.4±1.05ms          28.5±1.09ms
random_json              324.6±8.35ms         328.9±8.20ms
repeated_struct          11.0±0.40ms          11.3±0.31ms
variant_get_primitive    1105.8±33.81µs       1106.9±40.65µs

(aside: why would variant_get_primitive case (= reading) even be relevant for variant building?)

Meanwhile, it's unsurprising in retrospect that the complex iterator approach would be slower than the append+truncate approach, given how simple (branchless!) the latter's machine code is.

for the miscalculated-header problem, can we use unit tests to cover the header size calculation logic?

I mean, yes we can. But given a choice to eliminate a bug surface entirely from the code base, vs. try to validate it with unit tests that could be incomplete and/or go out of sync? I almost always favor eliminating the possibility of bugs over testing for them -- unless the perf and/or complexity difference is large enough to make me question that default choice.

do we need to bench for different header metadata such as is_large is true?

If we care about the performance (and correctness) of that case, it seems like we need to have benchmarks and unit tests to cover it?

Contributor

@scovich scovich left a comment


this appears somewhat complicated for what it is doing and doesn't really make a huge difference

If we want to measure the cost of safety, it might be instructive to modify append_offset_array_start_from_buf_pos to take the same approach as append_packed_u32 in writing more bytes than we keep -- something like:

        let mut current_pos = start_pos;
        for relative_offset in offsets {
-           write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
+           write_u32_at_pos(buf, current_pos, relative_offset as u32);
            current_pos += nbytes as usize;
        }

        // Write data_size
        if let Some(data_size) = data_size {
            // Write data_size at the end of the offsets
            write_offset_at_pos(buf, current_pos, data_size, nbytes);
            current_pos += nbytes as usize;
        }

where

// always writes the four LE bytes of a value, relying on the caller to only advance
// the buffer's position by the number of bytes that should be kept (1..=4). 
//
// WARNING: Caller must ensure that it's safe to produce as many as 3 extra bytes
// without corrupting the buffer. Usually by ensuring that something else overwrites
// the space immediately after.
fn write_u32_at_pos(buf: &mut [u8], start_pos: usize, value: u32) {
    let bytes = value.to_le_bytes();
    buf[start_pos..start_pos + bytes.len()].copy_from_slice(&bytes);
}

Highly dangerous -- it relies on the fact that the field array is followed by the value array, and the value array is followed by the data size (which is written in the safer/slower way) -- but we could at least see whether it gives enough performance boost to consider keeping it.

IMO, if we want to keep the current unsafe approach that uses offset arithmetic in a pre-allocated buffer... we may as well go all the way and maximize the performance benefit we gain by giving up safety.


let header = object_header(is_large, id_size, offset_size);
let bytess_to_splice = std::iter::once(header)
Contributor


Suggested change
let bytess_to_splice = std::iter::once(header)
let bytes_to_splice = std::iter::once(header)

@klion26
Member Author

klion26 commented Aug 1, 2025

@alamb I think it might be better to stick with the current solution: 1) The solution in the current PR is more complex than the original, but the performance improvement isn't significant; 2) I originally hoped to keep the same implementation for ListBuilder/ObjectBuilder, but it seems that changes to ObjectBuilder might result in performance regressions.

@scovich

I mean, yes we can. But given a choice to eliminate a bug surface entirely from the code base, vs. try to validate it with unit tests that could be incomplete and/or go out of sync? I almost always favor eliminating the possibility of bugs over testing for them -- unless the perf and/or complexity difference is large enough to make me question that default choice

Thanks for your detailed response. I agree with your point. It's more reliable to fundamentally address the potential for errors. This way, even if there's no test to guarantee this logic for any reason in the future, it will continue to run correctly.

I'm a little confused about the benchmark. Why is PackedU32Iterator + extend the slowest? Is it because it calls multiple extend, which results in data movement? (using PackedU32Iterator allows extend to take advantage of size_hint).

For the unsafe code with write_u32_at_pos, I tried it, but it requires further changes; otherwise, it will panic in write_u32_at_pos because there is no more room. If we need to benchmark this, I'll make the change and benchmark it against the current solution.

@alamb @scovich Finally, thank you for your detailed review and guidance on these PRs. I've learned a lot from them. Thank you again.

@alamb
Contributor

alamb commented Aug 1, 2025

@alamb I think it might be better to stick with the current solution: 1) The solution in the current PR is more complex than the original, but the performance improvement isn't significant;

Yes I agree with this plan for now

@alamb @scovich Finally, thank you for your detailed review and guidance on these PRs. I've learned a lot from them. Thank you again.

Thank you again for all the help

@scovich
Contributor

scovich commented Aug 1, 2025

Why is PackedU32Iterator + extend the slowest? Is it because it calls multiple extend, which results in data movement? (using PackedU32Iterator allows extend to take advantage of size_hint).

I suspect it's because the compiler can't "see through" the packed iterator like it can with the append+truncate approach. So the former produces 1-4 individual byte appends, each with its own bounds check and length change, while the latter produces a single 32-bit append followed by a truncate.

For the unsafe code with write_u32_at_pos, I tried it, but it requires further changes; otherwise, it will panic in write_u32_at_pos because there is no more room.

If you hit a buffer overflow panic, I think that means you tried to use the "fast" version to write the Some(data_size) entry, after the loop exits. Or did you hit a problem in the loop itself?

I think it might be better to stick with the current solution: 1) The solution in the current PR is more complex than the original, but the performance improvement isn't significant.

The same argument likely applies to the object builder. Should we consider reverting that back to match the list builder, for simplicity and maintainability? I'd have to go back and check the other PR, but was the perf improvement actually significant there?

If so => why did it not work for the list builder? Do we not have apples-to-apples benchmarking?

If not => probably good to revert.

@klion26
Member Author

klion26 commented Aug 2, 2025

@scovich

If you hit a buffer overflow panic, I think that means you tried to use the "fast" version to write the Some(data_size) entry, after the loop exits. Or did you hit a problem in the loop itself?

It would panic in the following test when creating a variant with VariantBuilder::new.

let (m1, v1) = make_nested_object();
let variant = Variant::new(&m1, &v1);

// because we can guarantee metadata is validated through the builder
let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1));
builder.append_value(variant.clone()); // <-- panics here: the buffer has size 6 and we try to copy 4 bytes into it starting at offset 3

let (metadata, value) = builder.finish();
let result_variant = Variant::new(&metadata, &value);

assert_eq!(variant, result_variant);

The same argument likely applies to the object builder. Should we consider reverting that back to match the list builder, for simplicity and maintainability? I'd have to go back and check the other PR, but was the perf improvement actually significant there?

Sorry, I've rechecked the current ObjectBuilder::finish logic (I misremembered the implementation on the main branch). Either the current approach or aligning it with ListBuilder::finish is fine with me. The current approach has some possibility of generating bugs, as @scovich said in a previous comment, but for now we have unit tests that ensure correctness. Aligning the logic with ListBuilder::finish would fundamentally ensure correctness (and the benchmark difference is minimal). @alamb @scovich, I can modify the code if necessary.

header_pos = self
.parent_state
.buffer()
.append_offset_array_start_from_buf_pos(
Member


Not related to this PR, but append_offset_array_start_from_buf_pos also writes field ids instead of just offsets.

Member Author


Will change the function name after this PR is finalized.

}

impl<T: Iterator<Item = [u8; 4]>> PackedU32Iterator<T> {
fn new(packed_bytes: usize, iterator: T) -> Self {
Member


It's better to add an assert on packed_bytes.

Member Author


Thanks for pointing it out; adding an assert is indeed better behavior.

buffer.splice(
starting_offset..starting_offset,
std::iter::repeat_n(0u8, header_size),
let fields = PackedU32Iterator::new(
Member


Honestly, the current approach looks easier to understand at first glance. This PackedU32Iterator approach is a bit of an over-abstraction to me. For appending the bytes into the buffer, I'd prefer to keep it simple rather than introduce a new abstraction, unless there's substantial benefit.

@scovich
Contributor

scovich commented Aug 3, 2025

If you hit a buffer overflow panic, I think that means you tried to use the "fast" version to write the Some(data_size) entry, after the loop exits. Or did you hit a problem in the loop itself?

It would panic in the following test when creating a variant with VariantBuilder::new.

let (m1, v1) = make_nested_object();
let variant = Variant::new(&m1, &v1);

// because we can guarantee metadata is validated through the builder
let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1));
builder.append_value(variant.clone()); // <-- panics here: the buffer has size 6 and we try to copy 4 bytes into it starting at offset 3

Ah... I didn't think about that. With one-byte offsets, Some(data_size) (= one byte) isn't enough buffer to avoid overflow when writing 4 bytes. Drat.

@alamb alamb marked this pull request as draft August 5, 2025 19:28
@alamb
Contributor

alamb commented Aug 5, 2025

Converting to a draft as I think we are leaning towards not merging this one and I want to clean up the PR queue.


Successfully merging this pull request may close these issues.

[Variant] Optimize the object header generation logic in ObjectBuilder::finish
5 participants