Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add size statistics to ParquetMetaData introduced in PARQUET-2261 #5486

Closed
wants to merge 54 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
a7e41c3
regen thrift with size statistics added
etseidl Feb 7, 2024
788eef3
first cut at adding page size statistics
etseidl Feb 9, 2024
6296ada
add new stats to chunk metadata test
etseidl Feb 16, 2024
84f3d7a
Merge branch 'apache:master' into size_stats
etseidl Mar 8, 2024
0da05a8
fix escapes
etseidl Mar 12, 2024
7301aeb
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Mar 12, 2024
6e5fece
format
etseidl Mar 12, 2024
457eb4a
formatting
etseidl Mar 12, 2024
18a5732
add escapes
etseidl Mar 12, 2024
658512e
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Mar 12, 2024
81c2b2e
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Apr 29, 2024
29dde50
Merge branch 'size_stats' of github.com:etseidl/arrow-rs into size_stats
etseidl Jun 27, 2024
84f8512
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Jun 27, 2024
9635e5e
add test of SizeStatistics.unencoded_byte_array_data_bytes
etseidl Jun 27, 2024
c5c07b6
test def histogram as well, rename test
etseidl Jun 27, 2024
6dd160f
add an assert
etseidl Jun 27, 2024
917b412
refactor and add test of def histogram with nulls
etseidl Jun 27, 2024
f8961a3
add test of repetition level histogram
etseidl Jun 28, 2024
73fa099
revert changes to test_roundtrip
etseidl Jun 28, 2024
00ca596
suggestion from review
etseidl Jul 1, 2024
6acc500
add to documentation as suggested in review
etseidl Jul 1, 2024
787e3e8
make histograms optional
etseidl Jul 2, 2024
46851f4
add histograms to PageIndex
etseidl Jul 2, 2024
4f8487b
use Vec::push()
etseidl Jul 2, 2024
903b06b
formatting
etseidl Jul 2, 2024
fa89836
check size stats in read metadata
etseidl Jul 2, 2024
2800cc7
check unencoded_byte_array_data_bytes is not set for int cols
etseidl Jul 2, 2024
95a0535
rewrite test_byte_array_size_statistics() to not use test_roundtrip()
etseidl Jul 2, 2024
fc66a59
add unencoded_byte_array_data_bytes support in page index
etseidl Jul 2, 2024
542570f
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Jul 2, 2024
7be97e5
update expected sizes to account for new stats
etseidl Jul 2, 2024
f5ab47b
only write SizeStatistics in ColumnMetaData if statistics are enabled
etseidl Jul 3, 2024
a008e9e
add a little documentation
etseidl Jul 5, 2024
87ccec2
add ParquetOffsetIndex to avoid double read of OffsetIndex
etseidl Jul 5, 2024
3eead30
cleanup
etseidl Jul 5, 2024
ddf40c3
use less verbose update of variable_length_bytes
etseidl Jul 5, 2024
0ebb72f
add some documentation
etseidl Jul 6, 2024
393aea1
update to latest thrift (as of 11 Jul 2024) from parquet-format
etseidl Jul 11, 2024
1c12fb8
pass None for optional size statistics
etseidl Jul 11, 2024
53cd5fa
escape HTML tags
etseidl Jul 11, 2024
45f25a8
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Jul 11, 2024
98025cc
don't need to escape brackets in arrays
etseidl Jul 11, 2024
7b59246
Merge remote-tracking branch 'github/update_parquet_thrift' into size…
etseidl Jul 11, 2024
65096dd
use consistent naming
etseidl Jul 11, 2024
08065ad
suggested doc changes
etseidl Jul 11, 2024
1cbd4b7
more suggested doc changes
etseidl Jul 11, 2024
dce3513
use more asserts in tests
etseidl Jul 11, 2024
f661839
move histogram logic into PageMetrics and ColumnMetrics
etseidl Jul 12, 2024
818a614
refactor some to reduce code duplication, finish docs
etseidl Jul 12, 2024
c391dec
account for new size statistics in heap size calculations
etseidl Jul 12, 2024
4816a95
add histogram examples to docs
etseidl Jul 12, 2024
e2faf2d
Merge remote-tracking branch 'origin/master' into size_stats
etseidl Jul 12, 2024
d92ae20
add some fixmes
etseidl Jul 14, 2024
69dd652
leave not to self
etseidl Jul 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ macro_rules! downcast_op {
struct FallbackEncoder {
encoder: FallbackEncoderImpl,
num_values: usize,
variable_length_bytes: i64,
}

/// The fallback encoder in use
Expand Down Expand Up @@ -152,6 +153,7 @@ impl FallbackEncoder {
Ok(Self {
encoder,
num_values: 0,
variable_length_bytes: 0,
})
}

Expand All @@ -168,7 +170,8 @@ impl FallbackEncoder {
let value = values.value(*idx);
let value = value.as_ref();
buffer.extend_from_slice((value.len() as u32).as_bytes());
buffer.extend_from_slice(value)
buffer.extend_from_slice(value);
self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
Expand All @@ -177,6 +180,7 @@ impl FallbackEncoder {
let value = value.as_ref();
lengths.put(&[value.len() as i32]).unwrap();
buffer.extend_from_slice(value);
self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::Delta {
Expand Down Expand Up @@ -205,6 +209,7 @@ impl FallbackEncoder {
buffer.extend_from_slice(&value[prefix_length..]);
prefix_lengths.put(&[prefix_length as i32]).unwrap();
suffix_lengths.put(&[suffix_length as i32]).unwrap();
self.variable_length_bytes += value.len() as i64;
}
}
}
Expand Down Expand Up @@ -265,12 +270,16 @@ impl FallbackEncoder {
}
};

let var_bytes = Some(self.variable_length_bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let var_bytes = Some(self.variable_length_bytes);
let variable_length_bytes = Some(self.variable_length_bytes);

Might be more consistent with the rest of the code. The same comment applies to several other uses below

self.variable_length_bytes = 0;

Ok(DataPageValues {
buf: buf.into(),
num_values: std::mem::take(&mut self.num_values),
encoding,
min_value,
max_value,
variable_length_bytes: var_bytes,
})
}
}
Expand Down Expand Up @@ -311,6 +320,7 @@ impl Storage for ByteArrayStorage {
struct DictEncoder {
interner: Interner<ByteArrayStorage>,
indices: Vec<u64>,
variable_length_bytes: i64,
}

impl DictEncoder {
Expand All @@ -326,6 +336,7 @@ impl DictEncoder {
let value = values.value(*idx);
let interned = self.interner.intern(value.as_ref());
self.indices.push(interned);
self.variable_length_bytes += value.as_ref().len() as i64;
}
}

Expand Down Expand Up @@ -370,12 +381,16 @@ impl DictEncoder {

self.indices.clear();

let var_bytes = Some(self.variable_length_bytes);
self.variable_length_bytes = 0;

DataPageValues {
buf: encoder.consume().into(),
num_values,
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
variable_length_bytes: var_bytes,
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
pub encoding: Encoding,
pub min_value: Option<T>,
pub max_value: Option<T>,
pub variable_length_bytes: Option<i64>,
}

/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
Expand Down Expand Up @@ -124,6 +125,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
variable_length_bytes: Option<i64>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
Expand All @@ -143,6 +145,11 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
}

if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
self.variable_length_bytes =
Some(var_bytes + self.variable_length_bytes.unwrap_or(0));
etseidl marked this conversation as resolved.
Show resolved Hide resolved
}
}

