Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPC format support for StringViewArray and BinaryViewArray #5525

Merged
merged 11 commits into from
Apr 1, 2024
15 changes: 14 additions & 1 deletion arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
}
}
crate::Type::Binary => DataType::Binary,
crate::Type::BinaryView => DataType::BinaryView,
crate::Type::LargeBinary => DataType::LargeBinary,
crate::Type::Utf8 => DataType::Utf8,
crate::Type::Utf8View => DataType::Utf8View,
crate::Type::LargeUtf8 => DataType::LargeUtf8,
crate::Type::FixedSizeBinary => {
let fsb = field.type_as_fixed_size_binary().unwrap();
Expand Down Expand Up @@ -543,7 +545,16 @@ pub(crate) fn get_fb_field_type<'a>(
.as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
BinaryView | Utf8View => unimplemented!("unimplemented"),
BinaryView => FBFieldType {
type_type: crate::Type::BinaryView,
type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8View => FBFieldType {
type_type: crate::Type::Utf8View,
type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8 => FBFieldType {
type_type: crate::Type::Utf8,
type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
Expand Down Expand Up @@ -877,7 +888,9 @@ mod tests {
true,
),
Field::new("utf8", DataType::Utf8, false),
Field::new("utf8_view", DataType::Utf8View, false),
Field::new("binary", DataType::Binary, false),
Field::new("binary_view", DataType::BinaryView, false),
Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true),
Field::new_fixed_size_list(
"fixed_size_list[u8]",
Expand Down
77 changes: 59 additions & 18 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! however the `FileReader` expects a reader that supports `Seek`ing

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
Expand Down Expand Up @@ -67,7 +67,11 @@ fn read_buffer(
/// - check if the bit width of non-64-bit numbers is 64, and
/// - read the buffer as 64-bit (signed integer or float), and
/// - cast the 64-bit array to the appropriate data type
fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, ArrowError> {
fn create_array(
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
reader: &mut ArrayReader,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
Expand All @@ -79,6 +83,16 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
reader.next_buffer()?,
],
),
BinaryView | Utf8View => {
let count = variadic_counts
.pop_front()
.ok_or(ArrowError::IpcError("Incorrect variadic count!".to_owned()))?;
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
let count = count + 2; // view and null buffer.
let buffers = (0..count)
.map(|_| reader.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(reader.next_node(field)?, data_type, &buffers)
alamb marked this conversation as resolved.
Show resolved Hide resolved
}
FixedSizeBinary(_) => create_primitive_array(
reader.next_node(field)?,
data_type,
Expand All @@ -87,13 +101,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
let values = create_array(reader, list_field)?;
let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?];
let values = create_array(reader, list_field)?;
let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
Struct(struct_fields) => {
Expand All @@ -105,7 +119,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
let child = create_array(reader, struct_field)?;
let child = create_array(reader, struct_field, variadic_counts)?;
struct_arrays.push((struct_field.clone(), child));
}
let null_count = struct_node.null_count() as usize;
Expand All @@ -119,8 +133,8 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
}
RunEndEncoded(run_ends_field, values_field) => {
let run_node = reader.next_node(field)?;
let run_ends = create_array(reader, run_ends_field)?;
let values = create_array(reader, values_field)?;
let run_ends = create_array(reader, run_ends_field, variadic_counts)?;
let values = create_array(reader, values_field, variadic_counts)?;

let run_array_length = run_node.length() as usize;
let data = ArrayData::builder(data_type.clone())
Expand Down Expand Up @@ -173,7 +187,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let mut ids = Vec::with_capacity(fields.len());

for (id, field) in fields.iter() {
let child = create_array(reader, field)?;
let child = create_array(reader, field, variadic_counts)?;
children.push((field.as_ref().clone(), child));
ids.push(id);
}
Expand Down Expand Up @@ -226,6 +240,11 @@ fn create_primitive_array(
.null_bit_buffer(null_buffer)
.build_aligned()?
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer)
.build_aligned()?,
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
Expand Down Expand Up @@ -324,7 +343,11 @@ impl<'a> ArrayReader<'a> {
})
}

fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> {
fn skip_field(
&mut self,
field: &Field,
variadic_count: &mut VecDeque<i64>,
) -> Result<(), ArrowError> {
self.next_node(field)?;

match field.data_type() {
Expand All @@ -333,30 +356,39 @@ impl<'a> ArrayReader<'a> {
self.skip_buffer()
}
}
Utf8View | BinaryView => {
let count = variadic_count
.pop_front()
.ok_or(ArrowError::IpcError("Incorrect variadic count!".to_owned()))?;
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
let count = count + 2; // view and null buffer.
for _i in 0..count {
self.skip_buffer()
}
}
FixedSizeBinary(_) => {
self.skip_buffer();
self.skip_buffer();
}
List(list_field) | LargeList(list_field) | Map(list_field, _) => {
self.skip_buffer();
self.skip_buffer();
self.skip_field(list_field)?;
self.skip_field(list_field, variadic_count)?;
}
FixedSizeList(list_field, _) => {
self.skip_buffer();
self.skip_field(list_field)?;
self.skip_field(list_field, variadic_count)?;
}
Struct(struct_fields) => {
self.skip_buffer();

// skip for each field
for struct_field in struct_fields {
self.skip_field(struct_field)?
self.skip_field(struct_field, variadic_count)?
}
}
RunEndEncoded(run_ends_field, values_field) => {
self.skip_field(run_ends_field)?;
self.skip_field(values_field)?;
self.skip_field(run_ends_field, variadic_count)?;
self.skip_field(values_field, variadic_count)?;
}
Dictionary(_, _) => {
self.skip_buffer(); // Nulls
Expand All @@ -371,7 +403,7 @@ impl<'a> ArrayReader<'a> {
};

for (_, field) in fields.iter() {
self.skip_field(field)?
self.skip_field(field, variadic_count)?
}
}
Null => {} // No buffer increases
Expand Down Expand Up @@ -399,6 +431,13 @@ pub fn read_record_batch(
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;

let mut variadic_counts: VecDeque<i64> = if let Some(v) = batch.variadicBufferCounts() {
v.iter().collect()
} else {
VecDeque::default()
};
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved

let batch_compression = batch.compression();
let compression = batch_compression
.map(|batch_compression| batch_compression.codec().try_into())
Expand All @@ -421,12 +460,13 @@ pub fn read_record_batch(
for (idx, field) in schema.fields().iter().enumerate() {
// Create array for projected field
if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
let child = create_array(&mut reader, field)?;
let child = create_array(&mut reader, field, &mut variadic_counts)?;
arrays.push((proj_idx, child));
} else {
reader.skip_field(field)?;
reader.skip_field(field, &mut variadic_counts)?;
}
}
assert!(variadic_counts.is_empty());
arrays.sort_by_key(|t| t.0);
RecordBatch::try_new_with_options(
Arc::new(schema.project(projection)?),
Expand All @@ -437,9 +477,10 @@ pub fn read_record_batch(
let mut children = vec![];
// keep track of index as lists require more than one node
for field in schema.fields() {
let child = create_array(&mut reader, field)?;
let child = create_array(&mut reader, field, &mut variadic_counts)?;
children.push(child);
}
assert!(variadic_counts.is_empty());
RecordBatch::try_new_with_options(schema, children, &options)
}
}
Expand Down
Loading
Loading