Skip to content

Commit

Permalink
Support Utf8View and BinaryView in Substrait serialization. (#12199)
Browse files Browse the repository at this point in the history
* feat(12118): logical plan support for Utf8View

* feat(12118): physical plan support for Utf8View

* feat(12118): logical plan support for BinaryView

* feat(12118): physical plan support for BinaryView

* refactor(12118): remove BinaryView workarounds, now that the upstream Arrow changes are in

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
wiedld and alamb authored Sep 6, 2024
1 parent dd04929 commit aed84c2
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 6 deletions.
6 changes: 5 additions & 1 deletion datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::variation_const::{
DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF,
DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_MONTH_DAY_NANO_TYPE_NAME, LARGE_CONTAINER_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF, VIEW_CONTAINER_TYPE_VARIATION_REF,
};
#[allow(deprecated)]
use crate::variation_const::{
Expand Down Expand Up @@ -1432,6 +1432,7 @@ fn from_substrait_type(
r#type::Kind::Binary(binary) => match binary.type_variation_reference {
DEFAULT_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::Binary),
LARGE_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::LargeBinary),
VIEW_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::BinaryView),
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
Expand All @@ -1442,6 +1443,7 @@ fn from_substrait_type(
r#type::Kind::String(string) => match string.type_variation_reference {
DEFAULT_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::Utf8),
LARGE_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::LargeUtf8),
VIEW_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::Utf8View),
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
Expand Down Expand Up @@ -1759,6 +1761,7 @@ fn from_substrait_literal(
Some(LiteralType::String(s)) => match lit.type_variation_reference {
DEFAULT_CONTAINER_TYPE_VARIATION_REF => ScalarValue::Utf8(Some(s.clone())),
LARGE_CONTAINER_TYPE_VARIATION_REF => ScalarValue::LargeUtf8(Some(s.clone())),
VIEW_CONTAINER_TYPE_VARIATION_REF => ScalarValue::Utf8View(Some(s.clone())),
others => {
return substrait_err!("Unknown type variation reference {others}");
}
Expand All @@ -1768,6 +1771,7 @@ fn from_substrait_literal(
LARGE_CONTAINER_TYPE_VARIATION_REF => {
ScalarValue::LargeBinary(Some(b.clone()))
}
VIEW_CONTAINER_TYPE_VARIATION_REF => ScalarValue::BinaryView(Some(b.clone())),
others => {
return substrait_err!("Unknown type variation reference {others}");
}
Expand Down
24 changes: 23 additions & 1 deletion datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::variation_const::{
DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF,
DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_MONTH_DAY_NANO_TYPE_NAME, LARGE_CONTAINER_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF, VIEW_CONTAINER_TYPE_VARIATION_REF,
};
use datafusion::arrow::array::{Array, GenericListArray, OffsetSizeTrait};
use datafusion::common::{
Expand Down Expand Up @@ -1450,6 +1450,12 @@ fn to_substrait_type(
nullability,
})),
}),
DataType::BinaryView => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::Binary(r#type::Binary {
type_variation_reference: VIEW_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::Utf8 => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::String(r#type::String {
type_variation_reference: DEFAULT_CONTAINER_TYPE_VARIATION_REF,
Expand All @@ -1462,6 +1468,12 @@ fn to_substrait_type(
nullability,
})),
}),
DataType::Utf8View => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::String(r#type::String {
type_variation_reference: VIEW_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::List(inner) => {
let inner_type =
to_substrait_type(inner.data_type(), inner.is_nullable(), extensions)?;
Expand Down Expand Up @@ -1902,6 +1914,10 @@ fn to_substrait_literal(
LiteralType::Binary(b.clone()),
LARGE_CONTAINER_TYPE_VARIATION_REF,
),
ScalarValue::BinaryView(Some(b)) => (
LiteralType::Binary(b.clone()),
VIEW_CONTAINER_TYPE_VARIATION_REF,
),
ScalarValue::FixedSizeBinary(_, Some(b)) => (
LiteralType::FixedBinary(b.clone()),
DEFAULT_TYPE_VARIATION_REF,
Expand All @@ -1914,6 +1930,10 @@ fn to_substrait_literal(
LiteralType::String(s.clone()),
LARGE_CONTAINER_TYPE_VARIATION_REF,
),
ScalarValue::Utf8View(Some(s)) => (
LiteralType::String(s.clone()),
VIEW_CONTAINER_TYPE_VARIATION_REF,
),
ScalarValue::Decimal128(v, p, s) if v.is_some() => (
LiteralType::Decimal(Decimal {
value: v.unwrap().to_le_bytes().to_vec(),
Expand Down Expand Up @@ -2335,8 +2355,10 @@ mod test {
round_trip_type(DataType::Binary)?;
round_trip_type(DataType::FixedSizeBinary(10))?;
round_trip_type(DataType::LargeBinary)?;
round_trip_type(DataType::BinaryView)?;
round_trip_type(DataType::Utf8)?;
round_trip_type(DataType::LargeUtf8)?;
round_trip_type(DataType::Utf8View)?;
round_trip_type(DataType::Decimal128(10, 2))?;
round_trip_type(DataType::Decimal256(30, 2))?;

Expand Down
27 changes: 26 additions & 1 deletion datafusion/substrait/src/physical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ use substrait::proto::{
expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel,
};

use crate::variation_const::{
DEFAULT_CONTAINER_TYPE_VARIATION_REF, LARGE_CONTAINER_TYPE_VARIATION_REF,
VIEW_CONTAINER_TYPE_VARIATION_REF,
};

/// Convert Substrait Rel to DataFusion ExecutionPlan
#[async_recursion]
pub async fn from_substrait_rel(
Expand Down Expand Up @@ -177,7 +182,27 @@ fn to_field(name: &String, r#type: &Type) -> Result<Field> {
}
Kind::String(string) => {
nullable = is_nullable(string.nullability);
Ok(DataType::Utf8)
match string.type_variation_reference {
DEFAULT_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::Utf8),
LARGE_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::LargeUtf8),
VIEW_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::Utf8View),
_ => substrait_err!(
"Invalid type variation found for substrait string type class: {}",
string.type_variation_reference
),
}
}
Kind::Binary(binary) => {
nullable = is_nullable(binary.nullability);
match binary.type_variation_reference {
DEFAULT_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::Binary),
LARGE_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::LargeBinary),
VIEW_CONTAINER_TYPE_VARIATION_REF => Ok(DataType::BinaryView),
_ => substrait_err!(
"Invalid type variation found for substrait binary type class: {}",
binary.type_variation_reference
),
}
}
_ => substrait_err!(
"Unsupported kind: {:?} in the type with name {}",
Expand Down
39 changes: 37 additions & 2 deletions datafusion/substrait/src/physical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::collections::HashMap;
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
use substrait::proto::expression::MaskExpression;
use substrait::proto::r#type::{
Boolean, Fp64, Kind, Nullability, String as SubstraitString, Struct, I64,
Binary, Boolean, Fp64, Kind, Nullability, String as SubstraitString, Struct, I64,
};
use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions;
use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType};
Expand All @@ -35,6 +35,11 @@ use substrait::proto::ReadRel;
use substrait::proto::Rel;
use substrait::proto::{extensions, NamedStruct, Type};

use crate::variation_const::{
DEFAULT_CONTAINER_TYPE_VARIATION_REF, LARGE_CONTAINER_TYPE_VARIATION_REF,
VIEW_CONTAINER_TYPE_VARIATION_REF,
};

/// Convert DataFusion ExecutionPlan to Substrait Rel
pub fn to_substrait_rel(
plan: &dyn ExecutionPlan,
Expand Down Expand Up @@ -155,7 +160,37 @@ fn to_substrait_type(data_type: &DataType, nullable: bool) -> Result<Type> {
}),
DataType::Utf8 => Ok(Type {
kind: Some(Kind::String(SubstraitString {
type_variation_reference: 0,
type_variation_reference: DEFAULT_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::LargeUtf8 => Ok(Type {
kind: Some(Kind::String(SubstraitString {
type_variation_reference: LARGE_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::Utf8View => Ok(Type {
kind: Some(Kind::String(SubstraitString {
type_variation_reference: VIEW_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::Binary => Ok(Type {
kind: Some(Kind::Binary(Binary {
type_variation_reference: DEFAULT_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::LargeBinary => Ok(Type {
kind: Some(Kind::Binary(Binary {
type_variation_reference: LARGE_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
DataType::BinaryView => Ok(Type {
kind: Some(Kind::Binary(Binary {
type_variation_reference: VIEW_CONTAINER_TYPE_VARIATION_REF,
nullability,
})),
}),
Expand Down
1 change: 1 addition & 0 deletions datafusion/substrait/src/variation_const.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub const DATE_32_TYPE_VARIATION_REF: u32 = 0;
pub const DATE_64_TYPE_VARIATION_REF: u32 = 1;
pub const DEFAULT_CONTAINER_TYPE_VARIATION_REF: u32 = 0;
pub const LARGE_CONTAINER_TYPE_VARIATION_REF: u32 = 1;
pub const VIEW_CONTAINER_TYPE_VARIATION_REF: u32 = 2;
pub const DECIMAL_128_TYPE_VARIATION_REF: u32 = 0;
pub const DECIMAL_256_TYPE_VARIATION_REF: u32 = 1;

Expand Down
6 changes: 5 additions & 1 deletion datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,10 @@ async fn all_type_literal() -> Result<()> {
date32_col = arrow_cast('2020-01-01', 'Date32') AND
binary_col = arrow_cast('binary', 'Binary') AND
large_binary_col = arrow_cast('large_binary', 'LargeBinary') AND
view_binary_col = arrow_cast('binary_view', 'BinaryView') AND
utf8_col = arrow_cast('utf8', 'Utf8') AND
large_utf8_col = arrow_cast('large_utf8', 'LargeUtf8');",
large_utf8_col = arrow_cast('large_utf8', 'LargeUtf8') AND
view_utf8_col = arrow_cast('utf8_view', 'Utf8View');",
)
.await
}
Expand Down Expand Up @@ -1231,9 +1233,11 @@ async fn create_all_type_context() -> Result<SessionContext> {
Field::new("date64_col", DataType::Date64, true),
Field::new("binary_col", DataType::Binary, true),
Field::new("large_binary_col", DataType::LargeBinary, true),
Field::new("view_binary_col", DataType::BinaryView, true),
Field::new("fixed_size_binary_col", DataType::FixedSizeBinary(42), true),
Field::new("utf8_col", DataType::Utf8, true),
Field::new("large_utf8_col", DataType::LargeUtf8, true),
Field::new("view_utf8_col", DataType::Utf8View, true),
Field::new_list("list_col", Field::new("item", DataType::Int64, true), true),
Field::new_list(
"large_list_col",
Expand Down

0 comments on commit aed84c2

Please sign in to comment.