// encode the values into bloom filter if enabled
Expand Down Expand Up @@ -196,6 +203,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
bloom_filter,
min_value: None,
max_value: None,
variable_length_bytes: None,
})
}

Expand Down Expand Up @@ -271,6 +279,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
num_values: std::mem::take(&mut self.num_values),
min_value: self.min_value.take(),
max_value: self.max_value.take(),
variable_length_bytes: self.variable_length_bytes.take(),
})
}
}
Expand Down
128 changes: 119 additions & 9 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ struct PageMetrics {
num_buffered_values: u32,
num_buffered_rows: u32,
num_page_nulls: u64,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
}

// Metrics per column writer
Expand All @@ -198,6 +200,9 @@ struct ColumnMetrics<T> {
max_column_value: Option<T>,
num_column_nulls: u64,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
}

/// Typed column writer for a primitive column.
Expand Down Expand Up @@ -254,6 +259,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Used for level information
encodings.insert(Encoding::RLE);

// histogram data is only collected if there is more than a single level and if
// page or chunk statistics are being collected
let new_histogram_vec = |max_level| {
if statistics_enabled == EnabledStatistics::None || max_level == 0 {
None
} else {
Some(vec![0; max_level as usize + 1])
}
};

let max_rep_level = descr.max_rep_level();
let max_def_level = descr.max_def_level();

