Skip to content

Commit

Permalink
feat: Layout metadata reader and column statistics (#1455)
Browse files Browse the repository at this point in the history
Adds a dedicated file metadata reader and uses it to provide DataFusion
with file-level statistics.

---------

Co-authored-by: Will Manning <[email protected]>
Co-authored-by: Daniel King <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent 866d892 commit 630835b
Show file tree
Hide file tree
Showing 16 changed files with 486 additions and 40 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
use vortex::{ArrayData, IntoArrayData, IntoCanonical};

static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
LazyLock::new(|| Arc::new(IoDispatcher::default()));

pub const BATCH_SIZE: usize = 65_536;

Expand Down
6 changes: 6 additions & 0 deletions vortex-buffer/src/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ impl From<String> for BufferString {
}
}

impl From<&str> for BufferString {
fn from(value: &str) -> Self {
Self(Buffer::from(String::from(value).into_bytes()))
}
}

impl TryFrom<Buffer> for BufferString {
type Error = Utf8Error;

Expand Down
60 changes: 51 additions & 9 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
Expand All @@ -9,18 +9,25 @@ use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use datafusion::execution::SessionState;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics};
use datafusion_common::{
not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, Statistics,
};
use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::arrow::infer_schema;
use vortex_array::Context;
use vortex_file::{read_initial_bytes, VORTEX_FILE_EXTENSION};
use vortex_io::ObjectStoreReadAt;
use vortex_file::metadata::MetadataFetcher;
use vortex_file::{
read_initial_bytes, LayoutContext, LayoutDeserializer, LayoutMessageCache, RelativeLayoutCache,
Scan, VORTEX_FILE_EXTENSION,
};
use vortex_io::{IoDispatcher, ObjectStoreReadAt};

use super::execution::VortexExec;
use super::statistics::array_to_col_statistics;
use crate::can_be_pushed_down;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -86,13 +93,48 @@ impl FileFormat for VortexFormat {
let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
let layout = initial_read.fb_layout()?;
let dtype = initial_read.lazy_dtype().map_err(|e| {
DataFusionError::External(Box::new(
e.with_context("Failed to fetch dtype from initial read"),
))
})?;
let row_count = layout.row_count();

let stats = Statistics {
num_rows: Precision::Exact(row_count as usize),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&table_schema),
};
let layout_deserializer =
LayoutDeserializer::new(Context::default().into(), LayoutContext::default().into());
let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::new()));
let relative_message_cache =
RelativeLayoutCache::new(layout_message_cache.clone(), dtype.into());

let root_layout = vortex_file::read_layout_from_initial(
&initial_read,
&layout_deserializer,
Scan::empty(),
relative_message_cache,
)?;

let io = IoDispatcher::default();
let mut stats = Statistics::new_unknown(&table_schema);
stats.num_rows = Precision::Exact(row_count as usize);

let metadata_table =
MetadataFetcher::fetch(os_read_at, io.into(), root_layout, layout_message_cache)
.await?;

if let Some(metadata) = metadata_table {
let mut column_statistics = Vec::with_capacity(table_schema.fields().len());

for col_stats in metadata.into_iter() {
let col_stats = match col_stats {
Some(array) => array_to_col_statistics(array.try_into()?)?,
None => ColumnStatistics::new_unknown(),
};

column_statistics.push(col_stats);
}

stats.column_statistics = column_statistics;
}

Ok(stats)
}
Expand Down
1 change: 1 addition & 0 deletions vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod config;
pub mod execution;
pub mod format;
pub mod opener;
pub mod statistics;
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use vortex_io::{IoDispatcher, ObjectStoreReadAt};

/// Share an IO dispatcher across all DataFusion instances.
static IO_DISPATCHER: LazyLock<Arc<IoDispatcher>> =
LazyLock::new(|| Arc::new(IoDispatcher::new_tokio(1)));
LazyLock::new(|| Arc::new(IoDispatcher::default()));

