-
Notifications
You must be signed in to change notification settings - Fork 796
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add size statistics to ParquetMetaData
introduced in PARQUET-2261
#5486
Changes from 19 commits
a7e41c3
788eef3
6296ada
84f3d7a
0da05a8
7301aeb
6e5fece
457eb4a
18a5732
658512e
81c2b2e
29dde50
84f8512
9635e5e
c5c07b6
6dd160f
917b412
f8961a3
73fa099
00ca596
6acc500
787e3e8
46851f4
4f8487b
903b06b
fa89836
2800cc7
95a0535
fc66a59
542570f
7be97e5
f5ab47b
a008e9e
87ccec2
3eead30
ddf40c3
0ebb72f
393aea1
1c12fb8
53cd5fa
45f25a8
98025cc
7b59246
65096dd
08065ad
1cbd4b7
dce3513
f661839
818a614
c391dec
4816a95
e2faf2d
d92ae20
69dd652
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -183,6 +183,8 @@ struct PageMetrics { | |
num_buffered_values: u32, | ||
num_buffered_rows: u32, | ||
num_page_nulls: u64, | ||
repetition_level_histogram: Option<Vec<i64>>, | ||
definition_level_histogram: Option<Vec<i64>>, | ||
} | ||
|
||
// Metrics per column writer | ||
|
@@ -198,6 +200,9 @@ struct ColumnMetrics<T> { | |
max_column_value: Option<T>, | ||
num_column_nulls: u64, | ||
column_distinct_count: Option<u64>, | ||
variable_length_bytes: Option<i64>, | ||
repetition_level_histogram: Option<Vec<i64>>, | ||
definition_level_histogram: Option<Vec<i64>>, | ||
} | ||
|
||
/// Typed column writer for a primitive column. | ||
|
@@ -254,6 +259,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
// Used for level information | ||
encodings.insert(Encoding::RLE); | ||
|
||
// histogram data is only collected if there is more than a single level and if | ||
// page or chunk statistics are being collected | ||
let new_histogram_vec = |max_level| { | ||
if statistics_enabled == EnabledStatistics::None || max_level == 0 { | ||
None | ||
} else { | ||
Some(vec![0; max_level as usize + 1]) | ||
} | ||
}; | ||
|
||
let max_rep_level = descr.max_rep_level(); | ||
let max_def_level = descr.max_def_level(); | ||
|
||
Self { | ||
descr, | ||
props, | ||
|
@@ -269,6 +287,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
num_buffered_values: 0, | ||
num_buffered_rows: 0, | ||
num_page_nulls: 0, | ||
repetition_level_histogram: new_histogram_vec(max_rep_level), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reading through the code in this PR, I find it hard to convince myself that the fields in PageMetrics are set / updated appropriately. Part of this is because the modifications are happening inline rather than in, for example, some encapsulated methods of PageMetrics. I realize that this PR follows the existing pattern of modifying the fields of PageMetrics directly, but I think the new code is sufficiently complex that it is worth more encapsulation. For example, I would find it easier to reason about this code with something like: let mut page_metrics = PageMetrics::new();
// Collect histograms if there is more than a single level and if
// page or chunk statistics are being collected
if statistics_enabled == EnabledStatistics::None || max_level == 0 {
page_metrics = page_metrics
.with_repetition_level_histograms(max_rep_level)
.with_definition_level_histograms(max_def_level)
}
...
Self {
descr,
props,
...
page_metrics,
...
} Likewise, further below, rather than inlining the update of the definition level histogram like this: if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {
// Count values and update histogram
for &level in levels {
process_def_level(level);
def_hist[level as usize] += 1;
}
} else {
// Count values
for &level in levels {
process_def_level(level);
}
} It could be encapsulated into something like self.page_metrics.update_definition_level_histogram(levels); |
||
definition_level_histogram: new_histogram_vec(max_def_level), | ||
}, | ||
column_metrics: ColumnMetrics { | ||
total_bytes_written: 0, | ||
|
@@ -282,6 +302,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
max_column_value: None, | ||
num_column_nulls: 0, | ||
column_distinct_count: None, | ||
variable_length_bytes: None, | ||
repetition_level_histogram: new_histogram_vec(max_rep_level), | ||
definition_level_histogram: new_histogram_vec(max_def_level), | ||
}, | ||
column_index_builder: ColumnIndexBuilder::new(), | ||
offset_index_builder: OffsetIndexBuilder::new(), | ||
|
@@ -513,12 +536,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
})?; | ||
|
||
let mut values_to_write = 0; | ||
for &level in levels { | ||
|
||
let mut process_def_level = |level| { | ||
if level == self.descr.max_def_level() { | ||
values_to_write += 1; | ||
} else { | ||
// We must always compute this as it is used to populate v2 pages | ||
self.page_metrics.num_page_nulls += 1 | ||
self.page_metrics.num_page_nulls += 1; | ||
} | ||
}; | ||
|
||
if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The same comment applies here about encapsulating the updates of page metrics -- I think it would reduce the repetition significantly. |
||
// Count values and update histogram | ||
for &level in levels { | ||
process_def_level(level); | ||
def_hist[level as usize] += 1; | ||
} | ||
} else { | ||
// Count values | ||
for &level in levels { | ||
process_def_level(level); | ||
} | ||
} | ||
|
||
|
@@ -545,9 +582,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
)); | ||
} | ||
|
||
// Count the occasions where we start a new row | ||
for &level in levels { | ||
self.page_metrics.num_buffered_rows += (level == 0) as u32 | ||
if let Some(ref mut rep_hist) = self.page_metrics.repetition_level_histogram { | ||
// Count the occasions where we start a new row and update histogram | ||
for &level in levels { | ||
self.page_metrics.num_buffered_rows += (level == 0) as u32; | ||
rep_hist[level as usize] += 1; | ||
} | ||
} else { | ||
// Count the occasions where we start a new row | ||
for &level in levels { | ||
self.page_metrics.num_buffered_rows += (level == 0) as u32 | ||
} | ||
} | ||
|
||
self.rep_levels_sink.extend_from_slice(levels); | ||
|
@@ -618,7 +663,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
} | ||
|
||
/// Update the column index and offset index when adding the data page | ||
fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) { | ||
fn update_column_offset_index( | ||
&mut self, | ||
page_statistics: Option<&ValueStatistics<E::T>>, | ||
page_variable_length_bytes: Option<i64>, | ||
) { | ||
// update the column index | ||
let null_page = | ||
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls; | ||
|
@@ -689,9 +738,21 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
} | ||
} | ||
} | ||
|
||
// update histograms | ||
if self.column_index_builder.valid() { | ||
self.column_index_builder.append_histograms( | ||
&self.page_metrics.repetition_level_histogram, | ||
&self.page_metrics.definition_level_histogram, | ||
); | ||
} | ||
|
||
// update the offset index | ||
self.offset_index_builder | ||
.append_row_count(self.page_metrics.num_buffered_rows as i64); | ||
|
||
self.offset_index_builder | ||
.append_unencoded_byte_array_data_bytes(page_variable_length_bytes); | ||
} | ||
|
||
/// Determine if we should allow truncating min/max values for this column's statistics | ||
|
@@ -766,8 +827,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
_ => None, | ||
}; | ||
|
||
if let Some(var_bytes) = values_data.variable_length_bytes { | ||
self.column_metrics.variable_length_bytes = | ||
Some(self.column_metrics.variable_length_bytes.unwrap_or(0) + var_bytes); | ||
} | ||
|
||
// update column and offset index | ||
self.update_column_offset_index(page_statistics.as_ref()); | ||
self.update_column_offset_index( | ||
page_statistics.as_ref(), | ||
values_data.variable_length_bytes, | ||
); | ||
|
||
// collect page histograms into chunk histograms and zero out page histograms | ||
// TODO(ets): This could instead just add the vectors, and then allow page_metrics to be reset | ||
// below. Would then need to recreate the histogram vectors, so `new_histogram_vec` above | ||
// would need to become a function. | ||
if let Some(ref mut page_hist) = self.page_metrics.repetition_level_histogram { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you followed the suggestion above to move more of this logic into methods on the metrics structs, this could become something like: self.column_metrics.update_from_page(&self.page_metrics);
// reset metrics for a new page (zeros out buffered rows, resets histogram counts to 0)
self.page_metrics.new_page()
... |
||
if let Some(ref mut chunk_hist) = self.column_metrics.repetition_level_histogram { | ||
assert_eq!(chunk_hist.len(), page_hist.len()); | ||
for i in 0..page_hist.len() { | ||
chunk_hist[i] += page_hist[i]; | ||
page_hist[i] = 0; | ||
} | ||
} else { | ||
// this should never be reached, but zero out histogram just in case | ||
for v in page_hist { | ||
*v = 0; | ||
} | ||
} | ||
} | ||
if let Some(ref mut page_hist) = self.page_metrics.definition_level_histogram { | ||
if let Some(ref mut chunk_hist) = self.column_metrics.definition_level_histogram { | ||
assert_eq!(chunk_hist.len(), page_hist.len()); | ||
for i in 0..page_hist.len() { | ||
chunk_hist[i] += page_hist[i]; | ||
page_hist[i] = 0; | ||
} | ||
} else { | ||
// this should never be reached, but zero out histogram just in case | ||
for v in page_hist { | ||
*v = 0; | ||
} | ||
} | ||
} | ||
|
||
let page_statistics = page_statistics.map(Statistics::from); | ||
|
||
let compressed_page = match self.props.writer_version() { | ||
|
@@ -871,7 +974,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
// Reset state. | ||
self.rep_levels_sink.clear(); | ||
self.def_levels_sink.clear(); | ||
self.page_metrics = PageMetrics::default(); | ||
|
||
// don't clobber histogram vectors | ||
self.page_metrics.num_buffered_values = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the previous formulation was easier to reason about (reset page metrics) |
||
self.page_metrics.num_buffered_rows = 0; | ||
self.page_metrics.num_page_nulls = 0; | ||
|
||
Ok(()) | ||
} | ||
|
@@ -914,7 +1021,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { | |
.set_total_uncompressed_size(total_uncompressed_size) | ||
.set_num_values(num_values) | ||
.set_data_page_offset(data_page_offset) | ||
.set_dictionary_page_offset(dict_page_offset); | ||
.set_dictionary_page_offset(dict_page_offset) | ||
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes) | ||
.set_repetition_level_histogram(self.column_metrics.repetition_level_histogram.take()) | ||
.set_definition_level_histogram(self.column_metrics.definition_level_histogram.take()); | ||
|
||
if self.statistics_enabled != EnabledStatistics::None { | ||
let backwards_compatible_min_max = self.descr.sort_order().is_signed(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be more consistent with the rest of the code. The same comment applies to several other uses below