Self {
descr,
props,
Expand All @@ -269,6 +287,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_buffered_values: 0,
num_buffered_rows: 0,
num_page_nulls: 0,
repetition_level_histogram: new_histogram_vec(max_rep_level),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading through the code in this PR, I think it is hard for me to convince myself that the fields in PageMetrics are set / updated appropriately. Part of this is because the modifications are happening inline rather than, for example, some encapsulated methods of PageMetrics

I realize the this PR follows the existing pattern of modifing the fields of PageMetrics directly, but I think the new code is sufficiently complex that it is worth more encapsulation

For example, I would find it easier to reason about this code with something like

let mut page_metrics = PageMetrics::new();
// Collect histograms if there is more than a single level and if
// page or chunk statistics are being collected
if statistics_enabled == EnabledStatistics::None || max_level == 0 {
  page_metrics = page_metrics
    .with_repetition_level_histograms(max_rep_level)
    .with_definition_level_histograms(max_def_level)
}
...

  Self {
            descr,
            props,
...
            page_metrics,
...
}

Likewise then below rather than inlining the update of repetition_level_histogram

     if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {
                // Count values and update histogram
                for &level in levels {
                    process_def_level(level);
                    def_hist[level as usize] += 1;
                }
            } else {
                // Count values
                for &level in levels {
                    process_def_level(level);
                }
            }

It could be encapsulated into something like

     self.page_metrics.update_definition_level_histogram(levels);

definition_level_histogram: new_histogram_vec(max_def_level),
},
column_metrics: ColumnMetrics {
total_bytes_written: 0,
Expand All @@ -282,6 +302,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
max_column_value: None,
num_column_nulls: 0,
column_distinct_count: None,
variable_length_bytes: None,
repetition_level_histogram: new_histogram_vec(max_rep_level),
definition_level_histogram: new_histogram_vec(max_def_level),
},
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
Expand Down Expand Up @@ -513,12 +536,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
})?;

let mut values_to_write = 0;
for &level in levels {

let mut process_def_level = |level| {
if level == self.descr.max_def_level() {
values_to_write += 1;
} else {
// We must always compute this as it is used to populate v2 pages
self.page_metrics.num_page_nulls += 1
self.page_metrics.num_page_nulls += 1;
}
};

if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same comment applies here about encapsulating the updates of page metrics -- I think it would reduce the repetition sigificantly

// Count values and update histogram
for &level in levels {
process_def_level(level);
def_hist[level as usize] += 1;
}
} else {
// Count values
for &level in levels {
process_def_level(level);
}
}

Expand All @@ -545,9 +582,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
));
}

// Count the occasions where we start a new row
for &level in levels {
self.page_metrics.num_buffered_rows += (level == 0) as u32
if let Some(ref mut rep_hist) = self.page_metrics.repetition_level_histogram {
// Count the occasions where we start a new row and update histogram
for &level in levels {
self.page_metrics.num_buffered_rows += (level == 0) as u32;
rep_hist[level as usize] += 1;
}
} else {
// Count the occasions where we start a new row
for &level in levels {
self.page_metrics.num_buffered_rows += (level == 0) as u32
}
}

self.rep_levels_sink.extend_from_slice(levels);
Expand Down Expand Up @@ -618,7 +663,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
fn update_column_offset_index(
&mut self,
page_statistics: Option<&ValueStatistics<E::T>>,
page_variable_length_bytes: Option<i64>,
) {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
Expand Down Expand Up @@ -689,9 +738,21 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}
}

