Skip to content

Commit

Permalink
Add FileReaderBuilder for arrow-ipc to allow reading large no. of col…
Browse files Browse the repository at this point in the history
…umn files (apache#5136)

* Add FileReaderBuilder for arrow-ipc

* Switch parameter to not expose flatbuffer types
  • Loading branch information
Jefffrey authored Dec 26, 2023
1 parent ff951b4 commit add8f56
Showing 1 changed file with 188 additions and 50 deletions.
238 changes: 188 additions & 50 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing
use flatbuffers::VectorIter;
use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
Expand Down Expand Up @@ -522,59 +522,78 @@ fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
reader: R,
/// Build an Arrow [`FileReader`] with custom options.
///
/// Holds the optional column projection and the flatbuffers verifier limits
/// that are applied to the file footer when [`FileReaderBuilder::build`] runs.
#[derive(Debug)]
pub struct FileReaderBuilder {
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// Passed through to construct [`VerifierOptions`] (used as its `max_tables`
/// when verifying the footer)
max_footer_fb_tables: usize,
/// Passed through to construct [`VerifierOptions`] (used as its `max_depth`
/// when verifying the footer)
max_footer_fb_depth: usize,
}

/// The schema that is read from the file header
schema: SchemaRef,
impl Default for FileReaderBuilder {
fn default() -> Self {
let verifier_options = VerifierOptions::default();
Self {
max_footer_fb_tables: verifier_options.max_tables,
max_footer_fb_depth: verifier_options.max_depth,
projection: None,
}
}
}

/// The blocks in the file
impl FileReaderBuilder {
/// Options for creating a new [`FileReader`].
///
/// A block indicates the regions in the file to read to get data
blocks: Vec<crate::Block>,

/// A counter to keep track of the current block that should be read
current_block: usize,
/// To convert a builder into a reader, call [`FileReaderBuilder::build`].
pub fn new() -> Self {
Self::default()
}

/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,
/// Optional projection for which columns to load (zero-based column indices).
pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
self.projection = Some(projection);
self
}

/// Optional dictionaries for each schema field.
/// Flatbuffers option for parsing the footer. Controls the max number of fields and
/// metadata key-value pairs that can be parsed from the schema of the footer.
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_id: HashMap<i64, ArrayRef>,

/// Metadata version
metadata_version: crate::MetadataVersion,

/// User defined metadata
custom_metadata: HashMap<String, String>,

/// Optional projection and projected_schema
projection: Option<(Vec<usize>, Schema)>,
}

