Skip to content

Commit

Permalink
feat: Expose Parquet Schema Adapter (#10515)
Browse files Browse the repository at this point in the history
* feat: Expose Parquet Schema Adapter
  • Loading branch information
HawaiianSpork authored May 17, 2024
1 parent ea023e2 commit 4e55768
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 44 deletions.
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use crate::arrow::array::{
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{
FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
Expand Down Expand Up @@ -470,7 +471,8 @@ async fn fetch_statistics(
let mut null_counts = vec![Precision::Exact(0); num_fields];
let mut has_statistics = false;

let schema_adapter = SchemaAdapter::new(table_schema.clone());
let schema_adapter =
DefaultSchemaAdapterFactory::default().create(table_schema.clone());

let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);

Expand Down
78 changes: 39 additions & 39 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,19 @@ mod statistics;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
pub use self::parquet::{
ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
SchemaAdapterFactory, SchemaMapper,
};
#[cfg(feature = "parquet")]
use arrow::{
array::new_null_array,
compute::{can_cast_types, cast},
datatypes::Schema,
record_batch::{RecordBatch, RecordBatchOptions},
};
#[cfg(feature = "parquet")]
use datafusion_common::plan_err;

pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
Expand Down Expand Up @@ -61,13 +73,7 @@ use crate::{
physical_plan::display::{display_orderings, ProjectSchemaDisplay},
};

use arrow::{
array::new_null_array,
compute::{can_cast_types, cast},
datatypes::{DataType, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use datafusion_common::plan_err;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;

Expand Down Expand Up @@ -241,39 +247,31 @@ where
Ok(())
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
#[cfg(feature = "parquet")]
#[derive(Clone, Debug, Default)]
pub(crate) struct DefaultSchemaAdapterFactory {}

#[cfg(feature = "parquet")]
impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
Box::new(DefaultSchemaAdapter { table_schema })
}
}

#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
pub(crate) struct SchemaAdapter {
pub(crate) struct DefaultSchemaAdapter {
/// Schema for the table
table_schema: SchemaRef,
}

impl SchemaAdapter {
pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
Self { table_schema }
}

#[cfg(feature = "parquet")]
impl SchemaAdapter for DefaultSchemaAdapter {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
pub(crate) fn map_column_index(
&self,
index: usize,
file_schema: &Schema,
) -> Option<usize> {
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
let field = self.table_schema.field(index);
Some(file_schema.fields.find(field.name())?.0)
}
Expand All @@ -286,10 +284,10 @@ impl SchemaAdapter {
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
pub fn map_schema(
fn map_schema(
&self,
file_schema: &Schema,
) -> Result<(SchemaMapping, Vec<usize>)> {
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
let mut projection = Vec::with_capacity(file_schema.fields().len());
let mut field_mappings = vec![None; self.table_schema.fields().len()];

Expand All @@ -315,17 +313,18 @@ impl SchemaAdapter {
}

Ok((
SchemaMapping {
Arc::new(SchemaMapping {
table_schema: self.table_schema.clone(),
field_mappings,
},
}),
projection,
))
}
}

/// The SchemaMapping struct holds a mapping from the file schema to the table schema
/// and any necessary type conversions that need to be applied.
#[cfg(feature = "parquet")]
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
Expand All @@ -334,7 +333,8 @@ pub struct SchemaMapping {
field_mappings: Vec<Option<usize>>,
}

impl SchemaMapping {
#[cfg(feature = "parquet")]
impl SchemaMapper for SchemaMapping {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
Expand Down Expand Up @@ -636,7 +636,7 @@ mod tests {
Field::new("c3", DataType::Float64, true),
]));

let adapter = SchemaAdapter::new(table_schema.clone());
let adapter = DefaultSchemaAdapterFactory::default().create(table_schema.clone());

let file_schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Expand Down Expand Up @@ -693,7 +693,7 @@ mod tests {

let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
let adapter = SchemaAdapter::new(schema);
let adapter = DefaultSchemaAdapterFactory::default().create(schema);
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
Expand Down
31 changes: 28 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
FileMeta, FileScanConfig, SchemaAdapter,
parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
FileGroupPartitioner, FileMeta, FileScanConfig,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
Expand Down Expand Up @@ -67,9 +67,11 @@ mod metrics;
mod page_filter;
mod row_filter;
mod row_groups;
mod schema_adapter;
mod statistics;

pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
Expand All @@ -93,6 +95,8 @@ pub struct ParquetExec {
cache: PlanProperties,
/// Options for reading Parquet files
table_parquet_options: TableParquetOptions,
/// Optional user defined schema adapter
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl ParquetExec {
Expand Down Expand Up @@ -157,6 +161,7 @@ impl ParquetExec {
parquet_file_reader_factory: None,
cache,
table_parquet_options,
schema_adapter_factory: None,
}
}

Expand Down Expand Up @@ -195,6 +200,19 @@ impl ParquetExec {
self
}

/// Optional schema adapter factory.
///
/// `SchemaAdapterFactory` allows user to specify how fields from the parquet file get mapped to
/// that of the table schema. The default schema adapter uses arrow's cast library to map
/// the parquet fields to the table schema.
pub fn with_schema_adapter_factory(
mut self,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
self.schema_adapter_factory = Some(schema_adapter_factory);
self
}

/// If true, any filter [`Expr`]s on the scan will converted to a
/// [`RowFilter`](parquet::arrow::arrow_reader::RowFilter) in the
/// `ParquetRecordBatchStream`. These filters are applied by the
Expand Down Expand Up @@ -402,6 +420,11 @@ impl ExecutionPlan for ParquetExec {
})
})?;

let schema_adapter_factory = self
.schema_adapter_factory
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory::default()));

let opener = ParquetOpener {
partition_index,
projection: Arc::from(projection),
Expand All @@ -418,6 +441,7 @@ impl ExecutionPlan for ParquetExec {
reorder_filters: self.reorder_filters(),
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
schema_adapter_factory,
};

let stream =
Expand Down Expand Up @@ -452,6 +476,7 @@ struct ParquetOpener {
reorder_filters: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}

impl FileOpener for ParquetOpener {
Expand All @@ -475,7 +500,7 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let projected_schema = SchemaRef::from(self.table_schema.project(&projection)?);
let schema_adapter = SchemaAdapter::new(projected_schema);
let schema_adapter = self.schema_adapter_factory.create(projected_schema);
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use std::fmt::Debug;
use std::sync::Arc;

/// Factory of schema adapters.
///
/// Provides means to implement custom schema adaptation.
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter` for the ParquetExec.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
///
/// Returns a [`SchemaMapper`] that can be applied to the output batch
/// along with an ordered list of columns to project from the file
fn map_schema(
&self,
file_schema: &Schema,
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
}
1 change: 1 addition & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mod filter_pushdown;
mod page_pruning;
mod row_group_pruning;
mod schema;
mod schema_adapter;
mod schema_coercion;

#[cfg(test)]
Expand Down
Loading

0 comments on commit 4e55768

Please sign in to comment.