pub struct VortexFileOpener {
pub ctx: Arc<Context>,
Expand Down
42 changes: 42 additions & 0 deletions vortex-datafusion/src/persistent/statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_common::stats::Precision;
use datafusion_common::ColumnStatistics;
use datafusion_expr::Accumulator;
use vortex_array::array::StructArray;
use vortex_array::variants::StructArrayTrait as _;
use vortex_array::IntoCanonical;
use vortex_error::VortexResult;

pub fn array_to_col_statistics(array: StructArray) -> VortexResult<ColumnStatistics> {
let mut stats = ColumnStatistics::new_unknown();

if let Some(null_count_array) = array.field_by_name("null_count") {
let array = null_count_array.into_canonical()?.into_arrow()?;
let array = array.as_primitive::<UInt64Type>();

let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
stats.null_count = Precision::Exact(null_count as usize);
}

if let Some(max_value_array) = array.field_by_name("max") {
let array = max_value_array.into_canonical()?.into_arrow()?;
let mut acc = MaxAccumulator::try_new(array.data_type())?;
acc.update_batch(&[array])?;

let max_val = acc.evaluate()?;
stats.max_value = Precision::Exact(max_val)
}

if let Some(min_value_array) = array.field_by_name("min") {
let array = min_value_array.into_canonical()?.into_arrow()?;
let mut acc = MinAccumulator::try_new(array.data_type())?;
acc.update_batch(&[array])?;

let max_val = acc.evaluate()?;
stats.min_value = Precision::Exact(max_val)
}

Ok(stats)
}
11 changes: 2 additions & 9 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ futures-executor = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
tokio = { workspace = true, features = [
"io-util",
"fs",
"rt-multi-thread",
] }
tokio = { workspace = true, features = ["io-util", "fs", "rt-multi-thread"] }
tracing = { workspace = true, optional = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
Expand All @@ -50,8 +46,5 @@ workspace = true

[features]
futures = ["futures-util/io", "vortex-io/futures"]
object_store = [
"vortex-error/object_store",
"vortex-io/object_store",
]
object_store = ["vortex-error/object_store", "vortex-io/object_store"]
tracing = ["dep:tracing", "vortex-io/tracing"]
4 changes: 1 addition & 3 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,7 @@ impl<R: VortexReadAt> VortexReadBuilder<R> {
.transpose()?;

// Default: fallback to single-threaded tokio dispatcher.
let io_dispatcher = self
.io_dispatcher
.unwrap_or_else(|| Arc::new(IoDispatcher::new_tokio(1)));
let io_dispatcher = self.io_dispatcher.unwrap_or_default();

Ok(VortexFileArrayStream::new(
self.read_at,
Expand Down
32 changes: 28 additions & 4 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::BTreeSet;
use std::sync::RwLock;
use std::sync::{OnceLock, RwLock};

use bytes::Bytes;
use itertools::Itertools;
Expand All @@ -14,7 +14,7 @@ use crate::read::cache::RelativeLayoutCache;
use crate::read::mask::RowMask;
use crate::{
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, MessageLocator,
Scan, CHUNKED_LAYOUT_ID,
MetadataRead, Scan, CHUNKED_LAYOUT_ID,
};

#[derive(Default, Debug)]
Expand Down Expand Up @@ -84,7 +84,7 @@ impl ChunkedLayoutBuilder {
self.fb_bytes.clone(),
metadata_fb._tab.loc(),
// TODO(robert): Create stats projection
Scan::new(None),
Scan::empty(),
self.message_cache.unknown_dtype(METADATA_LAYOUT_PART_ID),
)
})
Expand Down Expand Up @@ -170,6 +170,7 @@ pub struct ChunkedLayoutReader {
layouts: Vec<RangedLayoutReader>,
metadata_layout: Option<Box<dyn LayoutReader>>,
in_progress_ranges: InProgressLayoutRanges,
cached_metadata: OnceLock<ArrayData>,
}

impl ChunkedLayoutReader {
Expand All @@ -181,6 +182,7 @@ impl ChunkedLayoutReader {
layouts,
metadata_layout,
in_progress_ranges: RwLock::new(HashMap::new()),
cached_metadata: OnceLock::new(),
}
}

Expand Down Expand Up @@ -234,7 +236,6 @@ impl ChunkedLayoutReader {
self.layouts.len()
}

#[allow(dead_code)]
pub fn metadata_layout(&self) -> Option<&dyn LayoutReader> {
self.metadata_layout.as_deref()
}
Expand Down Expand Up @@ -277,6 +278,29 @@ impl LayoutReader for ChunkedLayoutReader {
Ok(None)
}
}

fn read_metadata(&self) -> VortexResult<MetadataRead> {
match self.metadata_layout() {
None => Ok(MetadataRead::None),
Some(metadata_layout) => {
if let Some(md) = self.cached_metadata.get() {
return Ok(MetadataRead::Batches(vec![Some(md.clone())]));
}

match metadata_layout
.read_selection(&RowMask::new_valid_between(0, self.n_chunks()))?
{
Some(BatchRead::Batch(array)) => {
// We don't care if the write failed
_ = self.cached_metadata.set(array.clone());
Ok(MetadataRead::Batches(vec![Some(array)]))
}
Some(BatchRead::ReadMore(messages)) => Ok(MetadataRead::ReadMore(messages)),
None => Ok(MetadataRead::None),
}
}
}
}
}

#[cfg(test)]
Expand Down
48 changes: 44 additions & 4 deletions vortex-file/src/read/layouts/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ use vortex_array::stats::ArrayStatistics;
use vortex_array::validity::Validity;
use vortex_array::{ArrayData, IntoArrayData};
use vortex_dtype::field::Field;
use vortex_dtype::FieldNames;
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
use vortex_dtype::{FieldName, FieldNames};
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult};
use vortex_expr::{Column, Select, VortexExpr};
use vortex_flatbuffers::footer;

