From 06e06102ccd961cf2384bb5468e5e0813aecb1e8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 17 Mar 2024 21:30:15 +0000 Subject: [PATCH 01/10] check in ipc format for view types --- arrow-ipc/src/convert.rs | 15 +++++- arrow-ipc/src/reader.rs | 76 +++++++++++++++++++------- arrow-ipc/src/writer.rs | 112 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 183 insertions(+), 20 deletions(-) diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index b2e580241adc..d0b9145c7f27 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -242,8 +242,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat } } crate::Type::Binary => DataType::Binary, + crate::Type::BinaryView => DataType::BinaryView, crate::Type::LargeBinary => DataType::LargeBinary, crate::Type::Utf8 => DataType::Utf8, + crate::Type::Utf8View => DataType::Utf8View, crate::Type::LargeUtf8 => DataType::LargeUtf8, crate::Type::FixedSizeBinary => { let fsb = field.type_as_fixed_size_binary().unwrap(); @@ -543,7 +545,16 @@ pub(crate) fn get_fb_field_type<'a>( .as_union_value(), children: Some(fbb.create_vector(&empty_fields[..])), }, - BinaryView | Utf8View => unimplemented!("unimplemented"), + BinaryView => FBFieldType { + type_type: crate::Type::BinaryView, + type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(), + children: Some(fbb.create_vector(&empty_fields[..])), + }, + Utf8View => FBFieldType { + type_type: crate::Type::Utf8View, + type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(), + children: Some(fbb.create_vector(&empty_fields[..])), + }, Utf8 => FBFieldType { type_type: crate::Type::Utf8, type_: crate::Utf8Builder::new(fbb).finish().as_union_value(), @@ -877,7 +888,9 @@ mod tests { true, ), Field::new("utf8", DataType::Utf8, false), + Field::new("utf8_view", DataType::Utf8View, false), Field::new("binary", DataType::Binary, false), + Field::new("binary_view", DataType::BinaryView, false), Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true), Field::new_fixed_size_list( "fixed_size_list[u8]", diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index f015674d6813..c4d9703847e5 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -21,7 +21,7 @@ //! however the `FileReader` expects a reader that supports `Seek`ing use flatbuffers::{VectorIter, VerifierOptions}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; @@ -67,7 +67,11 @@ fn read_buffer( /// - check if the bit width of non-64-bit numbers is 64, and /// - read the buffer as 64-bit (signed integer or float), and /// - cast the 64-bit array to the appropriate data type -fn create_array(reader: &mut ArrayReader, field: &Field) -> Result { +fn create_array( + reader: &mut ArrayReader, + field: &Field, + variadic_counts: &mut VecDeque, +) -> Result { let data_type = field.data_type(); match data_type { Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array( @@ -79,6 +83,16 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result { + let count = variadic_counts + .pop_front() + .expect("Incorrect variadic count!"); + let count = count + 1; // add the null buffer. + let buffers = (0..count) + .map(|_| reader.next_buffer()) + .collect::, _>>()?; + create_primitive_array(reader.next_node(field)?, data_type, &buffers) + } FixedSizeBinary(_) => create_primitive_array( reader.next_node(field)?, data_type, @@ -87,13 +101,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result { let list_node = reader.next_node(field)?; let list_buffers = [reader.next_buffer()?, reader.next_buffer()?]; - let values = create_array(reader, list_field)?; + let values = create_array(reader, list_field, variadic_counts)?; create_list_array(list_node, data_type, &list_buffers, values) } FixedSizeList(ref list_field, _) => { let list_node = reader.next_node(field)?; let list_buffers = [reader.next_buffer()?]; - let values = create_array(reader, list_field)?; + let values = create_array(reader, list_field, variadic_counts)?; create_list_array(list_node, data_type, &list_buffers, values) } Struct(struct_fields) => { @@ -105,7 +119,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result Result { let run_node = reader.next_node(field)?; - let run_ends = create_array(reader, run_ends_field)?; - let values = create_array(reader, values_field)?; + let run_ends = create_array(reader, run_ends_field, variadic_counts)?; + let values = create_array(reader, values_field, variadic_counts)?; let run_array_length = run_node.length() as usize; let data = ArrayData::builder(data_type.clone()) @@ -173,7 +187,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result ArrayData::builder(data_type.clone()) + .len(length) + .buffers(buffers[1..].to_vec()) + .null_bit_buffer(null_buffer) + .build_aligned()?, _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => { // read 2 buffers: null buffer (optional) and data buffer ArrayData::builder(data_type.clone()) @@ -324,7 +343,11 @@ impl<'a> ArrayReader<'a> { }) } - fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> { + fn skip_field( + &mut self, + field: &Field, + variadic_count: &mut VecDeque, + ) -> Result<(), ArrowError> { self.next_node(field)?; match field.data_type() { @@ -333,6 +356,14 @@ impl<'a> ArrayReader<'a> { self.skip_buffer() } } + Utf8View | BinaryView => { + let count = variadic_count + .pop_front() + .expect("Incorrect variadic count!"); + for _i in 0..count { + self.skip_buffer() + } + } FixedSizeBinary(_) => { self.skip_buffer(); self.skip_buffer(); @@ -340,23 +371,23 @@ impl<'a> ArrayReader<'a> { List(list_field) | LargeList(list_field) | Map(list_field, _) => { self.skip_buffer(); self.skip_buffer(); - self.skip_field(list_field)?; + self.skip_field(list_field, variadic_count)?; } FixedSizeList(list_field, _) => { self.skip_buffer(); - self.skip_field(list_field)?; + self.skip_field(list_field, variadic_count)?; } Struct(struct_fields) => { self.skip_buffer(); // skip for each field for struct_field in struct_fields { - self.skip_field(struct_field)? + self.skip_field(struct_field, variadic_count)? } } RunEndEncoded(run_ends_field, values_field) => { - self.skip_field(run_ends_field)?; - self.skip_field(values_field)?; + self.skip_field(run_ends_field, variadic_count)?; + self.skip_field(values_field, variadic_count)?; } Dictionary(_, _) => { self.skip_buffer(); // Nulls @@ -371,7 +402,7 @@ impl<'a> ArrayReader<'a> { }; for (_, field) in fields.iter() { - self.skip_field(field)? + self.skip_field(field, variadic_count)? } } Null => {} // No buffer increases @@ -399,6 +430,13 @@ pub fn read_record_batch( let field_nodes = batch.nodes().ok_or_else(|| { ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; + + let mut variadic_counts: VecDeque = if let Some(v) = batch.variadicBufferCounts() { + v.iter().collect() + } else { + VecDeque::default() + }; + let batch_compression = batch.compression(); let compression = batch_compression .map(|batch_compression| batch_compression.codec().try_into()) @@ -421,12 +459,13 @@ pub fn read_record_batch( for (idx, field) in schema.fields().iter().enumerate() { // Create array for projected field if let Some(proj_idx) = projection.iter().position(|p| p == &idx) { - let child = create_array(&mut reader, field)?; + let child = create_array(&mut reader, field, &mut variadic_counts)?; arrays.push((proj_idx, child)); } else { - reader.skip_field(field)?; + reader.skip_field(field, &mut variadic_counts)?; } } + assert!(variadic_counts.is_empty()); arrays.sort_by_key(|t| t.0); RecordBatch::try_new_with_options( Arc::new(schema.project(projection)?), @@ -437,9 +476,10 @@ pub fn read_record_batch( let mut children = vec![]; // keep track of index as lists require more than one node for field in schema.fields() { - let child = create_array(&mut reader, field)?; + let child = create_array(&mut reader, field, &mut variadic_counts)?; children.push(child); } + assert!(variadic_counts.is_empty()); RecordBatch::try_new_with_options(schema, children, &options) } } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 4e32b04b0fba..183a34b40b35 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -29,7 +29,10 @@ use flatbuffers::FlatBufferBuilder; use arrow_array::builder::BufferBuilder; use arrow_array::cast::*; -use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType}; +use arrow_array::types::{ + Int16Type, Int32Type, Int64Type, Int8Type, RunEndIndexType, UInt16Type, UInt32Type, UInt64Type, + UInt8Type, +}; use arrow_array::*; use arrow_buffer::bit_util; use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; @@ -412,6 +415,8 @@ impl IpcDataGenerator { let compression_codec: Option = batch_compression_type.map(TryInto::try_into).transpose()?; + let mut variadic_buffer_counts = vec![]; + for array in batch.columns() { let array_data = array.to_data(); offset = write_array_data( @@ -425,6 +430,8 @@ impl IpcDataGenerator { compression_codec, write_options, )?; + + set_variadic_buffer_counts(&mut variadic_buffer_counts, array); } // pad the tail of body data let len = arrow_data.len(); @@ -434,6 +441,12 @@ impl IpcDataGenerator { // write data let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); + let variadic_buffer = if variadic_buffer_counts.is_empty() { + None + } else { + Some(fbb.create_vector(&variadic_buffer_counts)) + }; + let root = { let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); batch_builder.add_length(batch.num_rows() as i64); @@ -442,6 +455,10 @@ impl IpcDataGenerator { if let Some(c) = compression { batch_builder.add_compression(c); } + + if let Some(v) = variadic_buffer { + batch_builder.add_variadicBufferCounts(v); + } let b = batch_builder.finish(); b.as_union_value() }; @@ -547,6 +564,54 @@ impl IpcDataGenerator { } } +fn set_variadic_buffer_counts(counts: &mut Vec, array: &dyn Array) { + match array.data_type() { + DataType::BinaryView | DataType::Utf8View => { + counts.push(array.to_data().buffers().len() as i64); + } + DataType::Struct(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + for column in array.columns() { + set_variadic_buffer_counts(counts, column.as_ref()); + } + } + DataType::LargeList(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + set_variadic_buffer_counts(counts, array.values()); + } + DataType::List(_) => { + let array = array.as_any().downcast_ref::().unwrap(); + set_variadic_buffer_counts(counts, array.values()); + } + DataType::FixedSizeList(_, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + set_variadic_buffer_counts(counts, array.values()); + } + DataType::Dictionary(kt, _) => { + macro_rules! set_subarray_counts { + ($array:expr, $counts:expr, $type:ty, $variant:ident) => { + if &DataType::$variant == kt.as_ref() { + let array = $array + .as_any() + .downcast_ref::>() + .unwrap(); + set_variadic_buffer_counts($counts, array.values()); + } + }; + } + set_subarray_counts!(array, counts, Int8Type, Int8); + set_subarray_counts!(array, counts, Int16Type, Int16); + set_subarray_counts!(array, counts, Int32Type, Int32); + set_subarray_counts!(array, counts, Int64Type, Int64); + set_subarray_counts!(array, counts, UInt8Type, UInt8); + set_subarray_counts!(array, counts, UInt16Type, UInt16); + set_subarray_counts!(array, counts, UInt32Type, UInt32); + set_subarray_counts!(array, counts, UInt64Type, UInt64); + } + _ => {} + } +} + pub(crate) fn unslice_run_array(arr: ArrayData) -> Result { match arr.data_type() { DataType::RunEndEncoded(k, _) => match k.data_type() { @@ -1247,6 +1312,17 @@ fn write_array_data( compression_codec, )?; } + } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) { + // Todo: if the array has a offset, we should slice the ArrayData to save space. + for buffer in array_data.buffers() { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + )?; + } } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) { let (offsets, values) = get_byte_array_buffers::(array_data); for buffer in [offsets, values] { @@ -1802,6 +1878,40 @@ mod tests { write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap()); } + #[test] + fn test_write_binary_view() { + const LONG_TEST_STRING: &str = + "This is a long string to make sure binary view array handles it"; + let schema = Schema::new(vec![Field::new("field1", DataType::BinaryView, true)]); + let values: Vec> = vec![ + Some(b"foo"), + Some(b"bar"), + Some(LONG_TEST_STRING.as_bytes()), + ]; + let array = BinaryViewArray::from_iter(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + { + let mut writer = FileWriter::try_new(&mut file, &schema).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + } + file.rewind().unwrap(); + { + let mut reader = FileReader::try_new(file, None).unwrap(); + let read_batch = reader.next().unwrap().unwrap(); + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a, b); + }); + } + } + #[test] fn truncate_ipc_record_batch() { fn create_batch(rows: usize) -> RecordBatch { From 4f0c80dcf817770c2e61f551c6b7516b765cebd6 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 17 Mar 2024 22:23:06 +0000 Subject: [PATCH 02/10] update tests --- arrow-ipc/src/writer.rs | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 183a34b40b35..c5632f0ddc34 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -1882,15 +1882,24 @@ mod tests { fn test_write_binary_view() { const LONG_TEST_STRING: &str = "This is a long string to make sure binary view array handles it"; - let schema = Schema::new(vec![Field::new("field1", DataType::BinaryView, true)]); + let schema = Schema::new(vec![ + Field::new("field1", DataType::BinaryView, true), + Field::new("field2", DataType::Utf8View, true), + ]); let values: Vec> = vec![ Some(b"foo"), Some(b"bar"), Some(LONG_TEST_STRING.as_bytes()), ]; - let array = BinaryViewArray::from_iter(values); - let record_batch = - RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap(); + let binary_array = BinaryViewArray::from_iter(values); + let utf8_array = StringViewArray::from_iter( + vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)].into_iter(), + ); + let record_batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(binary_array), Arc::new(utf8_array)], + ) + .unwrap(); let mut file = tempfile::tempfile().unwrap(); { @@ -1900,7 +1909,7 @@ mod tests { } file.rewind().unwrap(); { - let mut reader = FileReader::try_new(file, None).unwrap(); + let mut reader = FileReader::try_new(&file, None).unwrap(); let read_batch = reader.next().unwrap().unwrap(); read_batch .columns() @@ -1910,6 +1919,15 @@ mod tests { assert_eq!(a, b); }); } + file.rewind().unwrap(); + { + let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap(); + let read_batch = reader.next().unwrap().unwrap(); + assert_eq!(read_batch.num_columns(), 1); + let read_array = read_batch.column(0); + let write_array = record_batch.column(0); + assert_eq!(read_array, write_array); + } } #[test] From 4c29007f111a6a147f532c408fbb84761e722dd7 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Sun, 17 Mar 2024 22:48:03 +0000 Subject: [PATCH 03/10] fix variadic counting --- arrow-ipc/src/reader.rs | 3 ++- arrow-ipc/src/writer.rs | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index c4d9703847e5..d9d474d5d3f4 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -87,7 +87,7 @@ fn create_array( let count = variadic_counts .pop_front() .expect("Incorrect variadic count!"); - let count = count + 1; // add the null buffer. + let count = count + 2; // view and null buffer. let buffers = (0..count) .map(|_| reader.next_buffer()) .collect::, _>>()?; @@ -360,6 +360,7 @@ impl<'a> ArrayReader<'a> { let count = variadic_count .pop_front() .expect("Incorrect variadic count!"); + let count = count + 2; // view and null buffer. for _i in 0..count { self.skip_buffer() } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index c5632f0ddc34..46dd17c3e805 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -567,7 +567,10 @@ impl IpcDataGenerator { fn set_variadic_buffer_counts(counts: &mut Vec, array: &dyn Array) { match array.data_type() { DataType::BinaryView | DataType::Utf8View => { - counts.push(array.to_data().buffers().len() as i64); + // The spec is not clear on whether the view/null buffer should be included in the variadic buffer count. + // But from C++ impl https://github.com/apache/arrow/blob/b448b33808f2dd42866195fa4bb44198e2fc26b9/cpp/src/arrow/ipc/writer.cc#L477 + // we know they are not included. + counts.push(array.to_data().buffers().len() as i64 - 1); } DataType::Struct(_) => { let array = array.as_any().downcast_ref::().unwrap(); From 72854c48e9010b547e27128a712472c1d445f5b3 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Mon, 18 Mar 2024 02:33:02 +0000 Subject: [PATCH 04/10] fix linting, address comments --- arrow-ipc/src/reader.rs | 4 ++-- arrow-ipc/src/writer.rs | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index d9d474d5d3f4..a067732cce68 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -86,7 +86,7 @@ fn create_array( BinaryView | Utf8View => { let count = variadic_counts .pop_front() - .expect("Incorrect variadic count!"); + .ok_or(ArrowError::IpcError("Incorrect variadic count!".to_owned()))?; let count = count + 2; // view and null buffer. let buffers = (0..count) .map(|_| reader.next_buffer()) @@ -359,7 +359,7 @@ impl<'a> ArrayReader<'a> { Utf8View | BinaryView => { let count = variadic_count .pop_front() - .expect("Incorrect variadic count!"); + .ok_or(ArrowError::IpcError("Incorrect variadic count!".to_owned()))?; let count = count + 2; // view and null buffer. for _i in 0..count { self.skip_buffer() diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 46dd17c3e805..b3c298a43259 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -1895,9 +1895,8 @@ mod tests { Some(LONG_TEST_STRING.as_bytes()), ]; let binary_array = BinaryViewArray::from_iter(values); - let utf8_array = StringViewArray::from_iter( - vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)].into_iter(), - ); + let utf8_array = + StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]); let record_batch = RecordBatch::try_new( Arc::new(schema.clone()), vec![Arc::new(binary_array), Arc::new(utf8_array)], From 850b01eee5ce91fd405305c078921c29378d0594 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 20 Mar 2024 13:27:44 -0500 Subject: [PATCH 05/10] Apply suggestions from code review Co-authored-by: Benjamin Kietzman Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow-ipc/src/reader.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index a067732cce68..d376a111f624 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -86,7 +86,7 @@ fn create_array( BinaryView | Utf8View => { let count = variadic_counts .pop_front() - .ok_or(ArrowError::IpcError("Incorrect variadic count!".to_owned()))?; + .ok_or(ArrowError::IpcError(format!("Missing variadic count for {data_type} column")))?; let count = count + 2; // view and null buffer. let buffers = (0..count) .map(|_| reader.next_buffer()) @@ -432,11 +432,7 @@ pub fn read_record_batch( ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; - let mut variadic_counts: VecDeque = if let Some(v) = batch.variadicBufferCounts() { - v.iter().collect() - } else { - VecDeque::default() - }; + let mut variadic_counts: VecDeque = batch.variadicBufferCounts().into_iter().flatten().collect() let batch_compression = batch.compression(); let compression = batch_compression From 3b59c0822692152fb31fcc056efcc8e5037afd59 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 20 Mar 2024 19:50:07 +0000 Subject: [PATCH 06/10] address some review comments --- arrow-ipc/src/reader.rs | 7 +++++-- arrow-ipc/src/writer.rs | 32 ++++++++++++++------------------ 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index d376a111f624..5d3c2e94fa50 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -86,7 +86,9 @@ fn create_array( BinaryView | Utf8View => { let count = variadic_counts .pop_front() - .ok_or(ArrowError::IpcError(format!("Missing variadic count for {data_type} column")))?; + .ok_or(ArrowError::IpcError(format!( + "Missing variadic count for {data_type} column" + )))?; let count = count + 2; // view and null buffer. let buffers = (0..count) .map(|_| reader.next_buffer()) @@ -432,7 +434,8 @@ pub fn read_record_batch( ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; - let mut variadic_counts: VecDeque = batch.variadicBufferCounts().into_iter().flatten().collect() + let mut variadic_counts: VecDeque = + batch.variadicBufferCounts().into_iter().flatten().collect(); let batch_compression = batch.compression(); let compression = batch_compression diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index b3c298a43259..543386fbabdf 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -431,7 +431,7 @@ impl IpcDataGenerator { write_options, )?; - set_variadic_buffer_counts(&mut variadic_buffer_counts, array); + append_variadic_buffer_counts(&mut variadic_buffer_counts, array); } // pad the tail of body data let len = arrow_data.len(); @@ -564,41 +564,37 @@ impl IpcDataGenerator { } } -fn set_variadic_buffer_counts(counts: &mut Vec, array: &dyn Array) { +fn append_variadic_buffer_counts(counts: &mut Vec, array: &dyn Array) { match array.data_type() { DataType::BinaryView | DataType::Utf8View => { - // The spec is not clear on whether the view/null buffer should be included in the variadic buffer count. - // But from C++ impl https://github.com/apache/arrow/blob/b448b33808f2dd42866195fa4bb44198e2fc26b9/cpp/src/arrow/ipc/writer.cc#L477 - // we know they are not included. + // The spec documents the counts only includes the variadic buffers, not the view/null buffers. + // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers counts.push(array.to_data().buffers().len() as i64 - 1); } DataType::Struct(_) => { - let array = array.as_any().downcast_ref::().unwrap(); + let array = array.as_struct(); for column in array.columns() { - set_variadic_buffer_counts(counts, column.as_ref()); + append_variadic_buffer_counts(counts, column.as_ref()); } } DataType::LargeList(_) => { - let array = array.as_any().downcast_ref::().unwrap(); - set_variadic_buffer_counts(counts, array.values()); + let array: &LargeListArray = array.as_list(); + append_variadic_buffer_counts(counts, array.values()); } DataType::List(_) => { - let array = array.as_any().downcast_ref::().unwrap(); - set_variadic_buffer_counts(counts, array.values()); + let array: &ListArray = array.as_list(); + append_variadic_buffer_counts(counts, array.values()); } DataType::FixedSizeList(_, _) => { - let array = array.as_any().downcast_ref::().unwrap(); - set_variadic_buffer_counts(counts, array.values()); + let array = array.as_fixed_size_list(); + append_variadic_buffer_counts(counts, array.values()); } DataType::Dictionary(kt, _) => { macro_rules! set_subarray_counts { ($array:expr, $counts:expr, $type:ty, $variant:ident) => { if &DataType::$variant == kt.as_ref() { - let array = $array - .as_any() - .downcast_ref::>() - .unwrap(); - set_variadic_buffer_counts($counts, array.values()); + let array: &DictionaryArray<$type> = $array.as_dictionary(); + append_variadic_buffer_counts($counts, array.values()); } }; } From d121ce654a2dc5afb375b23e781118add1fa7b03 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 20 Mar 2024 20:30:52 +0000 Subject: [PATCH 07/10] update comments --- arrow-ipc/src/reader.rs | 8 +++++++- arrow-ipc/src/writer.rs | 7 ++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 5d3c2e94fa50..fd7d6be3af44 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -60,6 +60,9 @@ fn read_buffer( /// Coordinates reading arrays based on data types. /// +/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView) +/// When encounter such types, we pop from the front of the queue to get the number of buffers to read. +/// /// Notes: /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size. @@ -361,7 +364,10 @@ impl<'a> ArrayReader<'a> { Utf8View | BinaryView => { let count = variadic_count .pop_front() - .ok_or(ArrowError::IpcError("Incorrect variadic count!".to_owned()))?; + .ok_or(ArrowError::IpcError(format!( + "Missing variadic count for {} column", + field.data_type() + )))?; let count = count + 2; // view and null buffer. for _i in 0..count { self.skip_buffer() diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 543386fbabdf..d570a198e9cb 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -1312,7 +1312,12 @@ fn write_array_data( )?; } } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) { - // Todo: if the array has a offset, we should slice the ArrayData to save space. + // Slicing the views buffer is safe and easy, + // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers + // + // Current implementation just serialize the raw arrays as given and not try to optimize anything. + // If users wants to "compact" the arrays prior to sending them over IPC, + // they should consider the gc API suggested in #5513 for buffer in array_data.buffers() { offset = write_buffer( buffer.as_slice(), From d16cf2331a8f390491ff1928df68e9b56163a1c8 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 27 Mar 2024 16:32:03 +0000 Subject: [PATCH 08/10] Add tests and fix bugs with dict types --- arrow-ipc/src/reader.rs | 107 ++++++++++++++++++++++++++++++++++++++++ arrow-ipc/src/writer.rs | 64 +++++++++--------------- 2 files changed, 129 insertions(+), 42 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index fd7d6be3af44..b219821086a5 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -1801,6 +1801,113 @@ mod tests { assert_eq!(input_batch, output_batch); } + const LONG_TEST_STRING: &str = + "This is a long string to make sure binary view array handles it"; + + #[test] + fn test_roundtrip_view_types() { + let schema = Schema::new(vec![ + Field::new("field_1", DataType::BinaryView, true), + Field::new("field_2", DataType::Utf8, true), + Field::new("field_3", DataType::Utf8View, true), + ]); + let bin_values: Vec> = vec![ + Some(b"foo"), + Some(b"bar"), + Some(LONG_TEST_STRING.as_bytes()), + ]; + let utf8_values: Vec> = vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]; + let bin_view_array = BinaryViewArray::from_iter(bin_values); + let utf8_array = StringArray::from_iter(utf8_values.iter()); + let utf8_view_array = StringViewArray::from_iter(utf8_values); + let record_batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(bin_view_array), + Arc::new(utf8_array), + Arc::new(utf8_view_array), + ], + ) + .unwrap(); + + assert_eq!(record_batch, roundtrip_ipc(&record_batch)); + assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch)); + + let sliced_batch = record_batch.slice(1, 2); + assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch)); + assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch)); + } + + #[test] + fn test_roundtrip_view_types_nested_dict() { + let bin_values: Vec> = vec![ + Some(b"foo"), + Some(b"bar"), + Some(LONG_TEST_STRING.as_bytes()), + Some(b"field"), + ]; + let utf8_values: Vec> = vec![ + Some("foo"), + Some("bar"), + Some(LONG_TEST_STRING), + Some("field"), + ]; + let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values)); + let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values)); + + let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]); + let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone()); + let keys_field = Arc::new(Field::new_dict( + "keys", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)), + true, + 1, + false, + )); + + let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]); + let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array); + let values_field = Arc::new(Field::new_dict( + "values", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)), + true, + 2, + false, + )); + let entry_struct = StructArray::from(vec![ + (keys_field, make_array(key_dict_array.into_data())), + (values_field, make_array(value_dict_array.into_data())), + ]); + + let map_data_type = DataType::Map( + Arc::new(Field::new( + "entries", + entry_struct.data_type().clone(), + false, + )), + false, + ); + let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]); + let map_data = ArrayData::builder(map_data_type) + .len(3) + .add_buffer(entry_offsets) + .add_child_data(entry_struct.into_data()) + .build() + .unwrap(); + let map_array = MapArray::from(map_data); + + let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]); + let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array)); + let schema = Arc::new(Schema::new(vec![Field::new( + "f1", + dict_dict_array.data_type().clone(), + false, + )])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap(); + assert_eq!(batch, roundtrip_ipc(&batch)); + assert_eq!(batch, roundtrip_ipc_stream(&batch)); + } + #[test] fn test_no_columns_batch() { let schema = Arc::new(Schema::empty()); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index d570a198e9cb..11bf18122d65 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -29,10 +29,7 @@ use flatbuffers::FlatBufferBuilder; use arrow_array::builder::BufferBuilder; use arrow_array::cast::*; -use arrow_array::types::{ - Int16Type, Int32Type, Int64Type, Int8Type, RunEndIndexType, UInt16Type, UInt32Type, UInt64Type, - UInt8Type, -}; +use arrow_array::types::{Int16Type, Int32Type, Int64Type, RunEndIndexType}; use arrow_array::*; use arrow_buffer::bit_util; use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; @@ -431,7 +428,7 @@ impl IpcDataGenerator { write_options, )?; - append_variadic_buffer_counts(&mut variadic_buffer_counts, array); + append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data); } // pad the tail of body data let len = arrow_data.len(); @@ -518,6 +515,9 @@ impl IpcDataGenerator { write_options, )?; + let mut variadic_buffer_counts = vec![]; + append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data); + // pad the tail of body data let len = arrow_data.len(); let pad_len = pad_to_8(len as u32); @@ -526,6 +526,11 @@ impl IpcDataGenerator { // write data let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); + let variadic_buffer = if variadic_buffer_counts.is_empty() { + None + } else { + Some(fbb.create_vector(&variadic_buffer_counts)) + }; let root = { let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb); @@ -535,6 +540,9 @@ impl IpcDataGenerator { if let Some(c) = compression { batch_builder.add_compression(c); } + if let Some(v) = variadic_buffer { + batch_builder.add_variadicBufferCounts(v); + } batch_builder.finish() }; @@ -564,50 +572,22 @@ impl IpcDataGenerator { } } -fn append_variadic_buffer_counts(counts: &mut Vec, array: &dyn Array) { +fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { match array.data_type() { DataType::BinaryView | DataType::Utf8View => { // The spec documents the counts only includes the variadic buffers, not the view/null buffers. // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers - counts.push(array.to_data().buffers().len() as i64 - 1); - } - DataType::Struct(_) => { - let array = array.as_struct(); - for column in array.columns() { - append_variadic_buffer_counts(counts, column.as_ref()); - } + counts.push(array.buffers().len() as i64 - 1); } - DataType::LargeList(_) => { - let array: &LargeListArray = array.as_list(); - append_variadic_buffer_counts(counts, array.values()); + DataType::Dictionary(_, _) => { + // Dictionary types are handled in `encode_dictionaries`. + return; } - DataType::List(_) => { - let array: &ListArray = array.as_list(); - append_variadic_buffer_counts(counts, array.values()); - } - DataType::FixedSizeList(_, _) => { - let array = array.as_fixed_size_list(); - append_variadic_buffer_counts(counts, array.values()); - } - DataType::Dictionary(kt, _) => { - macro_rules! set_subarray_counts { - ($array:expr, $counts:expr, $type:ty, $variant:ident) => { - if &DataType::$variant == kt.as_ref() { - let array: &DictionaryArray<$type> = $array.as_dictionary(); - append_variadic_buffer_counts($counts, array.values()); - } - }; + _ => { + for child in array.child_data() { + append_variadic_buffer_counts(counts, child) } - set_subarray_counts!(array, counts, Int8Type, Int8); - set_subarray_counts!(array, counts, Int16Type, Int16); - set_subarray_counts!(array, counts, Int32Type, Int32); - set_subarray_counts!(array, counts, Int64Type, Int64); - set_subarray_counts!(array, counts, UInt8Type, UInt8); - set_subarray_counts!(array, counts, UInt16Type, UInt16); - set_subarray_counts!(array, counts, UInt32Type, UInt32); - set_subarray_counts!(array, counts, UInt64Type, UInt64); } - _ => {} } } @@ -1883,7 +1863,7 @@ mod tests { } #[test] - fn test_write_binary_view() { + fn test_write_view_types() { const LONG_TEST_STRING: &str = "This is a long string to make sure binary view array handles it"; let schema = Schema::new(vec![ From bb9e42f40c0db30fae68dafb61f462b14baa82a4 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Wed, 27 Mar 2024 16:36:55 +0000 Subject: [PATCH 09/10] make clippy happy --- arrow-ipc/src/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 11bf18122d65..265d6be1a503 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -580,8 +580,8 @@ fn append_variadic_buffer_counts(counts: &mut Vec, array: &ArrayData) { counts.push(array.buffers().len() as i64 - 1); } DataType::Dictionary(_, _) => { + // Do nothing // Dictionary types are handled in `encode_dictionaries`. - return; } _ => { for child in array.child_data() { From 0ffd783f28851f482f017bf90022e81079fcf976 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hao Date: Thu, 28 Mar 2024 16:41:29 -0400 Subject: [PATCH 10/10] update test cases --- arrow-ipc/src/reader.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index b219821086a5..da5adf8d8f2c 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -1813,10 +1813,12 @@ mod tests { ]); let bin_values: Vec> = vec![ Some(b"foo"), + None, Some(b"bar"), Some(LONG_TEST_STRING.as_bytes()), ]; - let utf8_values: Vec> = vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]; + let utf8_values: Vec> = + vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)]; let bin_view_array = BinaryViewArray::from_iter(bin_values); let utf8_array = StringArray::from_iter(utf8_values.iter()); let utf8_view_array = StringViewArray::from_iter(utf8_values); @@ -1842,12 +1844,14 @@ mod tests { fn test_roundtrip_view_types_nested_dict() { let bin_values: Vec> = vec![ Some(b"foo"), + None, Some(b"bar"), Some(LONG_TEST_STRING.as_bytes()), Some(b"field"), ]; let utf8_values: Vec> = vec![ Some("foo"), + None, Some("bar"), Some(LONG_TEST_STRING), Some("field"), @@ -1906,6 +1910,10 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap(); assert_eq!(batch, roundtrip_ipc(&batch)); assert_eq!(batch, roundtrip_ipc_stream(&batch)); + + let sliced_batch = batch.slice(1, 2); + assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch)); + assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch)); } #[test]