Skip to content

Commit

Permalink
Complete StringViewArray and BinaryViewArray parquet decoder: implement delta byte array and delta length byte array encoding (#6004)

Browse files Browse the repository at this point in the history

* implement all encodings

* address comments

* fix bug

* Update parquet/src/arrow/array_reader/byte_view_array.rs

Co-authored-by: Andrew Lamb <[email protected]>

* fix test

* update comments

* update test

* Only copy strings once

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
XiangpengHao and alamb authored Jul 8, 2024
1 parent b9e4497 commit 8355823
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 72 deletions.
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
Expand Down
63 changes: 22 additions & 41 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,6 @@ pub fn make_byte_array_reader(
}
}

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
pub fn make_byte_view_array_reader(
    pages: Box<dyn PageIterator>,
    column_desc: ColumnDescPtr,
    arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
    // Prefer the caller-supplied Arrow type; otherwise derive a view type
    // from the Parquet column (string-like columns map to Utf8View,
    // everything else to BinaryView).
    let data_type = if let Some(t) = arrow_type {
        t
    } else {
        let inferred = parquet_to_arrow_field(column_desc.as_ref())?;
        match inferred.data_type() {
            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
            _ => ArrowType::BinaryView,
        }
    };

    // Only view types can be produced by this reader; reject anything else.
    if !matches!(data_type, ArrowType::BinaryView | ArrowType::Utf8View) {
        return Err(general_err!(
            "invalid data type for byte array reader read to view type - {}",
            data_type
        ));
    }

    let record_reader = GenericRecordReader::new(column_desc);
    Ok(Box::new(ByteArrayReader::<i32>::new(
        pages,
        data_type,
        record_reader,
    )))
}