use crate::read::cache::{LazyDType, RelativeLayoutCache};
use crate::read::expr_project::expr_project;
use crate::read::mask::RowMask;
use crate::{
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, RowFilter, Scan,
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MetadataRead, RowFilter, Scan,
COLUMNAR_LAYOUT_ID,
};

Expand Down Expand Up @@ -203,6 +203,7 @@ pub struct ColumnarLayoutReader {
expr: Option<Arc<dyn VortexExpr>>,
// TODO(robert): This is a hack/optimization that tells us if we're reducing results with AND or not
shortcircuit_siblings: bool,
in_progress_metadata: RwLock<HashMap<FieldName, Option<ArrayData>>>,
}

impl ColumnarLayoutReader {
Expand All @@ -220,9 +221,10 @@ impl ColumnarLayoutReader {
Self {
names,
children,
in_progress_ranges: RwLock::new(HashMap::new()),
expr,
shortcircuit_siblings,
in_progress_ranges: RwLock::new(HashMap::new()),
in_progress_metadata: RwLock::new(HashMap::new()),
}
}
}
Expand Down Expand Up @@ -303,6 +305,44 @@ impl LayoutReader for ColumnarLayoutReader {
Ok(Some(BatchRead::ReadMore(messages)))
}
}

fn read_metadata(&self) -> VortexResult<MetadataRead> {
let mut in_progress_metadata = self
.in_progress_metadata
.write()
.unwrap_or_else(|e| vortex_panic!("lock is poisoned: {e}"));
let mut messages = Vec::default();

for (name, child_reader) in self.names.iter().zip(self.children.iter()) {
match child_reader.read_metadata()? {
MetadataRead::Batches(data) => {
if data.len() != 1 {
vortex_bail!("expected exactly one metadata array per-child");
}
in_progress_metadata.insert(name.clone(), data[0].clone());
}
MetadataRead::ReadMore(rm) => {
messages.extend(rm);
}
MetadataRead::None => {
in_progress_metadata.insert(name.clone(), None);
}
}
}

// We're done reading
if messages.is_empty() {
let child_arrays = self
.names
.iter()
.map(|name| in_progress_metadata[name].clone()) // TODO(Adam): Some columns might not have statistics
.collect::<Vec<_>>();

Ok(MetadataRead::Batches(child_arrays))
} else {
Ok(MetadataRead::ReadMore(messages))
}
}
}

#[cfg(test)]
Expand Down
8 changes: 6 additions & 2 deletions vortex-file/src/read/layouts/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use vortex_ipc::stream_writer::ByteRange;
use crate::read::cache::RelativeLayoutCache;
use crate::read::mask::RowMask;
use crate::{
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, Scan,
FLAT_LAYOUT_ID,
BatchRead, Layout, LayoutDeserializer, LayoutId, LayoutReader, MessageLocator, MetadataRead,
Scan, FLAT_LAYOUT_ID,
};

#[derive(Debug)]
Expand Down Expand Up @@ -116,6 +116,10 @@ impl LayoutReader for FlatLayoutReader {
Ok(Some(BatchRead::ReadMore(vec![self.own_message()])))
}
}

fn read_metadata(&self) -> VortexResult<MetadataRead> {
Ok(MetadataRead::None)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 630835b

Please sign in to comment.