Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for StringView and BinaryView statistics in StatisticsConverter #6181

Merged
173 changes: 164 additions & 9 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use crate::file::page_index::index::{Index, PageIndex};
use crate::file::statistics::Statistics as ParquetStatistics;
use crate::schema::types::SchemaDescriptor;
use arrow_array::builder::{
BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
BinaryViewBuilder, BooleanBuilder, FixedSizeBinaryBuilder, LargeStringBuilder, StringBuilder,
StringViewBuilder,
};
use arrow_array::{
new_empty_array, new_null_array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Expand Down Expand Up @@ -446,14 +447,43 @@ macro_rules! get_statistics {
},
DataType::Dictionary(_, value_type) => {
[<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
},
DataType::Utf8View => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = StringViewBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x) else {
builder.append_null();
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
},
DataType::BinaryView => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
let mut builder = BinaryViewBuilder::new();
for x in iterator {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

builder.append_value(x);
}
Ok(Arc::new(builder.finish()))
}

DataType::Map(_,_) |
DataType::Duration(_) |
DataType::Interval(_) |
DataType::Null |
DataType::BinaryView |
DataType::Utf8View |
DataType::List(_) |
DataType::ListView(_) |
DataType::FixedSizeList(_, _) |
Expand Down Expand Up @@ -919,7 +949,7 @@ macro_rules! get_data_page_statistics {
}
})
},
Some(DataType::FixedSizeBinary(size)) => {
Some(DataType::FixedSizeBinary(size)) => {
let mut builder = FixedSizeBinaryBuilder::new(*size);
let iterator = [<$stat_type_prefix FixedLenByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
Expand All @@ -943,7 +973,58 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
_ => unimplemented!()
Some(DataType::Utf8View) => {
let mut builder = StringViewBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

let Ok(x) = std::str::from_utf8(x.data()) else {
builder.append_null();
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::BinaryView) => {
let mut builder = BinaryViewBuilder::new();
let iterator = [<$stat_type_prefix ByteArrayDataPageStatsIterator>]::new($iterator);
for x in iterator {
for x in x.into_iter() {
let Some(x) = x else {
builder.append_null(); // no statistics value
continue;
};

builder.append_value(x);
}
}
Ok(Arc::new(builder.finish()))
},
Some(DataType::Null) |
Some(DataType::Duration(_)) |
Some(DataType::Interval(_)) |
Some(DataType::List(_)) |
Some(DataType::ListView(_)) |
Some(DataType::FixedSizeList(_, _)) |
Some(DataType::LargeList(_)) |
Some(DataType::LargeListView(_)) |
Some(DataType::Struct(_)) |
Some(DataType::Union(_, _)) |
Some(DataType::Map(_, _)) |
Some(DataType::RunEndEncoded(_, _)) => {
let len = $iterator.count();
// don't know how to extract statistics, so return a null array
Ok(new_null_array($data_type.unwrap(), len))
},
None => unimplemented!() // not sure how to handle this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found it confusing at first, but the unimplemented! macro in Rust panics if it is hit (rather than returning an Err that the application can respond to)

However, I spent some time looking into this code and I think it is always called with Some so this code is unreachable.

I'll make a follow on PR to remove it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I must have misunderstood the demand.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry about that, I don't think I was clear about the issue either until I spent some time trying to remove it myself

}
}
}
Expand Down Expand Up @@ -1499,10 +1580,10 @@ mod test {
use arrow::datatypes::{i256, Date32Type, Date64Type};
use arrow::util::test_util::parquet_test_data;
use arrow_array::{
new_empty_array, new_null_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
TimestampNanosecondArray,
new_empty_array, new_null_array, Array, ArrayRef, BinaryArray, BinaryViewArray,
BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
StringArray, StringViewArray, StructArray, TimestampNanosecondArray,
};
use arrow_schema::{DataType, Field, SchemaRef};
use bytes::Bytes;
Expand Down Expand Up @@ -1916,6 +1997,65 @@ mod test {
.run()
}

#[test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that there are tests for statistics both here and in parquest/tests/arrow_reader/statistics.rs is very confusing. It would be aweosme to remove the duplication (maybe in a follow on PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great, I could do it in later days.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much -- I filed #6185 to track the idea and what is needed

fn roundtrip_string_view() {
Test {
input: string_view_array([
// row group 1
Some("A"),
None,
Some("Q"),
// row group 2
Some("ZZ"),
Some("A_longerthan12"),
None,
// row group 3
Some("A_longerthan12"),
None,
None,
]),
expected_min: string_view_array([
Some("A"),
Some("A_longerthan12"),
Some("A_longerthan12"),
]),
expected_max: string_view_array([Some("Q"), Some("ZZ"), Some("A_longerthan12")]),
}
.run()
}

#[test]
fn roundtrip_binary_view() {
let input: Vec<Option<&[u8]>> = vec![
// row group 1
Some(b"A"),
None,
Some(b"Q"),
// row group 2
Some(b"ZZ"),
Kev1n8 marked this conversation as resolved.
Show resolved Hide resolved
Some(b"A_longerthan12"),
None,
// row group 3
Some(b"A_longerthan12"),
None,
None,
];

let expected_min: Vec<Option<&[u8]>> =
vec![Some(b"A"), Some(b"A_longerthan12"), Some(b"A_longerthan12")];
let expected_max: Vec<Option<&[u8]>> =
vec![Some(b"Q"), Some(b"ZZ"), Some(b"A_longerthan12")];

let array = binary_view_array(input);

Test {
input: array,
expected_min: binary_view_array(expected_min),
expected_max: binary_view_array(expected_max),
}
.run()
}

#[test]
fn roundtrip_struct() {
let mut test = Test {
Expand Down Expand Up @@ -2539,4 +2679,19 @@ mod test {

Arc::new(array)
}

fn string_view_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef {
let array: StringViewArray = input
.into_iter()
.map(|s| s.map(|s| s.to_string()))
.collect();

Arc::new(array)
}

fn binary_view_array(input: Vec<Option<&[u8]>>) -> ArrayRef {
let array = BinaryViewArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>());

Arc::new(array)
}
}
55 changes: 48 additions & 7 deletions parquet/tests/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

use arrow_array::types::{Int32Type, Int8Type};
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, RecordBatch, StringArray, StructArray, Time32MillisecondArray,
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
Array, ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, DictionaryArray, FixedSizeBinaryArray, Float16Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, RecordBatch, StringArray, StringViewArray, StructArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow_buffer::i256;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
Expand Down Expand Up @@ -88,6 +88,8 @@ enum Scenario {
PeriodsInColumnNames,
StructArray,
UTF8,
UTF8View,
BinaryView,
}

fn make_boolean_batch(v: Vec<Option<bool>>) -> RecordBatch {
Expand Down Expand Up @@ -589,6 +591,16 @@ fn make_utf8_batch(value: Vec<Option<&str>>) -> RecordBatch {
.unwrap()
}

fn make_utf8_view_batch(value: Vec<Option<&str>>) -> RecordBatch {
let utf8_view = StringViewArray::from(value);
RecordBatch::try_from_iter(vec![("utf8_view", Arc::new(utf8_view) as _)]).unwrap()
}

fn make_binary_view_batch(value: Vec<Option<&[u8]>>) -> RecordBatch {
let binary_view = BinaryViewArray::from(value);
RecordBatch::try_from_iter(vec![("binary_view", Arc::new(binary_view) as _)]).unwrap()
}

fn make_dict_batch() -> RecordBatch {
let values = [
Some("abc"),
Expand Down Expand Up @@ -972,6 +984,35 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_utf8_batch(vec![Some("e"), Some("f"), Some("g"), Some("h"), Some("i")]),
]
}
Scenario::UTF8View => {
// Make utf8_view batch including string length <12 and >12 bytes
// as the internal representation of StringView is differed for strings
// shorter and longer than that length
vec![
make_utf8_view_batch(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]),
make_utf8_view_batch(vec![Some("a"), Some("e_longerthan12"), None, None, None]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

make_utf8_view_batch(vec![
Some("e_longerthan12"),
Some("f_longerthan12"),
Some("g_longerthan12"),
Some("h_longerthan12"),
Some("i_longerthan12"),
]),
]
}
Scenario::BinaryView => {
vec![
make_binary_view_batch(vec![Some(b"a"), Some(b"b"), Some(b"c"), Some(b"d"), None]),
make_binary_view_batch(vec![Some(b"a"), Some(b"e_longerthan12"), None, None, None]),
make_binary_view_batch(vec![
Some(b"e_longerthan12"),
Some(b"f_longerthan12"),
Some(b"g_longerthan12"),
Some(b"h_longerthan12"),
Some(b"i_longerthan12"),
]),
]
}
}
}

Expand Down
62 changes: 58 additions & 4 deletions parquet/tests/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use arrow::datatypes::{
TimestampNanosecondType, TimestampSecondType,
};
use arrow_array::{
make_array, new_null_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
make_array, new_null_array, Array, ArrayRef, BinaryArray, BinaryViewArray, BooleanArray,
Date32Array, Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
LargeStringArray, RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
LargeStringArray, RecordBatch, StringArray, StringViewArray, Time32MillisecondArray,
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
Expand Down Expand Up @@ -2059,6 +2059,60 @@ async fn test_utf8() {
.run();
}

// UTF8View
#[tokio::test]
async fn test_utf8_view() {
let reader = TestReader {
scenario: Scenario::UTF8View,
row_per_group: 5,
}
.build()
.await;

// test for utf8_view
Test {
reader: &reader,
expected_min: Arc::new(StringViewArray::from(vec!["a", "a", "e_longerthan12"])),
expected_max: Arc::new(StringViewArray::from(vec![
"d",
"e_longerthan12",
"i_longerthan12",
])),
expected_null_counts: UInt64Array::from(vec![1, 3, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "utf8_view",
check: Check::Both,
}
.run()
}

// BinaryView
#[tokio::test]
async fn test_binary_view() {
let reader = TestReader {
scenario: Scenario::BinaryView,
row_per_group: 5,
}
.build()
.await;

let expected_min: Vec<Option<&[u8]>> = vec![Some(b"a"), Some(b"a"), Some(b"e_longerthan12")];
let expected_max: Vec<Option<&[u8]>> =
vec![Some(b"d"), Some(b"e_longerthan12"), Some(b"i_longerthan12")];

// test for utf8_view
Test {
reader: &reader,
expected_min: Arc::new(BinaryViewArray::from(expected_min)),
expected_max: Arc::new(BinaryViewArray::from(expected_max)),
expected_null_counts: UInt64Array::from(vec![1, 3, 0]),
expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])),
column_name: "binary_view",
check: Check::Both,
}
.run()
}

////// Files with missing statistics ///////

#[tokio::test]
Expand Down
Loading