Skip to content

Commit

Permalink
IPC format support for StringViewArray and BinaryViewArray (#5525)
Browse files Browse the repository at this point in the history
* check in ipc format for view types

* update tests

* fix variadic counting

* fix linting, address comments

* Apply suggestions from code review

Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>

* address some review comments

* update comments

* Add tests and fix bugs with dict types

* make clippy happy

* update test cases

---------

Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
4 people authored Apr 1, 2024
1 parent 9f36c88 commit 17058c7
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 19 deletions.
15 changes: 14 additions & 1 deletion arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
}
}
crate::Type::Binary => DataType::Binary,
crate::Type::BinaryView => DataType::BinaryView,
crate::Type::LargeBinary => DataType::LargeBinary,
crate::Type::Utf8 => DataType::Utf8,
crate::Type::Utf8View => DataType::Utf8View,
crate::Type::LargeUtf8 => DataType::LargeUtf8,
crate::Type::FixedSizeBinary => {
let fsb = field.type_as_fixed_size_binary().unwrap();
Expand Down Expand Up @@ -548,7 +550,16 @@ pub(crate) fn get_fb_field_type<'a>(
.as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
BinaryView | Utf8View => unimplemented!("unimplemented"),
BinaryView => FBFieldType {
type_type: crate::Type::BinaryView,
type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8View => FBFieldType {
type_type: crate::Type::Utf8View,
type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8 => FBFieldType {
type_type: crate::Type::Utf8,
type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
Expand Down Expand Up @@ -921,7 +932,9 @@ mod tests {
true,
),
Field::new("utf8", DataType::Utf8, false),
Field::new("utf8_view", DataType::Utf8View, false),
Field::new("binary", DataType::Binary, false),
Field::new("binary_view", DataType::BinaryView, false),
Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true),
Field::new_fixed_size_list(
"fixed_size_list[u8]",
Expand Down
197 changes: 179 additions & 18 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod stream;
pub use stream::*;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
Expand Down Expand Up @@ -64,14 +64,21 @@ fn read_buffer(

/// Coordinates reading arrays based on data types.
///
/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView).
/// When we encounter such a type, we pop from the front of the queue to get the number of buffers to read.
///
/// Notes:
/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
/// We thus:
/// - check if the bit width of non-64-bit numbers is 64, and
/// - read the buffer as 64-bit (signed integer or float), and
/// - cast the 64-bit array to the appropriate data type
fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, ArrowError> {
fn create_array(
reader: &mut ArrayReader,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
Expand All @@ -83,6 +90,18 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
reader.next_buffer()?,
],
),
BinaryView | Utf8View => {
let count = variadic_counts
.pop_front()
.ok_or(ArrowError::IpcError(format!(
"Missing variadic count for {data_type} column"
)))?;
let count = count + 2; // view and null buffer.
let buffers = (0..count)
.map(|_| reader.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(reader.next_node(field)?, data_type, &buffers)
}
FixedSizeBinary(_) => create_primitive_array(
reader.next_node(field)?,
data_type,
Expand All @@ -91,13 +110,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
let values = create_array(reader, list_field)?;
let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?];
let values = create_array(reader, list_field)?;
let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
Struct(struct_fields) => {
Expand All @@ -109,7 +128,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
let child = create_array(reader, struct_field)?;
let child = create_array(reader, struct_field, variadic_counts)?;
struct_arrays.push((struct_field.clone(), child));
}
let null_count = struct_node.null_count() as usize;
Expand All @@ -123,8 +142,8 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
}
RunEndEncoded(run_ends_field, values_field) => {
let run_node = reader.next_node(field)?;
let run_ends = create_array(reader, run_ends_field)?;
let values = create_array(reader, values_field)?;
let run_ends = create_array(reader, run_ends_field, variadic_counts)?;
let values = create_array(reader, values_field, variadic_counts)?;

let run_array_length = run_node.length() as usize;
let data = ArrayData::builder(data_type.clone())
Expand Down Expand Up @@ -177,7 +196,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let mut ids = Vec::with_capacity(fields.len());

for (id, field) in fields.iter() {
let child = create_array(reader, field)?;
let child = create_array(reader, field, variadic_counts)?;
children.push((field.as_ref().clone(), child));
ids.push(id);
}
Expand Down Expand Up @@ -230,6 +249,11 @@ fn create_primitive_array(
.null_bit_buffer(null_buffer)
.build_aligned()?
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer)
.build_aligned()?,
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
Expand Down Expand Up @@ -328,7 +352,11 @@ impl<'a> ArrayReader<'a> {
})
}

fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> {
fn skip_field(
&mut self,
field: &Field,
variadic_count: &mut VecDeque<i64>,
) -> Result<(), ArrowError> {
self.next_node(field)?;

match field.data_type() {
Expand All @@ -337,30 +365,42 @@ impl<'a> ArrayReader<'a> {
self.skip_buffer()
}
}
Utf8View | BinaryView => {
let count = variadic_count
.pop_front()
.ok_or(ArrowError::IpcError(format!(
"Missing variadic count for {} column",
field.data_type()
)))?;
let count = count + 2; // view and null buffer.
for _i in 0..count {
self.skip_buffer()
}
}
FixedSizeBinary(_) => {
self.skip_buffer();
self.skip_buffer();
}
List(list_field) | LargeList(list_field) | Map(list_field, _) => {
self.skip_buffer();
self.skip_buffer();
self.skip_field(list_field)?;
self.skip_field(list_field, variadic_count)?;
}
FixedSizeList(list_field, _) => {
self.skip_buffer();
self.skip_field(list_field)?;
self.skip_field(list_field, variadic_count)?;
}
Struct(struct_fields) => {
self.skip_buffer();

// skip for each field
for struct_field in struct_fields {
self.skip_field(struct_field)?
self.skip_field(struct_field, variadic_count)?
}
}
RunEndEncoded(run_ends_field, values_field) => {
self.skip_field(run_ends_field)?;
self.skip_field(values_field)?;
self.skip_field(run_ends_field, variadic_count)?;
self.skip_field(values_field, variadic_count)?;
}
Dictionary(_, _) => {
self.skip_buffer(); // Nulls
Expand All @@ -375,7 +415,7 @@ impl<'a> ArrayReader<'a> {
};

for (_, field) in fields.iter() {
self.skip_field(field)?
self.skip_field(field, variadic_count)?
}
}
Null => {} // No buffer increases
Expand Down Expand Up @@ -403,6 +443,10 @@ pub fn read_record_batch(
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;

let mut variadic_counts: VecDeque<i64> =
batch.variadicBufferCounts().into_iter().flatten().collect();

let batch_compression = batch.compression();
let compression = batch_compression
.map(|batch_compression| batch_compression.codec().try_into())
Expand All @@ -425,12 +469,13 @@ pub fn read_record_batch(
for (idx, field) in schema.fields().iter().enumerate() {
// Create array for projected field
if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
let child = create_array(&mut reader, field)?;
let child = create_array(&mut reader, field, &mut variadic_counts)?;
arrays.push((proj_idx, child));
} else {
reader.skip_field(field)?;
reader.skip_field(field, &mut variadic_counts)?;
}
}
assert!(variadic_counts.is_empty());
arrays.sort_by_key(|t| t.0);
RecordBatch::try_new_with_options(
Arc::new(schema.project(projection)?),
Expand All @@ -441,9 +486,10 @@ pub fn read_record_batch(
let mut children = vec![];
// keep track of index as lists require more than one node
for field in schema.fields() {
let child = create_array(&mut reader, field)?;
let child = create_array(&mut reader, field, &mut variadic_counts)?;
children.push(child);
}
assert!(variadic_counts.is_empty());
RecordBatch::try_new_with_options(schema, children, &options)
}
}
Expand Down Expand Up @@ -1759,6 +1805,121 @@ mod tests {
assert_eq!(input_batch, output_batch);
}

// Shared fixture for the view-type round-trip tests: a value deliberately long
// enough that view arrays should not store it inline (assumed to exceed the
// view types' inline-prefix size — confirm against the view buffer layout).
const LONG_TEST_STRING: &str =
"This is a long string to make sure binary view array handles it";

/// Ensures BinaryView / Utf8View columns survive IPC round-trips through
/// both the file format and the stream format, and again after slicing the
/// batch to a non-zero offset.
#[test]
fn test_roundtrip_view_types() {
    let schema = Schema::new(vec![
        Field::new("field_1", DataType::BinaryView, true),
        Field::new("field_2", DataType::Utf8, true),
        Field::new("field_3", DataType::Utf8View, true),
    ]);

    // Short values, a null, and one long value (see LONG_TEST_STRING).
    let binary_data: Vec<Option<&[u8]>> = vec![
        Some(b"foo"),
        None,
        Some(b"bar"),
        Some(LONG_TEST_STRING.as_bytes()),
    ];
    let string_data: Vec<Option<&str>> =
        vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];

    let record_batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(BinaryViewArray::from_iter(binary_data)),
            Arc::new(StringArray::from_iter(string_data.iter())),
            Arc::new(StringViewArray::from_iter(string_data)),
        ],
    )
    .unwrap();

    // The full batch must round-trip unchanged through both IPC writers.
    assert_eq!(record_batch, roundtrip_ipc(&record_batch));
    assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));

    // A sliced batch (offset 1, length 2) must round-trip as well.
    let sliced = record_batch.slice(1, 2);
    assert_eq!(sliced, roundtrip_ipc(&sliced));
    assert_eq!(sliced, roundtrip_ipc_stream(&sliced));
}

// Round-trips view arrays (Utf8View / BinaryView) that sit as dictionary
// values nested inside a Map column, through both the IPC file and stream
// formats, including a sliced (non-zero offset) batch.
#[test]
fn test_roundtrip_view_types_nested_dict() {
// Dictionary value pools: short values, a null, and one long value
// (LONG_TEST_STRING) so both short and long view entries are exercised.
let bin_values: Vec<Option<&[u8]>> = vec![
Some(b"foo"),
None,
Some(b"bar"),
Some(LONG_TEST_STRING.as_bytes()),
Some(b"field"),
];
let utf8_values: Vec<Option<&str>> = vec![
Some("foo"),
None,
Some("bar"),
Some(LONG_TEST_STRING),
Some("field"),
];
let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

// Map "keys" column: Dictionary<Int8, Utf8View> with explicit dict id 1.
let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
let keys_field = Arc::new(Field::new_dict(
"keys",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
true,
1,
false,
));

// Map "values" column: Dictionary<Int8, BinaryView> with explicit dict id 2.
let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
let values_field = Arc::new(Field::new_dict(
"values",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
true,
2,
false,
));
// The map's entries struct pairs the two dictionary-encoded children.
let entry_struct = StructArray::from(vec![
(keys_field, make_array(key_dict_array.into_data())),
(values_field, make_array(value_dict_array.into_data())),
]);

let map_data_type = DataType::Map(
Arc::new(Field::new(
"entries",
entry_struct.data_type().clone(),
false,
)),
false,
);
// Offsets [0, 2, 4, 7]: three map entries holding 2, 2, and 3 pairs.
let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
let map_data = ArrayData::builder(map_data_type)
.len(3)
.add_buffer(entry_offsets)
.add_child_data(entry_struct.into_data())
.build()
.unwrap();
let map_array = MapArray::from(map_data);

// Outermost layer: the map itself is dictionary-encoded too, so the view
// buffers end up two dictionary levels deep.
let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
let schema = Arc::new(Schema::new(vec![Field::new(
"f1",
dict_dict_array.data_type().clone(),
false,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
// Full batch must round-trip through both IPC writers unchanged.
assert_eq!(batch, roundtrip_ipc(&batch));
assert_eq!(batch, roundtrip_ipc_stream(&batch));

// Sliced batch (offset 1, length 2) must round-trip as well.
let sliced_batch = batch.slice(1, 2);
assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
}

#[test]
fn test_no_columns_batch() {
let schema = Arc::new(Schema::empty());
Expand Down
Loading

0 comments on commit 17058c7

Please sign in to comment.