// update histograms
if self.column_index_builder.valid() {
self.column_index_builder.append_histograms(
&self.page_metrics.repetition_level_histogram,
&self.page_metrics.definition_level_histogram,
);
}

// update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);

self.offset_index_builder
.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
}

/// Determine if we should allow truncating min/max values for this column's statistics
Expand Down Expand Up @@ -766,8 +827,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
_ => None,
};

if let Some(var_bytes) = values_data.variable_length_bytes {
self.column_metrics.variable_length_bytes =
Some(self.column_metrics.variable_length_bytes.unwrap_or(0) + var_bytes);
}

// update column and offset index
self.update_column_offset_index(page_statistics.as_ref());
self.update_column_offset_index(
page_statistics.as_ref(),
values_data.variable_length_bytes,
);

// collect page histograms into chunk histograms and zero out page histograms
// TODO(ets): This could instead just add the vectors, and then allow page_metrics to be reset
// below. Would then need to recreate the histogram vectors, so `new_histogram_vec` above
// would need to become a function.
if let Some(ref mut page_hist) = self.page_metrics.repetition_level_histogram {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you followed the suggestion above to move more of this logic into PageMetrics maybe thie code could look like

self.column_metrics.update_from_page(&self.page_metrics);
// reset metrics for a new page (zeros out buffered rows, resets histogram counts to 0)
self.page_metrics.new_page()
...

if let Some(ref mut chunk_hist) = self.column_metrics.repetition_level_histogram {
assert_eq!(chunk_hist.len(), page_hist.len());
for i in 0..page_hist.len() {
chunk_hist[i] += page_hist[i];
page_hist[i] = 0;
}
} else {
// this should never be reached, but zero out histogram just in case
for v in page_hist {
*v = 0;
}
}
}
if let Some(ref mut page_hist) = self.page_metrics.definition_level_histogram {
if let Some(ref mut chunk_hist) = self.column_metrics.definition_level_histogram {
assert_eq!(chunk_hist.len(), page_hist.len());
for i in 0..page_hist.len() {
chunk_hist[i] += page_hist[i];
page_hist[i] = 0;
}
} else {
// this should never be reached, but zero out histogram just in case
for v in page_hist {
*v = 0;
}
}
}

let page_statistics = page_statistics.map(Statistics::from);

let compressed_page = match self.props.writer_version() {
Expand Down Expand Up @@ -871,7 +974,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Reset state.
self.rep_levels_sink.clear();
self.def_levels_sink.clear();
self.page_metrics = PageMetrics::default();

// don't clobber histogram vectors
self.page_metrics.num_buffered_values = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the previous formulation was easier to reason about (reset page metrics)

self.page_metrics.num_buffered_rows = 0;
self.page_metrics.num_page_nulls = 0;

Ok(())
}
Expand Down Expand Up @@ -914,7 +1021,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.set_total_uncompressed_size(total_uncompressed_size)
.set_num_values(num_values)
.set_data_page_offset(data_page_offset)
.set_dictionary_page_offset(dict_page_offset);
.set_dictionary_page_offset(dict_page_offset)
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
.set_repetition_level_histogram(self.column_metrics.repetition_level_histogram.take())
.set_definition_level_histogram(self.column_metrics.definition_level_histogram.take());

if self.statistics_enabled != EnabledStatistics::None {
let backwards_compatible_min_max = self.descr.sort_order().is_signed();
Expand Down
11 changes: 11 additions & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,13 @@ pub(crate) mod private {
(std::mem::size_of::<Self>(), 1)
}

/// Return the number of variable length bytes in a given slice of data
///
/// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
fn variable_length_bytes(_: &[Self]) -> Option<i64> {
None
}

/// Return the value as i64 if possible
///
/// This is essentially the same as `std::convert::TryInto<i64>` but can't be
Expand Down Expand Up @@ -937,6 +944,10 @@ pub(crate) mod private {
Ok(num_values)
}

fn variable_length_bytes(values: &[Self]) -> Option<i64> {
Some(values.iter().map(|x| x.len() as i64).sum())
}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
Expand Down
Loading
Loading