/// An [`ArrayReader`] for variable length byte arrays
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
Expand Down Expand Up @@ -472,6 +442,23 @@ impl ByteArrayDecoderDeltaLength {
let mut lengths = vec![0; values];
len_decoder.get(&mut lengths)?;

let mut total_bytes = 0;

for l in lengths.iter() {
if *l < 0 {
return Err(ParquetError::General(
"negative delta length byte array length".to_string(),
));
}
total_bytes += *l as usize;
}

if total_bytes + len_decoder.get_offset() > data.len() {
return Err(ParquetError::General(
"Insufficient delta length byte array bytes".to_string(),
));
}

Ok(Self {
lengths,
data,
Expand All @@ -496,23 +483,17 @@ impl ByteArrayDecoderDeltaLength {
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
output.values.reserve(total_bytes);

if self.data_offset + total_bytes > self.data.len() {
return Err(ParquetError::EOF(
"Insufficient delta length byte array bytes".to_string(),
));
}

let mut start_offset = self.data_offset;
let mut current_offset = self.data_offset;
for length in src_lengths {
let end_offset = start_offset + *length as usize;
let end_offset = current_offset + *length as usize;
output.try_push(
&self.data.as_ref()[start_offset..end_offset],
&self.data.as_ref()[current_offset..end_offset],
self.validate_utf8,
)?;
start_offset = end_offset;
current_offset = end_offset;
}

self.data_offset = start_offset;
self.data_offset = current_offset;
self.length_offset += to_read;

if self.validate_utf8 {
Expand Down
205 changes: 191 additions & 14 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::DictIndexDecoder;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::ArrayRef;
use arrow_array::{builder::make_view, ArrayRef};
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
#[allow(unused)]
pub fn make_byte_view_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -61,7 +62,6 @@ pub fn make_byte_view_array_reader(
}

/// An [`ArrayReader`] for variable length byte arrays
#[allow(unused)]
struct ByteViewArrayReader {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
Expand Down Expand Up @@ -213,6 +213,8 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
/// A decoder from a parquet data page to a [`ViewBuffer`], with one variant
/// per supported parquet encoding.
pub enum ByteViewArrayDecoder {
    /// [`Encoding::PLAIN`]: length-prefixed values stored back to back
    Plain(ByteViewArrayDecoderPlain),
    /// [`Encoding::RLE_DICTIONARY`] / [`Encoding::PLAIN_DICTIONARY`]:
    /// RLE/bit-packed indices into a dictionary page
    Dictionary(ByteViewArrayDecoderDictionary),
    /// [`Encoding::DELTA_LENGTH_BYTE_ARRAY`]: delta-encoded lengths
    /// followed by the concatenated value bytes
    DeltaLength(ByteViewArrayDecoderDeltaLength),
    /// [`Encoding::DELTA_BYTE_ARRAY`]: values stored as shared-prefix
    /// lengths plus delta-length-encoded suffixes
    DeltaByteArray(ByteViewArrayDecoderDelta),
}

impl ByteViewArrayDecoder {
Expand All @@ -235,9 +237,12 @@ impl ByteViewArrayDecoder {
data, num_levels, num_values,
))
}
Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
unimplemented!("stay tuned!")
}
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
),
Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
),
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
Expand All @@ -263,6 +268,8 @@ impl ByteViewArrayDecoder {
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.read(out, dict, len)
}
ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}

Expand All @@ -275,6 +282,8 @@ impl ByteViewArrayDecoder {
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.skip(dict, len)
}
ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
}
}
}
Expand Down Expand Up @@ -487,6 +496,181 @@ impl ByteViewArrayDecoderDictionary {
}
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDeltaLength {
    // Length of every value in the page, fully decoded up front in `new`
    lengths: Vec<i32>,
    // The raw page bytes; views reference ranges of this buffer directly
    data: Bytes,
    // Index into `lengths` of the next value to read
    length_offset: usize,
    // Byte offset into `data` of the next value's payload
    data_offset: usize,
    // Whether decoded values must be validated as UTF-8 (string columns)
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDeltaLength {
    /// Builds a decoder over a `DELTA_LENGTH_BYTE_ARRAY` encoded page.
    ///
    /// Eagerly decodes all value lengths and verifies that the declared
    /// payload actually fits inside `data`; this up-front check is what
    /// makes the unchecked view appends in [`Self::read`] sound.
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        len_decoder.set_data(data.clone(), 0)?;
        let values = len_decoder.values_left();

        let mut lengths = vec![0; values];
        len_decoder.get(&mut lengths)?;

        // Sum lengths, rejecting negatives: a negative length would wrap
        // around in the `as usize` cast and defeat the bounds check below.
        let mut total_bytes = 0;

        for l in lengths.iter() {
            if *l < 0 {
                return Err(ParquetError::General(
                    "negative delta length byte array length".to_string(),
                ));
            }
            total_bytes += *l as usize;
        }

        // The payload begins right after the encoded lengths block
        // (`len_decoder.get_offset()`); every declared byte must exist.
        if total_bytes + len_decoder.get_offset() > data.len() {
            return Err(ParquetError::General(
                "Insufficient delta length byte array bytes".to_string(),
            ));
        }

        Ok(Self {
            lengths,
            data,
            validate_utf8,
            length_offset: 0,
            data_offset: len_decoder.get_offset(),
        })
    }

    /// Reads up to `len` values into `output`, returning how many were read.
    ///
    /// The page buffer is shared with `output` as a block and each value
    /// becomes a view into it — no value bytes are copied.
    // NOTE(review): each call appends `self.data` to `output` as a fresh
    // block, so multiple `read` calls register the same page buffer more
    // than once — confirm this duplication is intended/acceptable.
    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        let to_read = len.min(self.lengths.len() - self.length_offset);
        output.views.reserve(to_read);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

        let block_id = output.append_block(self.data.clone().into());

        let mut current_offset = self.data_offset;
        let initial_offset = current_offset;
        for length in src_lengths {
            // # Safety
            // The length is from the delta length decoder, so it is valid
            // The start_offset is calculated from the lengths, so it is valid
            // `start_offset + length` is guaranteed to be within the bounds of `data`, as checked in `new`
            unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }

            current_offset += *length as usize;
        }

        // Delta length encoding has continuous strings, we can validate utf8 in one go
        if self.validate_utf8 {
            check_valid_utf8(&self.data[initial_offset..current_offset])?;
        }

        self.data_offset = current_offset;
        self.length_offset += to_read;

        Ok(to_read)
    }

    /// Skips up to `to_skip` values by advancing both cursors; no bytes are
    /// touched. Returns the number of values actually skipped.
    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let remain_values = self.lengths.len() - self.length_offset;
        let to_skip = remain_values.min(to_skip);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

        self.data_offset += total_bytes;
        self.length_offset += to_skip;
        Ok(to_skip)
    }
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDelta {
    // Underlying prefix/suffix decoder that materializes each value's bytes
    decoder: DeltaByteArrayDecoder,
    // Whether decoded values must be validated as UTF-8 (string columns)
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDelta {
    /// Builds a decoder over a `DELTA_BYTE_ARRAY` encoded page.
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        Ok(Self {
            decoder: DeltaByteArrayDecoder::new(data)?,
            validate_utf8,
        })
    }

    // Unlike other encodings, we need to copy the data.
    //
    // DeltaByteArray data is stored using shared prefixes/suffixes,
    // which results in potentially non-contiguous
    // strings, while Arrow encodings require contiguous strings
    //
    // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>

    /// Reads up to `len` values into `output`, returning how many were read.
    ///
    /// Long values (> 12 bytes) are copied into a single new buffer that is
    /// appended to `output`; shorter values are inlined directly into their
    /// views and never touch the buffer.
    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        output.views.reserve(len.min(self.decoder.remaining()));

        // array buffer only have long strings
        let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);

        // The id the new buffer will receive once appended at the end of
        // this function; views are built against it ahead of time.
        let buffer_id = output.buffers.len() as u32;

        let read = if !self.validate_utf8 {
            self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to buffer if the string can not be inlined.
                    array_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // The buffer_id is the last buffer in the output buffers
                // The offset is calculated from the buffer, so it is valid
                unsafe {
                    output.append_raw_view_unchecked(&view);
                }
                Ok(())
            })?
        } else {
            // utf8 validation buffer has only short strings. These short
            // strings are inlined into the views but we copy them into a
            // contiguous buffer to accelerate validation.
            let mut utf8_validation_buffer = Vec::with_capacity(4096);

            let v = self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to buffer if the string can not be inlined.
                    array_buffer.extend_from_slice(bytes);
                } else {
                    utf8_validation_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // The buffer_id is the last buffer in the output buffers
                // The offset is calculated from the buffer, so it is valid
                // Utf-8 validation is done later
                unsafe {
                    output.append_raw_view_unchecked(&view);
                }
                Ok(())
            })?;
            // NOTE(review): validating long and short values in two separate
            // passes assumes each value is individually valid UTF-8 (no
            // multi-byte sequence may span a value boundary) — confirm the
            // column contract guarantees this.
            check_valid_utf8(&array_buffer)?;
            check_valid_utf8(&utf8_validation_buffer)?;
            v
        };

        // The buffer must land at the id the views were built against.
        let actual_block_id = output.append_block(array_buffer.into());
        assert_eq!(actual_block_id, buffer_id);
        Ok(read)
    }

    /// Skips up to `to_skip` values, returning how many were skipped.
    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        self.decoder.skip(to_skip)
    }
}

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
Expand Down Expand Up @@ -525,13 +709,6 @@ mod tests {
.unwrap();

for (encoding, page) in pages {
if encoding != Encoding::PLAIN
&& encoding != Encoding::RLE_DICTIONARY
&& encoding != Encoding::PLAIN_DICTIONARY
{
// skip unsupported encodings for now as they are not yet implemented
continue;
}
let mut output = ViewBuffer::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

Expand Down
Loading

0 comments on commit 8355823

Please sign in to comment.