parquet: Make page_index/pushdown metrics consistent with row_group metrics (#12545)

* parquet: Make page_index/pushdown metrics consistent with row_group metrics

1. Rename `{pushdown,page_index}_filtered` to `{pushdown,page_index}_pruned`
2. Add `{pushdown,page_index}_matched`

The latter makes it clearer in EXPLAIN ANALYZE when the Page Index is
not checked because the corresponding row groups were already eliminated
(by a Bloom Filter or row group statistics).

* Add missing metric definitions in the docs

Co-authored-by: Andrew Lamb <[email protected]>

* s/pass/select/

---------

Co-authored-by: Andrew Lamb <[email protected]>
progval and alamb authored Sep 22, 2024
1 parent 3bd41bc commit 300a39b
Showing 7 changed files with 90 additions and 31 deletions.
28 changes: 20 additions & 8 deletions datafusion/core/src/datasource/physical_plan/parquet/metrics.rs
@@ -40,11 +40,15 @@ pub struct ParquetFileMetrics {
/// Total number of bytes scanned
pub bytes_scanned: Count,
/// Total rows filtered out by predicates pushed into parquet scan
pub pushdown_rows_filtered: Count,
pub pushdown_rows_pruned: Count,
/// Total rows passed predicates pushed into parquet scan
pub pushdown_rows_matched: Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: Time,
/// Total rows filtered out by parquet page index
pub page_index_rows_filtered: Count,
pub page_index_rows_pruned: Count,
/// Total rows passed through the parquet page index
pub page_index_rows_matched: Count,
/// Total time spent evaluating parquet page index filters
pub page_index_eval_time: Time,
}
@@ -80,16 +84,22 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("bytes_scanned", partition);

let pushdown_rows_filtered = MetricBuilder::new(metrics)
let pushdown_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_filtered", partition);
.counter("pushdown_rows_pruned", partition);
let pushdown_rows_matched = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_matched", partition);

let pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);
let page_index_rows_filtered = MetricBuilder::new(metrics)
let page_index_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("page_index_rows_pruned", partition);
let page_index_rows_matched = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("page_index_rows_filtered", partition);
.counter("page_index_rows_matched", partition);

let page_index_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
@@ -102,9 +112,11 @@ impl ParquetFileMetrics {
row_groups_matched_statistics,
row_groups_pruned_statistics,
bytes_scanned,
pushdown_rows_filtered,
pushdown_rows_pruned,
pushdown_rows_matched,
pushdown_eval_time,
page_index_rows_filtered,
page_index_rows_pruned,
page_index_rows_matched,
page_index_eval_time,
}
}
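
For context, a minimal sketch of how the new counter pair is used. This assumes the constructor signature hidden in the collapsed context above (`partition`, `filename`, `&ExecutionPlanMetricsSet`); since `ParquetFileMetrics` is crate-internal, this would have to live inside DataFusion itself:

```rust
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

// Sketch only: assumes it is written inside the same module as the
// `ParquetFileMetrics` definition above.
fn record_pushdown_example() {
    let metrics = ExecutionPlanMetricsSet::new();
    let file_metrics = ParquetFileMetrics::new(0, "data.parquet", &metrics);

    // A pushdown predicate evaluated 6 rows: 4 failed, 2 passed.
    file_metrics.pushdown_rows_pruned.add(4);
    file_metrics.pushdown_rows_matched.add(2);

    // Invariant introduced by this commit: for each filter stage,
    // pruned + matched == rows actually tested by that stage.
}
```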
15 changes: 10 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -1173,7 +1173,8 @@ mod tests {
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();
// Note there were 6 rows in total (across three batches)
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 4);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 4);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
}

#[tokio::test]
@@ -1325,7 +1326,8 @@
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();
// Note there were 6 rows in total (across three batches)
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 1);
}

#[tokio::test]
@@ -1399,7 +1401,8 @@
// There are 4 rows pruned in each of batch2, batch3, and
// batch4 for a total of 12. batch1 had no pruning as c2 was
// filled in as null
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 12);
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 12);
assert_eq!(get_value(&metrics, "page_index_rows_matched"), 6);
}

#[tokio::test]
@@ -1786,7 +1789,8 @@
"+-----+"
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 4);
assert_eq!(get_value(&metrics, "page_index_rows_matched"), 2);
assert!(
get_value(&metrics, "page_index_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
@@ -1855,7 +1859,8 @@

// pushdown predicates have eliminated all 4 bar rows and the
// null row for 5 rows total
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
assert!(
get_value(&metrics, "pushdown_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
@@ -178,6 +178,8 @@ impl PagePruningAccessPlanFilter {

// track the total number of rows that should be skipped
let mut total_skip = 0;
// track the total number of rows that should not be skipped
let mut total_select = 0;

// for each row group specified in the access plan
let row_group_indexes = access_plan.row_group_indexes();
@@ -242,8 +244,10 @@
if let Some(overall_selection) = overall_selection {
if overall_selection.selects_any() {
let rows_skipped = rows_skipped(&overall_selection);
trace!("Overall selection from predicate skipped {rows_skipped}: {overall_selection:?}");
let rows_selected = rows_selected(&overall_selection);
trace!("Overall selection from predicate skipped {rows_skipped}, selected {rows_selected}: {overall_selection:?}");
total_skip += rows_skipped;
total_select += rows_selected;
access_plan.scan_selection(row_group_index, overall_selection)
} else {
// Selection skips all rows, so skip the entire row group
@@ -258,7 +262,8 @@
}
}

file_metrics.page_index_rows_filtered.add(total_skip);
file_metrics.page_index_rows_pruned.add(total_skip);
file_metrics.page_index_rows_matched.add(total_select);
access_plan
}

@@ -276,6 +281,14 @@ fn rows_skipped(selection: &RowSelection) -> usize {
.fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc })
}

/// returns the number of rows not skipped in the selection
/// TODO should this be upstreamed to RowSelection?
fn rows_selected(selection: &RowSelection) -> usize {
selection
.iter()
.fold(0, |acc, x| if x.skip { acc } else { acc + x.row_count })
}
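
Since `rows_skipped` and `rows_selected` partition a `RowSelection` exactly, a quick self-contained check is easy to write against the parquet crate's public `arrow_reader` API (the helper bodies below are copied from the diff above):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn rows_skipped(selection: &RowSelection) -> usize {
    selection
        .iter()
        .fold(0, |acc, x| if x.skip { acc + x.row_count } else { acc })
}

fn rows_selected(selection: &RowSelection) -> usize {
    selection
        .iter()
        .fold(0, |acc, x| if x.skip { acc } else { acc + x.row_count })
}

fn main() {
    // Rows 0..3 and 8..10 survive the page index; rows 3..8 are pruned.
    let selection = RowSelection::from(vec![
        RowSelector::select(3),
        RowSelector::skip(5),
        RowSelector::select(2),
    ]);
    // Every row is either skipped or selected, never both.
    assert_eq!(rows_skipped(&selection), 5);
    assert_eq!(rows_selected(&selection), 5);
}
```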

fn update_selection(
current_selection: Option<RowSelection>,
row_selection: RowSelection,
24 changes: 17 additions & 7 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -105,7 +105,9 @@ pub(crate) struct DatafusionArrowPredicate {
/// Columns required to evaluate the expression in the arrow schema
projection: Vec<usize>,
/// how many rows were filtered out by this predicate
rows_filtered: metrics::Count,
rows_pruned: metrics::Count,
/// how many rows passed this predicate
rows_matched: metrics::Count,
/// how long was spent evaluating this predicate
time: metrics::Time,
/// used to perform type coercion while filtering rows
@@ -118,7 +120,8 @@ impl DatafusionArrowPredicate {
candidate: FilterCandidate,
schema: &Schema,
metadata: &ParquetMetaData,
rows_filtered: metrics::Count,
rows_pruned: metrics::Count,
rows_matched: metrics::Count,
time: metrics::Time,
schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
@@ -140,7 +143,8 @@ impl DatafusionArrowPredicate {
metadata.file_metadata().schema_descr(),
candidate.projection,
),
rows_filtered,
rows_pruned,
rows_matched,
time,
schema_mapping,
})
@@ -167,8 +171,10 @@ impl ArrowPredicate for DatafusionArrowPredicate {
.and_then(|v| v.into_array(batch.num_rows()))
.and_then(|array| {
let bool_arr = as_boolean_array(&array)?.clone();
let num_filtered = bool_arr.len() - bool_arr.true_count();
self.rows_filtered.add(num_filtered);
let num_matched = bool_arr.true_count();
let num_pruned = bool_arr.len() - num_matched;
self.rows_pruned.add(num_pruned);
self.rows_matched.add(num_matched);
timer.stop();
Ok(bool_arr)
})
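
In isolation, the counting above is plain boolean-array arithmetic. This standalone sketch uses arrow's `BooleanArray` directly to show that null predicate results are counted as pruned, because only `true` values match:

```rust
use arrow::array::{Array, BooleanArray};

fn main() {
    // Predicate output for a 4-row batch: true, false, null, true.
    let bool_arr = BooleanArray::from(vec![Some(true), Some(false), None, Some(true)]);

    let num_matched = bool_arr.true_count();       // 2
    let num_pruned = bool_arr.len() - num_matched; // 2: the false AND the null

    assert_eq!(num_matched, 2);
    assert_eq!(num_pruned, 2);
}
```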
@@ -523,7 +529,8 @@ pub fn build_row_filter(
file_metrics: &ParquetFileMetrics,
schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Option<RowFilter>> {
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.pushdown_eval_time;

// Split into conjuncts:
@@ -563,7 +570,8 @@
candidate,
file_schema,
metadata,
rows_filtered.clone(),
rows_pruned.clone(),
rows_matched.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)
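
A design note on the `.clone()` calls here: datafusion's `Count` is a cheap shared handle (an atomic behind an `Arc`), so cloning it into each `DatafusionArrowPredicate` still accumulates into the single per-file metric. A tiny sketch:

```rust
use datafusion::physical_plan::metrics::Count;

fn main() {
    let rows_pruned = Count::new();
    let per_predicate_handle = rows_pruned.clone(); // shares the same counter

    per_predicate_handle.add(3);
    rows_pruned.add(2);

    // Both handles observe the combined total.
    assert_eq!(rows_pruned.value(), 5);
}
```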
@@ -705,6 +713,7 @@ mod test {
&file_schema,
&metadata,
Count::new(),
Count::new(),
Time::new(),
Arc::clone(&schema_mapping),
)
@@ -728,6 +737,7 @@
&file_schema,
&metadata,
Count::new(),
Count::new(),
Time::new(),
schema_mapping,
)
20 changes: 12 additions & 8 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -538,24 +538,28 @@ impl<'a> TestCase<'a> {
PushdownExpected::None
};

let pushdown_rows_filtered = get_value(&metrics, "pushdown_rows_filtered");
println!(" pushdown_rows_filtered: {pushdown_rows_filtered}");
let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
println!(" pushdown_rows_pruned: {pushdown_rows_pruned}");
let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
println!(" pushdown_rows_matched: {pushdown_rows_matched}");

match pushdown_expected {
PushdownExpected::None => {
assert_eq!(pushdown_rows_filtered, 0, "{}", self.name);
assert_eq!(pushdown_rows_pruned, 0, "{}", self.name);
}
PushdownExpected::Some => {
assert!(
pushdown_rows_filtered > 0,
pushdown_rows_pruned > 0,
"{}: Expected to filter rows via pushdown, but none were",
self.name
);
}
};

let page_index_rows_filtered = get_value(&metrics, "page_index_rows_filtered");
println!(" page_index_rows_filtered: {page_index_rows_filtered}");
let page_index_rows_pruned = get_value(&metrics, "page_index_rows_pruned");
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
let page_index_rows_matched = get_value(&metrics, "page_index_rows_matched");
println!(" page_index_rows_matched: {page_index_rows_matched}");

let page_index_filtering_expected = if scan_options.enable_page_index {
self.page_index_filtering_expected
@@ -567,11 +571,11 @@

match page_index_filtering_expected {
PageIndexFilteringExpected::None => {
assert_eq!(page_index_rows_filtered, 0);
assert_eq!(page_index_rows_pruned, 0);
}
PageIndexFilteringExpected::Some => {
assert!(
page_index_rows_filtered > 0,
page_index_rows_pruned > 0,
"Expected to filter rows via page index but none were",
);
}
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/mod.rs
@@ -171,7 +171,7 @@ impl TestOutput {

/// The number of row pages pruned
fn row_pages_pruned(&self) -> Option<usize> {
self.metric_value("page_index_rows_filtered")
self.metric_value("page_index_rows_pruned")
}

fn description(&self) -> String {
15 changes: 15 additions & 0 deletions docs/source/user-guide/explain-usage.md
@@ -223,6 +223,21 @@ Again, reading from bottom up:
- `SortPreservingMergeExec`
- `output_rows=5`, `elapsed_compute=2.375µs`: Produced the final 5 rows in 2.375µs (microseconds)

When predicate pushdown is enabled, `ParquetExec` gains the following metrics:

- `page_index_rows_matched`: number of rows in pages that were tested by a page index filter, and passed
- `page_index_rows_pruned`: number of rows in pages that were tested by a page index filter, and did not pass
- `row_groups_matched_bloom_filter`: number of row groups that were tested by a Bloom Filter, and passed
- `row_groups_pruned_bloom_filter`: number of row groups that were tested by a Bloom Filter, and did not pass
- `row_groups_matched_statistics`: number of row groups that were tested by row group statistics (min and max values), and passed
- `row_groups_pruned_statistics`: number of row groups that were tested by row group statistics (min and max values), and did not pass
- `pushdown_rows_matched`: number of rows that were tested by the pushed-down predicates, and passed all of them (at most `page_index_rows_matched`, since these predicates only see rows that survived row group and page index pruning)
- `pushdown_rows_pruned`: number of rows that were tested by the pushed-down predicates, and did not pass at least one of them
- `predicate_evaluation_errors`: number of times evaluating the filter expression failed (expected to be zero in normal operation)
- `num_predicate_creation_errors`: number of errors creating predicates (expected to be zero in normal operation)
- `pushdown_eval_time`: time spent evaluating these filters
- `page_index_eval_time`: time required to evaluate the page index filters
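
As an illustration, a selective scan might be annotated like this in `EXPLAIN ANALYZE` output (hypothetical, abridged values, in the same `metrics=[...]` style as the `output_rows`/`elapsed_compute` annotations shown earlier):

```
ParquetExec: ... metrics=[output_rows=2, pushdown_rows_matched=2, pushdown_rows_pruned=4, page_index_rows_matched=6, page_index_rows_pruned=12, row_groups_matched_statistics=1, row_groups_pruned_statistics=2]
```

Note how each `*_matched`/`*_pruned` pair sums to the number of rows (or row groups) actually tested at that stage.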

## Partitions and Execution

DataFusion determines the optimal number of cores to use as part of query