impl<R: Read + Seek> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("schema", &self.schema)
.field("blocks", &self.blocks)
.field("current_block", &self.current_block)
.field("total_blocks", &self.total_blocks)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("metadata_version", &self.metadata_version)
.field("projection", &self.projection)
.finish_non_exhaustive()
/// By default this is set to `1_000_000` which roughly translates to a schema with
/// no metadata key-value pairs but 499,999 fields.
///
/// This default limit is enforced to protect against malicious files with a massive
/// amount of flatbuffer tables which could cause a denial of service attack.
///
/// If you need to ingest a trusted file with a massive number of fields and/or
/// metadata key-value pairs and are facing the error `"Unable to get root as
/// footer: TooManyTables"` then increase this parameter as necessary.
pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> Self {
self.max_footer_fb_tables = max_footer_fb_tables;
self
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
/// Flatbuffers option for parsing the footer. Controls the max depth for schemas with
/// nested fields parsed from the footer.
///
/// Returns errors if the file does not meet the Arrow Format footer requirements
pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
/// By default this is set to `64` which roughly translates to a schema with
/// a field nested 60 levels down through other struct fields.
///
/// This default limit is enforced to protect against malicious files with an extremely
/// deep flatbuffer structure which could cause a denial of service attack.
///
/// If you need to ingest a trusted file with a deeply nested field and are facing the
/// error `"Unable to get root as footer: DepthLimitReached"` then increase this
/// parameter as necessary.
pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> Self {
self.max_footer_fb_depth = max_footer_fb_depth;
self
}

/// Build [`FileReader`] with given reader.
pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, ArrowError> {
// Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
let mut buffer = [0; 10];
reader.seek(SeekFrom::End(-10))?;
Expand All @@ -594,9 +613,14 @@ impl<R: Read + Seek> FileReader<R> {
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;

let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;
let verifier_options = VerifierOptions {
max_tables: self.max_footer_fb_tables,
max_depth: self.max_footer_fb_depth,
..Default::default()
};
let footer = crate::root_as_footer_with_opts(&verifier_options, &footer_data[..]).map_err(
|err| ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")),
)?;

let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::ParseError("Unable to get record batches from IPC Footer".to_string())
Expand Down Expand Up @@ -643,15 +667,15 @@ impl<R: Read + Seek> FileReader<R> {
}
}
}
let projection = match projection {
let projection = match self.projection {
Some(projection_indices) => {
let schema = schema.project(&projection_indices)?;
Some((projection_indices, schema))
}
_ => None,
};

Ok(Self {
Ok(FileReader {
reader,
schema: Arc::new(schema),
blocks: blocks.iter().copied().collect(),
Expand All @@ -663,6 +687,67 @@ impl<R: Read + Seek> FileReader<R> {
projection,
})
}
}

/// Arrow File reader
///
/// Created with [`FileReader::try_new`] or, when custom options are needed,
/// with [`FileReaderBuilder::build`].
pub struct FileReader<R: Read + Seek> {
/// Underlying reader; it must support both reading and seeking
reader: R,

/// The schema that is read from the file header
schema: SchemaRef,

/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
blocks: Vec<crate::Block>,

/// A counter to keep track of the current block that should be read
current_block: usize,

/// The total number of blocks, which may contain record batches and other types
total_blocks: usize,

/// Optional dictionaries for each schema field, keyed by dictionary id.
///
/// Dictionaries may be appended to in the streaming format.
dictionaries_by_id: HashMap<i64, ArrayRef>,

/// Metadata version
metadata_version: crate::MetadataVersion,

/// User defined metadata
custom_metadata: HashMap<String, String>,

/// Optional projection: the selected column indices paired with the schema
/// projected onto those columns
projection: Option<(Vec<usize>, Schema)>,
}

impl<R: Read + Seek> fmt::Debug for FileReader<R> {
    /// Formats the reader's state; the underlying reader and the custom
    /// metadata are not printed, so the output is marked non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("FileReader<R>");
        out.field("schema", &self.schema);
        out.field("blocks", &self.blocks);
        out.field("current_block", &self.current_block);
        out.field("total_blocks", &self.total_blocks);
        out.field("dictionaries_by_id", &self.dictionaries_by_id);
        out.field("metadata_version", &self.metadata_version);
        out.field("projection", &self.projection);
        out.finish_non_exhaustive()
    }
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format footer requirements
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let builder = FileReaderBuilder {
projection,
..Default::default()
};
builder.build(reader)
}

/// Return user defined customized metadata
pub fn custom_metadata(&self) -> &HashMap<String, String> {
Expand Down Expand Up @@ -1622,4 +1707,57 @@ mod tests {
.unwrap();
assert_eq!(batch, roundtrip);
}

#[test]
fn test_file_with_massive_column_count() {
    // The default table limit (1_000_000) allows at most 499_999 fields, so this
    // column count can only be read back after raising the limit.
    let column_count = 600_000;

    let schema = Arc::new(Schema::new(
        (0..column_count)
            .map(|i| Field::new(i.to_string(), DataType::Boolean, false))
            .collect::<Vec<_>>(),
    ));
    let batch = RecordBatch::new_empty(schema);

    let mut buf = Vec::new();
    {
        // The writer borrows `buf`; the scope ends that borrow before the read below.
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    let builder = FileReaderBuilder::new().with_max_footer_fb_tables(1_500_000);
    let mut reader = builder.build(std::io::Cursor::new(buf)).unwrap();
    let roundtrip_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, roundtrip_batch);
}

#[test]
fn test_file_with_deeply_nested_columns() {
    // The default depth limit (64) allows at most 60 levels of nesting, so this
    // depth can only be read back after raising the limit.
    let nesting_levels = 61;

    // Start from the leaf and wrap it in one more struct field per level.
    let mut fields = vec![Field::new("leaf", DataType::Boolean, false)];
    for index in 0..nesting_levels {
        fields = vec![Field::new_struct(format!("{index}"), fields, false)];
    }
    let schema = Arc::new(Schema::new(fields));
    let batch = RecordBatch::new_empty(schema);

    let mut buf = Vec::new();
    {
        // The writer borrows `buf`; the scope ends that borrow before the read below.
        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    let builder = FileReaderBuilder::new().with_max_footer_fb_depth(65);
    let mut reader = builder.build(std::io::Cursor::new(buf)).unwrap();
    let roundtrip_batch = reader.next().unwrap().unwrap();

    assert_eq!(batch, roundtrip_batch);
}
}

0 comments on commit add8f56

Please sign in to comment.