test(row-selection): add first few row selection tests
sdd committed Aug 28, 2024
1 parent ad2931e commit 80c99f8
Showing 1 changed file with 181 additions and 2 deletions.
183 changes: 181 additions & 2 deletions crates/iceberg/src/expr/visitors/page_index_evaluator.rs
@@ -760,6 +760,7 @@ impl BoundPredicateVisitor for PageIndexEvaluator<'_> {
/// returned: NYYYYYNNYYNYN
///
/// This can be removed from here once RowSelection::union is in parquet::arrow
/// (Hopefully once https://github.com/apache/arrow-rs/pull/6308 gets merged)
fn union_row_selections(left: &RowSelection, right: &RowSelection) -> RowSelection {
let mut l_iter = left.iter().copied().peekable();
let mut r_iter = right.iter().copied().peekable();
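
As a minimal sketch of the union semantics documented above (an editorial illustration, assuming parquet's From<Vec<RowSelector>> conversion for RowSelection), unioning NNYYY with YNNNN should yield YNYYY:

// left selects rows 2..5, right selects row 0; the union selects both.
let left = RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(3)]); // NNYYY
let right = RowSelection::from(vec![RowSelector::select(1), RowSelector::skip(4)]); // YNNNN
let result = union_row_selections(&left, &right); // YNYYY
assert_eq!(result.iter().collect::<Vec<_>>(), vec![
    &RowSelector::select(1),
    &RowSelector::skip(1),
    &RowSelector::select(3)
]);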
@@ -840,9 +841,24 @@ fn union_row_selections(left: &RowSelection, right: &RowSelection) -> RowSelection

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::basic::{LogicalType as ParquetLogicalType, Type as ParquetPhysicalType};
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::page_index::index::{Index, NativeIndex};
use parquet::file::statistics::Statistics;
use parquet::format::{BoundaryOrder, PageLocation};
use parquet::schema::types::{
    ColumnDescriptor, ColumnPath, SchemaDescriptor, Type as ParquetSchemaType,
};

use super::{union_row_selections, PageIndexEvaluator};
use crate::expr::{Bind, Reference};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::Result;

#[test]
fn test_union_row_selections() {
@@ -873,4 +889,167 @@ mod tests {
&RowSelector::select(40)
]);
}

#[test]
fn eval_matches_no_rows_for_empty_row_group() -> Result<()> {
let row_group_metadata = create_row_group_metadata(0, 0, None, 0, None)?;
let (column_index, offset_index) = create_page_index()?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_float")
.greater_than(Datum::float(1.0))
.bind(iceberg_schema_ref.clone(), false)?;

let result = PageIndexEvaluator::eval(
&filter,
&column_index,
&offset_index,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

let expected: Vec<RowSelector> = vec![];

assert_eq!(result, expected);

Ok(())
}

#[test]
fn eval_is_null_none_null_select_all_rows() -> Result<()> {
let row_group_metadata = create_row_group_metadata(0, 0, None, 0, None)?;
let (column_index, offset_index) = create_page_index()?;

let (iceberg_schema_ref, field_id_map) = build_iceberg_schema_and_field_map()?;

let filter = Reference::new("col_float")
.is_null()
.bind(iceberg_schema_ref.clone(), false)?;

let result = PageIndexEvaluator::eval(
&filter,
&column_index,
&offset_index,
&row_group_metadata,
&field_id_map,
iceberg_schema_ref.as_ref(),
)?;

let expected: Vec<RowSelector> = vec![];

assert_eq!(result, expected);

Ok(())
}

fn build_iceberg_schema_and_field_map() -> Result<(Arc<Schema>, HashMap<i32, usize>)> {
let iceberg_schema = Schema::builder()
.with_fields([
Arc::new(NestedField::new(
1,
"col_float",
Type::Primitive(PrimitiveType::Float),
false,
)),
Arc::new(NestedField::new(
2,
"col_string",
Type::Primitive(PrimitiveType::String),
false,
)),
])
.build()?;
let iceberg_schema_ref = Arc::new(iceberg_schema);

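// Maps Iceberg field id -> leaf column ordinal in the parquet schema
// (field 1 / col_float is column 0, field 2 / col_string is column 1).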
let field_id_map = HashMap::from_iter([(1, 0), (2, 1)]);

Ok((iceberg_schema_ref, field_id_map))
}

fn build_parquet_schema_descriptor() -> Result<Arc<SchemaDescriptor>> {
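// The parquet field ids set via with_id below are expected to line up with
// the Iceberg field ids above, so the evaluator can correlate the two schemas.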
let field_1 = Arc::new(
parquetSchemaType::primitive_type_builder("col_float", ParquetPhysicalType::FLOAT)
.with_id(Some(1))
.build()?,
);

let field_2 = Arc::new(
ParquetSchemaType::primitive_type_builder(
"col_string",
ParquetPhysicalType::BYTE_ARRAY,
)
.with_id(Some(2))
.with_logical_type(Some(ParquetLogicalType::String))
.build()?,
);

let group_type = Arc::new(
parquetSchemaType::group_type_builder("all")
.with_id(Some(1000))
.with_fields(vec![field_1, field_2])
.build()?,
);

let schema_descriptor = SchemaDescriptor::new(group_type);
let schema_descriptor_arc = Arc::new(schema_descriptor);
Ok(schema_descriptor_arc)
}

fn create_row_group_metadata(
num_rows: i64,
col_1_num_vals: i64,
col_1_stats: Option<Statistics>,
col_2_num_vals: i64,
col_2_stats: Option<Statistics>,
) -> Result<RowGroupMetaData> {
let schema_descriptor_arc = build_parquet_schema_descriptor()?;

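// ColumnDescriptor::new takes the column's type pointer, the max definition
// level, the max repetition level, and the column path.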
let column_1_desc_ptr = Arc::new(ColumnDescriptor::new(
schema_descriptor_arc.column(0).self_type_ptr(),
1,
1,
ColumnPath::new(vec!["col_float".to_string()]),
));

let column_2_desc_ptr = Arc::new(ColumnDescriptor::new(
schema_descriptor_arc.column(1).self_type_ptr(),
1,
1,
ColumnPath::new(vec!["col_string".to_string()]),
));

let mut col_1_meta =
    ColumnChunkMetaData::builder(column_1_desc_ptr).set_num_values(col_1_num_vals);
if let Some(stats1) = col_1_stats {
    col_1_meta = col_1_meta.set_statistics(stats1);
}

let mut col_2_meta =
    ColumnChunkMetaData::builder(column_2_desc_ptr).set_num_values(col_2_num_vals);
if let Some(stats2) = col_2_stats {
    col_2_meta = col_2_meta.set_statistics(stats2);
}

let row_group_metadata = RowGroupMetaData::builder(schema_descriptor_arc)
    .set_num_rows(num_rows)
    .set_column_metadata(vec![col_1_meta.build()?, col_2_meta.build()?])
    .build()?;

Ok(row_group_metadata)
}

fn create_page_index() -> Result<(Vec<Index>, Vec<Vec<PageLocation>>)> {
// Scaffolding for later tests; underscore-prefixed since it is not used
// yet, which silences the unused-variable warning.
let _idx = Index::FLOAT(NativeIndex::<f32> {
    indexes: vec![],
    boundary_order: BoundaryOrder(0), // UNORDERED
});

Ok((vec![], vec![]))
}
}