diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs index af20be0a4..18aeb5587 100644 --- a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs @@ -246,10 +246,10 @@ impl<'a> PageIndexEvaluator<'a> { .zip(row_counts.iter()) .map(|(item, &row_count)| { predicate( - item.max.map(|val| { + item.min.map(|val| { Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val)) }), - item.min.map(|val| { + item.max.map(|val| { Datum::new(field_type.clone(), PrimitiveLiteral::Boolean(val)) }), PageNullCount::from_row_and_null_counts(row_count, item.null_count), @@ -262,10 +262,10 @@ impl<'a> PageIndexEvaluator<'a> { .zip(row_counts.iter()) .map(|(item, &row_count)| { predicate( - item.max - .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), item.min .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), + item.max + .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Int(val))), PageNullCount::from_row_and_null_counts(row_count, item.null_count), ) }) @@ -276,10 +276,10 @@ impl<'a> PageIndexEvaluator<'a> { .zip(row_counts.iter()) .map(|(item, &row_count)| { predicate( - item.max - .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), item.min .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), + item.max + .map(|val| Datum::new(field_type.clone(), PrimitiveLiteral::Long(val))), PageNullCount::from_row_and_null_counts(row_count, item.null_count), ) }) @@ -312,13 +312,13 @@ impl<'a> PageIndexEvaluator<'a> { .zip(row_counts.iter()) .map(|(item, &row_count)| { predicate( - item.max.map(|val| { + item.min.map(|val| { Datum::new( field_type.clone(), PrimitiveLiteral::Double(OrderedFloat::from(val)), ) }), - item.min.map(|val| { + item.max.map(|val| { Datum::new( field_type.clone(), PrimitiveLiteral::Double(OrderedFloat::from(val)), diff --git 
a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 1c3361163..f5cbbcf06 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -913,7 +913,9 @@ mod tests { use std::fs::File; use std::sync::Arc; - use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; + use arrow_array::{ + ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + }; use futures::{stream, TryStreamExt}; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; @@ -1110,10 +1112,29 @@ mod tests { PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string(), )])), + arrow_schema::Field::new("dbl", arrow_schema::DataType::Float64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "5".to_string(), + )])), + arrow_schema::Field::new("i32", arrow_schema::DataType::Int32, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "6".to_string(), + )])), + arrow_schema::Field::new("i64", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "7".to_string(), + )])), + arrow_schema::Field::new("bool", arrow_schema::DataType::Boolean, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "8".to_string(), + )])), ]; Arc::new(arrow_schema::Schema::new(fields)) }; - // 4 columns: // x: [1, 1, 1, 1, ...] 
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef; @@ -1136,8 +1157,34 @@ mod tests { values.append(vec!["Iceberg"; 512].as_mut()); let col4 = Arc::new(StringArray::from_iter_values(values)) as ArrayRef; - let to_write = - RecordBatch::try_new(schema.clone(), vec![col1, col2, col3, col4]).unwrap(); + // dbl: + let mut values = vec![100.0f64; 512]; + values.append(vec![150.0f64; 12].as_mut()); + values.append(vec![200.0f64; 500].as_mut()); + let col5 = Arc::new(Float64Array::from_iter_values(values)) as ArrayRef; + + // i32: + let mut values = vec![100i32; 512]; + values.append(vec![150i32; 12].as_mut()); + values.append(vec![200i32; 500].as_mut()); + let col6 = Arc::new(Int32Array::from_iter_values(values)) as ArrayRef; + + // i64: + let mut values = vec![100i64; 512]; + values.append(vec![150i64; 12].as_mut()); + values.append(vec![200i64; 500].as_mut()); + let col7 = Arc::new(Int64Array::from_iter_values(values)) as ArrayRef; + + // bool: + let mut values = vec![false; 512]; + values.append(vec![true; 512].as_mut()); + let values: BooleanArray = values.into(); + let col8 = Arc::new(values) as ArrayRef; + + let to_write = RecordBatch::try_new(schema.clone(), vec![ + col1, col2, col3, col4, col5, col6, col7, col8, + ]) + .unwrap(); // Write the Parquet files let props = WriterProperties::builder() @@ -1215,6 +1262,7 @@ mod tests { let table_scan = table .scan() .snapshot_id(3051729675574597004) + .with_row_selection_enabled(true) .build() .unwrap(); assert_eq!(table_scan.snapshot().snapshot_id(), 3051729675574597004); @@ -1226,7 +1274,13 @@ mod tests { fixture.setup_manifest_files().await; // Create table scan for current snapshot and plan files - let table_scan = fixture.table.scan().build().unwrap(); + let table_scan = fixture + .table + .scan() + .with_row_selection_enabled(true) + .build() + .unwrap(); + let mut tasks = table_scan .plan_files() .await @@ -1261,7 +1315,12 @@ mod tests { fixture.setup_manifest_files().await; // Create 
table scan for current snapshot and plan files - let table_scan = fixture.table.scan().build().unwrap(); + let table_scan = fixture + .table + .scan() + .with_row_selection_enabled(true) + .build() + .unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1279,7 +1338,12 @@ mod tests { fixture.setup_manifest_files().await; // Create table scan for current snapshot and plan files - let table_scan = fixture.table.scan().build().unwrap(); + let table_scan = fixture + .table + .scan() + .with_row_selection_enabled(true) + .build() + .unwrap(); let mut plan_task: Vec<_> = table_scan .plan_files() @@ -1312,7 +1376,13 @@ mod tests { fixture.setup_manifest_files().await; // Create table scan for current snapshot and plan files - let table_scan = fixture.table.scan().select(["x", "z"]).build().unwrap(); + let table_scan = fixture + .table + .scan() + .select(["x", "z"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1337,7 +1407,9 @@ mod tests { // Filter: y < 3 let mut builder = fixture.table.scan(); let predicate = Reference::new("y").less_than(Datum::long(3)); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1363,7 +1435,9 @@ mod tests { // Filter: y >= 5 let mut builder = fixture.table.scan(); let predicate = Reference::new("y").greater_than_or_equal_to(Datum::long(5)); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1381,6 +1455,106 @@ mod tests { assert_eq!(int64_arr.value(0), 5); } + #[tokio::test] + async fn test_filter_double_eq() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + 
+ // Filter: dbl == 150.0 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("dbl").equal_to(Datum::double(150.0f64)); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].num_rows(), 12); + + let col = batches[0].column_by_name("dbl").unwrap(); + let f64_arr = col.as_any().downcast_ref::<Float64Array>().unwrap(); + assert_eq!(f64_arr.value(1), 150.0f64); + } + + #[tokio::test] + async fn test_filter_int_eq() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: i32 == 150 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("i32").equal_to(Datum::int(150i32)); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].num_rows(), 12); + + let col = batches[0].column_by_name("i32").unwrap(); + let i32_arr = col.as_any().downcast_ref::<Int32Array>().unwrap(); + assert_eq!(i32_arr.value(1), 150i32); + } + + #[tokio::test] + async fn test_filter_long_eq() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: i64 == 150 + let mut builder = fixture.table.scan(); + let predicate = Reference::new("i64").equal_to(Datum::long(150i64)); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches.len(), 2); + 
assert_eq!(batches[0].num_rows(), 12); + + let col = batches[0].column_by_name("i64").unwrap(); + let i64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap(); + assert_eq!(i64_arr.value(1), 150i64); + } + + #[tokio::test] + async fn test_filter_bool_eq() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Filter: bool == true + let mut builder = fixture.table.scan(); + let predicate = Reference::new("bool").equal_to(Datum::bool(true)); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); + let table_scan = builder.build().unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].num_rows(), 512); + + let col = batches[0].column_by_name("bool").unwrap(); + let bool_arr = col.as_any().downcast_ref::<BooleanArray>().unwrap(); + assert!(bool_arr.value(1)); + } + #[tokio::test] async fn test_filter_on_arrow_is_null() { let mut fixture = TableTestFixture::new(); @@ -1389,7 +1563,9 @@ mod tests { // Filter: y is null let mut builder = fixture.table.scan(); let predicate = Reference::new("y").is_null(); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1406,7 +1582,9 @@ mod tests { // Filter: y is not null let mut builder = fixture.table.scan(); let predicate = Reference::new("y").is_not_null(); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1425,7 +1603,9 @@ mod tests { let predicate = Reference::new("y") .less_than(Datum::long(5)) .and(Reference::new("z").greater_than_or_equal_to(Datum::long(4))); - builder = 
builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1459,7 +1639,9 @@ mod tests { let predicate = Reference::new("y") .less_than(Datum::long(5)) .or(Reference::new("z").greater_than_or_equal_to(Datum::long(4))); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1494,7 +1676,9 @@ mod tests { // Filter: a STARTSWITH "Ice" let mut builder = fixture.table.scan(); let predicate = Reference::new("a").starts_with(Datum::string("Ice")); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1516,7 +1700,9 @@ mod tests { // Filter: a NOT STARTSWITH "Ice" let mut builder = fixture.table.scan(); let predicate = Reference::new("a").not_starts_with(Datum::string("Ice")); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1539,7 +1725,9 @@ mod tests { let mut builder = fixture.table.scan(); let predicate = Reference::new("a").is_in([Datum::string("Sioux"), Datum::string("Iceberg")]); - builder = builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); @@ -1562,7 +1750,9 @@ mod tests { let mut builder = fixture.table.scan(); let predicate = Reference::new("a").is_not_in([Datum::string("Sioux"), Datum::string("Iceberg")]); - builder = 
builder.with_filter(predicate); + builder = builder + .with_filter(predicate) + .with_row_selection_enabled(true); let table_scan = builder.build().unwrap(); let batch_stream = table_scan.to_arrow().await.unwrap(); diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json index cf9fef96d..35230966a 100644 --- a/crates/iceberg/testdata/example_table_metadata_v2.json +++ b/crates/iceberg/testdata/example_table_metadata_v2.json @@ -16,7 +16,11 @@ {"id": 1, "name": "x", "required": true, "type": "long"}, {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, {"id": 3, "name": "z", "required": true, "type": "long"}, - {"id": 4, "name": "a", "required": true, "type": "string"} + {"id": 4, "name": "a", "required": true, "type": "string"}, + {"id": 5, "name": "dbl", "required": true, "type": "double"}, + {"id": 6, "name": "i32", "required": true, "type": "int"}, + {"id": 7, "name": "i64", "required": true, "type": "long"}, + {"id": 8, "name": "bool", "required": true, "type": "boolean"} ] } ],