diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index cde8bb241ee4..a0f6f6a65b1f 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -313,7 +313,6 @@ pub(crate) mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::cast::as_int64_array;
-    use datafusion_common::ToDFSchema;
     use datafusion_functions_aggregate::count::count_udaf;
     use datafusion_physical_expr::expressions::cast;
     use datafusion_physical_expr::PhysicalExpr;
@@ -411,7 +410,7 @@ pub(crate) mod tests {
         // Return appropriate expr depending if COUNT is for col or table (*)
         pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
             AggregateExprBuilder::new(count_udaf(), vec![self.column()])
-                .dfschema(schema.clone().to_dfschema().unwrap())
+                .schema(Arc::new(schema.clone()))
                 .name(self.column_name())
                 .build()
                 .unwrap()
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index ab547b86f582..63e9bac81fba 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -174,7 +174,6 @@ mod tests {
     use crate::physical_plan::{displayable, Partitioning};

     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::ToDFSchema;
     use datafusion_functions_aggregate::count::count_udaf;
     use datafusion_functions_aggregate::sum::sum_udaf;
     use datafusion_physical_expr::expressions::col;
@@ -280,7 +279,7 @@ mod tests {
         schema: &Schema,
     ) -> Arc<dyn AggregateExpr> {
         AggregateExprBuilder::new(count_udaf(), vec![expr])
-            .dfschema(schema.clone().to_dfschema().unwrap())
+            .schema(Arc::new(schema.clone()))
             .name(name)
             .build()
             .unwrap()
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 31fa59af8c18..6f286c9aeba1 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -32,7 +32,6 @@ use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
 use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::ToDFSchema;
 use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::PhysicalSortExpr;
@@ -107,7 +106,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str

     let aggregate_expr = vec![
         AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
-            .dfschema(Arc::clone(&schema).to_dfschema().unwrap())
+            .schema(Arc::clone(&schema))
            .name("sum1")
            .build()
            .unwrap(),
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index 1ed793228ded..2c1e0110ba68 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -20,7 +20,7 @@
 use crate::physical_expr::PhysicalExpr;
 use crate::ColumnarValue;
 use crate::{Accumulator, Expr, PartitionEvaluator};
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::{DataType, Field, Schema};
 use datafusion_common::{DFSchema, Result};
 use std::sync::Arc;

@@ -55,6 +55,9 @@ pub struct AccumulatorArgs<'a> {
     /// The return type of the aggregate function.
     pub data_type: &'a DataType,

+    /// The schema of the input arguments
+    pub schema: &'a Schema,
+
     /// The schema of the input arguments
     pub dfschema: &'a DFSchema,

diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs
index 26909471fc67..4bcb646291ce 100644
--- a/datafusion/functions-aggregate/src/approx_distinct.rs
+++ b/datafusion/functions-aggregate/src/approx_distinct.rs
@@ -278,7 +278,7 @@ impl AggregateUDFImpl for ApproxDistinct {

     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
         let accumulator: Box<dyn Accumulator> =
-            match &acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())? {
+            match &acc_args.input_exprs[0].data_type(acc_args.schema)? {
                 // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL
                 // TODO support for boolean (trivial case)
                 // https://github.com/apache/datafusion/issues/1109
diff --git a/datafusion/functions-aggregate/src/approx_median.rs b/datafusion/functions-aggregate/src/approx_median.rs
index 85b79cdd5267..f7a7be7723da 100644
--- a/datafusion/functions-aggregate/src/approx_median.rs
+++ b/datafusion/functions-aggregate/src/approx_median.rs
@@ -113,7 +113,7 @@ impl AggregateUDFImpl for ApproxMedian {

         Ok(Box::new(ApproxPercentileAccumulator::new(
             0.5_f64,
-            acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())?,
+            acc_args.input_exprs[0].data_type(acc_args.schema)?,
         )))
     }
 }
diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont.rs b/datafusion/functions-aggregate/src/approx_percentile_cont.rs
index 03ab32e5ab03..efb9dc503a32 100644
--- a/datafusion/functions-aggregate/src/approx_percentile_cont.rs
+++ b/datafusion/functions-aggregate/src/approx_percentile_cont.rs
@@ -104,7 +104,7 @@ impl ApproxPercentileCont {
             None
         };

-        let accumulator: ApproxPercentileAccumulator = match &args.input_exprs[0].data_type(args.dfschema.as_arrow())? {
+        let accumulator: ApproxPercentileAccumulator = match &args.input_exprs[0].data_type(args.schema)? {
             t @ (DataType::UInt8
             | DataType::UInt16
             | DataType::UInt32
diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index abb344eaf693..06750754006e 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -118,7 +118,7 @@ impl AggregateUDFImpl for ArrayAgg {

     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
         let data_type =
-            acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())?;
+            acc_args.input_exprs[0].data_type(acc_args.schema)?;

         if acc_args.is_distinct {
             return Ok(Box::new(DistinctArrayAggAccumulator::try_new(&data_type)?));
@@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg {

         let ordering_dtypes = ordering_req
             .iter()
-            .map(|e| e.expr.data_type(acc_args.dfschema.as_arrow()))
+            .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;

         OrderSensitiveArrayAggAccumulator::try_new(
diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs
index f9d10426df0b..e27acb31a9d4 100644
--- a/datafusion/functions-aggregate/src/average.rs
+++ b/datafusion/functions-aggregate/src/average.rs
@@ -94,7 +94,7 @@ impl AggregateUDFImpl for Avg {
         use DataType::*;
         // instantiate specialized accumulator based for the type
         let input_type =
-            acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())?;
+            acc_args.input_exprs[0].data_type(acc_args.schema)?;

         match (&input_type, acc_args.data_type) {
             (Float64, Float64) => Ok(Box::<AvgAccumulator>::default()),
@@ -157,7 +157,7 @@ impl AggregateUDFImpl for Avg {
     ) -> Result<Box<dyn GroupsAccumulator>> {
         use DataType::*;
         // instantiate specialized accumulator based for the type
-        let sum_data_type = &args.input_exprs[0].data_type(args.dfschema.as_arrow())?;
+        let sum_data_type = &args.input_exprs[0].data_type(args.schema)?;

         match (sum_data_type, args.data_type) {
             (Float64, Float64) => {
diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index 4668f1caf101..6b241fde4e8e 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -149,7 +149,7 @@ impl AggregateUDFImpl for Count {
         }

         let data_type =
-            &acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())?;
+            &acc_args.input_exprs[0].data_type(acc_args.schema)?;
         Ok(match data_type {
             // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator
             DataType::Int8 => Box::new(
diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs
index 7563506b3b65..587767b8e356 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -124,7 +124,7 @@ impl AggregateUDFImpl for FirstValue {

         let ordering_dtypes = ordering_req
             .iter()
-            .map(|e| e.expr.data_type(acc_args.dfschema.as_arrow()))
+            .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;

         // When requirement is empty, or it is signalled by outside caller that
@@ -423,7 +423,7 @@ impl AggregateUDFImpl for LastValue {

         let ordering_dtypes = ordering_req
             .iter()
-            .map(|e| e.expr.data_type(acc_args.dfschema.as_arrow()))
+            .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;

         let requirement_satisfied = ordering_req.is_empty() || self.requirement_satisfied;
diff --git a/datafusion/functions-aggregate/src/median.rs b/datafusion/functions-aggregate/src/median.rs
index a0a1dbeb4d3c..fe12055e8712 100644
--- a/datafusion/functions-aggregate/src/median.rs
+++ b/datafusion/functions-aggregate/src/median.rs
@@ -133,7 +133,7 @@ impl AggregateUDFImpl for Median {
             };
         }

-        let dt = &acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())?;
+        let dt = &acc_args.input_exprs[0].data_type(acc_args.schema)?;
         downcast_integer! {
             dt => (helper, dt),
             DataType::Float16 => helper!(Float16Type, dt),
diff --git a/datafusion/functions-aggregate/src/nth_value.rs b/datafusion/functions-aggregate/src/nth_value.rs
index 6362bdcc9287..af279db3ed8f 100644
--- a/datafusion/functions-aggregate/src/nth_value.rs
+++ b/datafusion/functions-aggregate/src/nth_value.rs
@@ -116,12 +116,12 @@ impl AggregateUDFImpl for NthValueAgg {

         let ordering_dtypes = ordering_req
             .iter()
-            .map(|e| e.expr.data_type(acc_args.dfschema.as_arrow()))
+            .map(|e| e.expr.data_type(acc_args.schema))
             .collect::<Result<Vec<_>>>()?;

         NthValueAccumulator::try_new(
             n,
-            &acc_args.input_exprs[0].data_type(acc_args.dfschema.as_arrow())?,
+            &acc_args.input_exprs[0].data_type(acc_args.schema)?,
             &ordering_dtypes,
             ordering_req,
         )
diff --git a/datafusion/functions-aggregate/src/stddev.rs b/datafusion/functions-aggregate/src/stddev.rs
index caa27d059729..859f42262eea 100644
--- a/datafusion/functions-aggregate/src/stddev.rs
+++ b/datafusion/functions-aggregate/src/stddev.rs
@@ -328,6 +328,7 @@ mod tests {
         let dfschema = DFSchema::empty();
         let args1 = AccumulatorArgs {
             data_type: &DataType::Float64,
+            schema: &schema,
             dfschema: &dfschema,
             ignore_nulls: false,
             sort_exprs: &[],
@@ -339,6 +340,7 @@

         let args2 = AccumulatorArgs {
             data_type: &DataType::Float64,
+            schema: &schema,
             dfschema: &dfschema,
             ignore_nulls: false,
             sort_exprs: &[],
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs
index acf1d3c4aef6..8d5d0ffaa793 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -18,9 +18,9 @@
 use std::fmt::Debug;
 use std::{any::Any, sync::Arc};

-use arrow::datatypes::{DataType, Field, Schema};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

-use datafusion_common::{exec_err, ToDFSchema};
+use datafusion_common::exec_err;
 use datafusion_common::{internal_err, not_impl_err, DFSchema, Result};
 use datafusion_expr::function::StateFieldsArgs;
 use datafusion_expr::type_coercion::aggregates::check_arg_count;
@@ -76,7 +76,7 @@ pub fn create_aggregate_expr(
     builder = builder.sort_exprs(sort_exprs.to_vec());
     builder = builder.order_by(ordering_req.to_vec());
     builder = builder.logical_exprs(input_exprs.to_vec());
-    builder = builder.dfschema(Arc::new(schema.clone()).to_dfschema()?);
+    builder = builder.schema(Arc::new(schema.clone()));
     builder = builder.name(name);

     if ignore_nulls {
@@ -108,7 +108,8 @@
     builder = builder.sort_exprs(sort_exprs.to_vec());
     builder = builder.order_by(ordering_req.to_vec());
     builder = builder.logical_exprs(input_exprs.to_vec());
-    builder = builder.dfschema(dfschema.clone());
+    let schema: Schema = dfschema.into();
+    builder = builder.schema(Arc::new(schema));
     builder = builder.name(name);

     if ignore_nulls {
@@ -136,6 +137,8 @@ pub struct AggregateExprBuilder {
     /// Logical expressions of the aggregate function, it will be deprecated in
     logical_args: Vec<Expr>,
     name: String,
+    /// Arrow Schema for the aggregate function
+    schema: SchemaRef,
     /// Datafusion Schema for the aggregate function
     dfschema: DFSchema,
     /// The logical order by expressions, it will be deprecated in
@@ -157,6 +160,7 @@ impl AggregateExprBuilder {
             args,
             logical_args: vec![],
             name: String::new(),
+            schema: Arc::new(Schema::empty()),
             dfschema: DFSchema::empty(),
             sort_exprs: vec![],
             ordering_req: vec![],
@@ -172,6 +176,7 @@ impl AggregateExprBuilder {
             args,
             logical_args,
             name,
+            schema,
             dfschema,
             sort_exprs,
             ordering_req,
@@ -189,7 +194,7 @@ impl AggregateExprBuilder {
         if !ordering_req.is_empty() {
             let ordering_types = ordering_req
                 .iter()
-                .map(|e| e.expr.data_type(dfschema.as_arrow()))
+                .map(|e| e.expr.data_type(&schema))
                 .collect::<Result<Vec<_>>>()?;

             ordering_fields = utils::ordering_fields(&ordering_req, &ordering_types);
@@ -197,7 +202,7 @@ impl AggregateExprBuilder {

         let input_exprs_types = args
             .iter()
-            .map(|arg| arg.data_type(dfschema.as_arrow()))
+            .map(|arg| arg.data_type(&schema))
             .collect::<Result<Vec<_>>>()?;

         check_arg_count(
@@ -214,6 +219,7 @@ impl AggregateExprBuilder {
             logical_args,
             data_type,
             name,
+            schema: Arc::unwrap_or_clone(schema),
             dfschema,
             sort_exprs,
             ordering_req,
@@ -230,6 +236,11 @@ impl AggregateExprBuilder {
         self
     }

+    pub fn schema(mut self, schema: SchemaRef) -> Self {
+        self.schema = schema;
+        self
+    }
+
     pub fn dfschema(mut self, dfschema: DFSchema) -> Self {
         self.dfschema = dfschema;
         self
@@ -444,6 +455,7 @@ pub struct AggregateFunctionExpr {
     /// Output / return type of this aggregate
     data_type: DataType,
     name: String,
+    schema: Schema,
     dfschema: DFSchema,
     // The logical order by expressions
     sort_exprs: Vec<Expr>,
@@ -509,6 +521,7 @@ impl AggregateExpr for AggregateFunctionExpr {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         let acc_args = AccumulatorArgs {
             data_type: &self.data_type,
+            schema: &self.schema,
             dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
@@ -524,6 +537,7 @@ impl AggregateExpr for AggregateFunctionExpr {
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         let args = AccumulatorArgs {
             data_type: &self.data_type,
+            schema: &self.schema,
             dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
@@ -594,6 +608,7 @@ impl AggregateExpr for AggregateFunctionExpr {
     fn groups_accumulator_supported(&self) -> bool {
         let args = AccumulatorArgs {
             data_type: &self.data_type,
+            schema: &self.schema,
             dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
@@ -608,6 +623,7 @@ impl AggregateExpr for AggregateFunctionExpr {
     fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
         let args = AccumulatorArgs {
             data_type: &self.data_type,
+            schema: &self.schema,
             dfschema: &self.dfschema,
             ignore_nulls: self.ignore_nulls,
             sort_exprs: &self.sort_exprs,
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 2bd33a476c2f..70e11498c88f 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -30,7 +30,7 @@ use crate::{

 use arrow::datatypes::Schema;
 use arrow_schema::{DataType, Field, SchemaRef};
-use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue, ToDFSchema};
+use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::{col, Expr, SortExpr};
 use datafusion_expr::{
     BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition,
@@ -127,7 +127,7 @@ pub fn create_window_expr(
                 .collect::<Vec<_>>();

             let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
-                .dfschema(Arc::new(input_schema.clone()).to_dfschema()?)
+                .schema(Arc::new(input_schema.clone()))
                 .name(name)
                 .order_by(order_by.to_vec())
                 .sort_exprs(sort_exprs)
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index a79eafe43846..8932cb883e26 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -61,9 +61,7 @@ use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
 };
-use datafusion_common::{
-    internal_err, not_impl_err, DataFusionError, Result, ToDFSchema,
-};
+use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
 use datafusion_expr::{AggregateUDF, ScalarUDF};

 use crate::common::{byte_to_string, str_to_byte};
@@ -491,8 +489,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                         // TODO: approx_percentile_cont and approx_percentile_cont_weight are not supported for UDAF from protobuf yet.
                         // TODO: `order by` is not supported for UDAF yet
-                        AggregateExprBuilder::new(agg_udf, input_phy_expr).dfschema(Arc::clone(&physical_schema).to_dfschema()?).name(name).with_ignore_nulls(agg_node.ignore_nulls).with_distinct(agg_node.distinct).build()
-                    }
+                        AggregateExprBuilder::new(agg_udf, input_phy_expr).schema(Arc::clone(&physical_schema)).name(name).with_ignore_nulls(agg_node.ignore_nulls).with_distinct(agg_node.distinct).build()
                     }
                 }
             }).transpose()?.ok_or_else(|| {
                 proto_error("Invalid AggregateExpr, missing aggregate_function")
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 9c180e219b5b..213a5590b742 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -82,7 +82,7 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    internal_err, not_impl_err, DataFusionError, Result, ToDFSchema,
+    internal_err, not_impl_err, DataFusionError, Result,
 };
 use datafusion_expr::{
     Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF,
@@ -297,7 +297,7 @@ fn roundtrip_window() -> Result<()> {
             avg_udaf(),
             vec![cast(col("b", &schema)?, &schema, DataType::Float64)?],
         )
-        .dfschema(Arc::clone(&schema).to_dfschema()?)
+        .schema(Arc::clone(&schema))
         .name("avg(b)")
         .build()?,
         &[],
@@ -313,7 +313,7 @@ fn roundtrip_window() -> Result<()> {

     let args = vec![cast(col("a", &schema)?, &schema, DataType::Float64)?];
     let sum_expr = AggregateExprBuilder::new(sum_udaf(), args)
-        .dfschema(Arc::clone(&schema).to_dfschema()?)
+        .schema(Arc::clone(&schema))
         .name("SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING")
         .build()?;

@@ -347,17 +347,17 @@ fn rountrip_aggregate() -> Result<()> {
         vec![(col("a", &schema)?, "unused".to_string())];
     let avg_expr = AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
-        .dfschema(Arc::clone(&schema).to_dfschema()?)
+        .schema(Arc::clone(&schema))
         .name("AVG(b)")
         .build()?;

     let nth_expr =
         AggregateExprBuilder::new(nth_value_udaf(), vec![col("b", &schema)?, lit(1u64)])
-            .dfschema(Arc::clone(&schema).to_dfschema()?)
+            .schema(Arc::clone(&schema))
             .name("NTH_VALUE(b, 1)")
             .build()?;

     let str_agg_expr =
         AggregateExprBuilder::new(string_agg_udaf(), vec![col("b", &schema)?, lit(1u64)])
-            .dfschema(Arc::clone(&schema).to_dfschema()?)
+            .schema(Arc::clone(&schema))
             .name("NTH_VALUE(b, 1)")
             .build()?;

@@ -397,7 +397,7 @@ fn rountrip_aggregate_with_limit() -> Result<()> {

     let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![
         AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?])
-            .dfschema(Arc::clone(&schema).to_dfschema()?)
+            .schema(Arc::clone(&schema))
             .name("AVG(b)")
             .build()?,
     ];
@@ -464,7 +464,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {

     let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![
         AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?])
-            .dfschema(Arc::clone(&schema).to_dfschema()?)
+            .schema(Arc::clone(&schema))
             .name("example_agg")
             .build()?,
     ];
@@ -966,7 +966,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> {
         vec![Arc::new(Literal::new(ScalarValue::from(42)))];

     let aggr_expr = AggregateExprBuilder::new(Arc::clone(&udaf), aggr_args.clone())
-        .dfschema(Arc::clone(&schema).to_dfschema()?)
+        .schema(Arc::clone(&schema))
         .name("aggregate_udf")
         .build()?;

@@ -991,7 +991,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> {
     )?);

     let aggr_expr = AggregateExprBuilder::new(udaf, aggr_args.clone())
-        .dfschema(Arc::clone(&schema).to_dfschema()?)
+        .schema(Arc::clone(&schema))
         .name("aggregate_udf")
         .distinct()
         .ignore_nulls()
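A minimal usage sketch of the builder change above: an aggregate physical expression is now built against a plain Arrow `SchemaRef` via `.schema(...)` rather than converting to a `DFSchema` with the removed `.dfschema(...)` call. The helper, schema, and import paths below are illustrative assumptions based on the crate layout visible in this diff, not part of the patch.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::aggregate::{AggregateExpr, AggregateExprBuilder};

// Hypothetical helper (not part of the patch): builds COUNT(a) against a
// made-up single-column schema. The Arrow SchemaRef is handed straight to
// the builder via `.schema(...)`; before this change the equivalent call
// site would have been `.dfschema(Arc::clone(&schema).to_dfschema()?)`.
fn build_count_expr() -> Result<Arc<dyn AggregateExpr>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
        .schema(Arc::clone(&schema))
        .name("count(a)")
        .build()
}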