Table Scan: Add Row Selection Filtering #565

Merged 5 commits on Sep 24, 2024
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -67,6 +67,7 @@ opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
95 changes: 92 additions & 3 deletions crates/iceberg/src/arrow/reader.rs
@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
use futures::channel::mpsc::{channel, Sender};
use futures::future::BoxFuture;
use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
-use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
@@ -56,6 +57,7 @@ pub struct ArrowReaderBuilder {
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
}

impl ArrowReaderBuilder {
@@ -68,6 +70,7 @@ impl ArrowReaderBuilder {
file_io,
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
}
}

@@ -90,13 +93,20 @@ impl ArrowReaderBuilder {
self
}

/// Determines whether to enable row selection.
pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) -> Self {
self.row_selection_enabled = row_selection_enabled;
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
}
}
}
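
As a usage sketch (the constructor name is assumed from context; only `with_row_selection_enabled` is introduced by this diff), a caller might opt in like this:

```rust
// Sketch: opting in to row selection (off by default). `file_io` is an
// already-configured FileIO; it only takes effect when the scan task
// carries a predicate, as shown further down.
let reader = ArrowReaderBuilder::new(file_io)
    .with_row_selection_enabled(true)
    .build();
```
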
@@ -111,6 +121,7 @@ pub struct ArrowReader {
concurrency_limit_data_files: usize,

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
}

impl ArrowReader {
@@ -121,6 +132,7 @@ impl ArrowReader {
let batch_size = self.batch_size;
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;
let row_selection_enabled = self.row_selection_enabled;

let (tx, rx) = channel(concurrency_limit_data_files);
let mut channel_for_error = tx.clone();
@@ -142,6 +154,7 @@ impl ArrowReader {
file_io,
tx,
row_group_filtering_enabled,
row_selection_enabled,
)
.await
})
@@ -168,6 +181,7 @@ impl ArrowReader {
file_io: FileIO,
mut tx: Sender<Result<RecordBatch>>,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
) -> Result<()> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
@@ -176,11 +190,12 @@ impl ArrowReader {
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let should_load_page_index = row_selection_enabled && task.predicate().is_some();

// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
parquet_file_reader,
- // Page index will be required in upcoming row selection PR
- ArrowReaderOptions::new().with_page_index(false),
+ ArrowReaderOptions::new().with_page_index(should_load_page_index),
)
.await?;
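
Row selection is driven by the Parquet page index, so the index is only loaded when the feature is enabled and the task actually has a predicate to evaluate. A small sketch of the invariant the later code relies on (the assertions are illustrative, not part of the PR):

```rust
// Illustrative only: when `should_load_page_index` was true and the file
// was written with page indexes, both indexes are available on the
// ParquetMetaData consumed by `get_row_selection` below.
let metadata = record_batch_stream_builder.metadata();
debug_assert!(metadata.column_index().is_some());
debug_assert!(metadata.offset_index().is_some());
```
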

@@ -224,6 +239,19 @@ impl ArrowReader {
selected_row_groups = Some(result);
}

if row_selection_enabled {
let row_selection = Self::get_row_selection(
predicate,
record_batch_stream_builder.metadata(),
&selected_row_groups,
&field_id_map,
task.schema(),
)?;

record_batch_stream_builder =
record_batch_stream_builder.with_row_selection(row_selection);
}

if let Some(selected_row_groups) = selected_row_groups {
record_batch_stream_builder =
record_batch_stream_builder.with_row_groups(selected_row_groups);
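
The `RowSelection` handed to the stream builder is parquet-rs's run-length structure of alternating select/skip spans. A minimal sketch of the shape it takes (row counts invented for illustration):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Decode only rows 100..150 out of a 1000-row span; pages containing no
// selected rows are skipped entirely during decoding.
let selection: RowSelection = vec![
    RowSelector::skip(100),
    RowSelector::select(50),
    RowSelector::skip(850),
]
.into();
```
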
@@ -377,6 +405,67 @@ impl ArrowReader {

Ok(results)
}

fn get_row_selection(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
selected_row_groups: &Option<Vec<usize>>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<RowSelection> {
let Some(column_index) = parquet_metadata.column_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain a column index",
));
};

let Some(offset_index) = parquet_metadata.offset_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain an offset index",
));
};

let mut selected_row_groups_idx = 0;

let page_index = column_index
.iter()
.enumerate()
.zip(offset_index)
.zip(parquet_metadata.row_groups());

let mut results = Vec::new();
for (((idx, column_index), offset_index), row_group_metadata) in page_index {
if let Some(selected_row_groups) = selected_row_groups {
// skip row groups that aren't present in selected_row_groups
if idx == selected_row_groups[selected_row_groups_idx] {
selected_row_groups_idx += 1;
} else {
continue;
}
}

let selections_for_page = PageIndexEvaluator::eval(
predicate,
column_index,
offset_index,
row_group_metadata,
field_id_map,
snapshot_schema,
)?;

results.push(selections_for_page);

if let Some(selected_row_groups) = selected_row_groups {
if selected_row_groups_idx == selected_row_groups.len() {
break;
}
}
}

Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}
}
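
Note the design choice in `get_row_selection`: row groups pruned via `selected_row_groups` contribute no selectors at all, because the final `RowSelection` is interpreted relative to the row groups that are actually read. A sketch of the concatenation (counts invented):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// Per-row-group selector runs are concatenated in row-group order; pruned
// groups contribute nothing, matching the with_row_groups semantics.
let per_group: Vec<Vec<RowSelector>> = vec![
    vec![RowSelector::select(10), RowSelector::skip(90)], // first kept group
    vec![RowSelector::skip(40), RowSelector::select(60)], // second kept group
];
let combined: RowSelection = per_group
    .into_iter()
    .flatten()
    .collect::<Vec<_>>()
    .into();
```
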

/// Build the map of parquet field id to Parquet column index in the schema.
1 change: 1 addition & 0 deletions crates/iceberg/src/expr/visitors/mod.rs
@@ -20,4 +20,5 @@ pub(crate) mod expression_evaluator;
pub(crate) mod inclusive_metrics_evaluator;
pub(crate) mod inclusive_projection;
pub(crate) mod manifest_evaluator;
pub(crate) mod page_index_evaluator;
pub(crate) mod row_group_metrics_evaluator;