From 496b1e06ae7e84de88c092df4ca2f704c8e822d6 Mon Sep 17 00:00:00 2001 From: Jax Liu Date: Mon, 22 Jul 2024 20:36:58 +0800 Subject: [PATCH] Provide DataFrame API for `map` and move `map` to `functions-array` (#11560) * move map to `functions-array` and implement dataframe api * add benchmark for dataframe api * fix format * add roundtrip_expr_api test --- datafusion/core/Cargo.toml | 5 + datafusion/core/benches/map_query_sql.rs | 93 +++++++++++++++++++ .../tests/dataframe/dataframe_functions.rs | 22 +++++ datafusion/functions-array/benches/map.rs | 37 +++++++- datafusion/functions-array/src/lib.rs | 3 + .../src/core => functions-array/src}/map.rs | 35 ++++--- datafusion/functions-array/src/planner.rs | 6 +- datafusion/functions/Cargo.toml | 5 - datafusion/functions/benches/map.rs | 80 ---------------- datafusion/functions/src/core/mod.rs | 7 -- .../tests/cases/roundtrip_logical_plan.rs | 5 + 11 files changed, 189 insertions(+), 109 deletions(-) create mode 100644 datafusion/core/benches/map_query_sql.rs rename datafusion/{functions/src/core => functions-array/src}/map.rs (83%) delete mode 100644 datafusion/functions/benches/map.rs diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c937a6f6e59a9..4301396b231fe 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -217,3 +217,8 @@ name = "topk_aggregate" [[bench]] harness = false name = "parquet_statistic" + +[[bench]] +harness = false +name = "map_query_sql" +required-features = ["array_expressions"] diff --git a/datafusion/core/benches/map_query_sql.rs b/datafusion/core/benches/map_query_sql.rs new file mode 100644 index 0000000000000..b6ac8b6b647a1 --- /dev/null +++ b/datafusion/core/benches/map_query_sql.rs @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use parking_lot::Mutex; +use rand::prelude::ThreadRng; +use rand::Rng; +use tokio::runtime::Runtime; + +use datafusion::prelude::SessionContext; +use datafusion_common::ScalarValue; +use datafusion_expr::Expr; +use datafusion_functions_array::map::map; + +mod data_utils; + +fn build_keys(rng: &mut ThreadRng) -> Vec { + let mut keys = vec![]; + for _ in 0..1000 { + keys.push(rng.gen_range(0..9999).to_string()); + } + keys +} + +fn build_values(rng: &mut ThreadRng) -> Vec { + let mut values = vec![]; + for _ in 0..1000 { + values.push(rng.gen_range(0..9999)); + } + values +} + +fn t_batch(num: i32) -> RecordBatch { + let value: Vec = (0..num).collect(); + let c1: ArrayRef = Arc::new(Int32Array::from(value)); + RecordBatch::try_from_iter(vec![("c1", c1)]).unwrap() +} + +fn create_context(num: i32) -> datafusion_common::Result>> { + let ctx = SessionContext::new(); + ctx.register_batch("t", t_batch(num))?; + Ok(Arc::new(Mutex::new(ctx))) +} + +fn criterion_benchmark(c: &mut Criterion) { + let ctx = create_context(1).unwrap(); + let rt = Runtime::new().unwrap(); + let df = rt.block_on(ctx.lock().table("t")).unwrap(); + + let mut rng = rand::thread_rng(); + let keys = build_keys(&mut rng); + let values = build_values(&mut rng); + let mut key_buffer = Vec::new(); + let mut value_buffer = Vec::new(); + + for i in 0..1000 { + key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone())))); + value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i])))); + } + c.bench_function("map_1000_1", |b| { + b.iter(|| { + black_box( + rt.block_on( + df.clone() + .select(vec![map(key_buffer.clone(), value_buffer.clone())]) + .unwrap() + .collect(), + ) + .unwrap(), + ); + }); + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index 1c55c48fea40d..f7b02196d8ed5 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -34,6 +34,7 @@ use datafusion_common::{DFSchema, ScalarValue}; use datafusion_expr::expr::Alias; use datafusion_expr::ExprSchemable; use datafusion_functions_aggregate::expr_fn::{approx_median, approx_percentile_cont}; +use datafusion_functions_array::map::map; fn test_schema() -> SchemaRef { Arc::new(Schema::new(vec![ @@ -1087,3 +1088,24 @@ async fn test_fn_array_to_string() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_fn_map() -> Result<()> { + let expr = map( + vec![lit("a"), lit("b"), lit("c")], + vec![lit(1), lit(2), lit(3)], + ); + let expected = [ + "+---------------------------------------------------------------------------------------+", + "| map(make_array(Utf8(\"a\"),Utf8(\"b\"),Utf8(\"c\")),make_array(Int32(1),Int32(2),Int32(3))) |", + "+---------------------------------------------------------------------------------------+", + "| {a: 1, b: 2, c: 3} |", + "| {a: 1, b: 2, c: 3} |", + "| {a: 1, b: 2, c: 3} |", + "| {a: 1, b: 2, c: 3} |", + "+---------------------------------------------------------------------------------------+", + ]; + assert_fn_batches!(expr, expected); + + Ok(()) +} diff --git a/datafusion/functions-array/benches/map.rs b/datafusion/functions-array/benches/map.rs index 2e9b45266abc6..c2e0e641e80d2 100644 --- a/datafusion/functions-array/benches/map.rs +++ b/datafusion/functions-array/benches/map.rs @@ -17,13 +17,18 @@ extern crate criterion; +use arrow_array::{Int32Array, ListArray, StringArray}; +use arrow_buffer::{OffsetBuffer, ScalarBuffer}; +use arrow_schema::{DataType, Field}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use rand::prelude::ThreadRng; use rand::Rng; +use std::sync::Arc; use datafusion_common::ScalarValue; use datafusion_expr::planner::ExprPlanner; -use datafusion_expr::Expr; +use datafusion_expr::{ColumnarValue, Expr}; +use datafusion_functions_array::map::map_udf; use datafusion_functions_array::planner::ArrayFunctionPlanner; fn keys(rng: &mut ThreadRng) -> Vec { @@ -63,6 +68,36 @@ fn criterion_benchmark(c: &mut Criterion) { ); }); }); + + c.bench_function("map_1000", |b| { + let mut rng = rand::thread_rng(); + let field = Arc::new(Field::new("item", DataType::Utf8, true)); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])); + let key_list = ListArray::new( + field, + offsets, + Arc::new(StringArray::from(keys(&mut rng))), + None, + ); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])); + let value_list = ListArray::new( + field, + offsets, + Arc::new(Int32Array::from(values(&mut rng))), + None, + ); + let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list))); + let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list))); + + b.iter(|| { + black_box( + map_udf() + .invoke(&[keys.clone(), values.clone()]) + .expect("map should work on valid values"), + ); + }); + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index 9717d29883fd5..f68f59dcd6a12 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -41,6 +41,7 @@ pub mod extract; pub mod flatten; pub mod length; pub mod make_array; +pub mod map; pub mod planner; pub mod position; pub mod range; @@ -53,6 +54,7 @@ pub mod set_ops; pub mod sort; pub mod string; pub mod utils; + use datafusion_common::Result; use datafusion_execution::FunctionRegistry; use datafusion_expr::ScalarUDF; @@ -140,6 +142,7 @@ pub fn all_default_array_functions() -> Vec> { replace::array_replace_n_udf(), replace::array_replace_all_udf(), replace::array_replace_udf(), + map::map_udf(), ] } diff --git a/datafusion/functions/src/core/map.rs b/datafusion/functions-array/src/map.rs similarity index 83% rename from datafusion/functions/src/core/map.rs rename to datafusion/functions-array/src/map.rs index 2deef242f8a02..e218b501dcf16 100644 --- a/datafusion/functions/src/core/map.rs +++ b/datafusion/functions-array/src/map.rs @@ -15,17 +15,26 @@ // specific language governing permissions and limitations // under the License. +use crate::make_array::make_array; +use arrow::array::ArrayData; +use arrow_array::{Array, ArrayRef, MapArray, StructArray}; +use arrow_buffer::{Buffer, ToByteSlice}; +use arrow_schema::{DataType, Field, SchemaBuilder}; +use datafusion_common::{exec_err, ScalarValue}; +use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; use std::collections::VecDeque; use std::sync::Arc; -use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray}; -use arrow::datatypes::{DataType, Field, SchemaBuilder}; -use arrow_buffer::{Buffer, ToByteSlice}; +/// Returns a map created from a key list and a value list +pub fn map(keys: Vec, values: Vec) -> Expr { + let keys = make_array(keys); + let values = make_array(values); + Expr::ScalarFunction(ScalarFunction::new_udf(map_udf(), vec![keys, values])) +} -use datafusion_common::Result; -use datafusion_common::{exec_err, ScalarValue}; -use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +create_func!(MapFunc, map_udf); /// Check if we can evaluate the expr to constant directly. /// @@ -39,7 +48,7 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool { .all(|arg| matches!(arg, ColumnarValue::Scalar(_))) } -fn make_map_batch(args: &[ColumnarValue]) -> Result { +fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result { if args.len() != 2 { return exec_err!( "make_map requires exactly 2 arguments, got {} instead", @@ -54,7 +63,9 @@ fn make_map_batch(args: &[ColumnarValue]) -> Result { make_map_batch_internal(key, value, can_evaluate_to_const) } -fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result { +fn get_first_array_ref( + columnar_value: &ColumnarValue, +) -> datafusion_common::Result { match columnar_value { ColumnarValue::Scalar(value) => match value { ScalarValue::List(array) => Ok(array.value(0)), @@ -70,7 +81,7 @@ fn make_map_batch_internal( keys: ArrayRef, values: ArrayRef, can_evaluate_to_const: bool, -) -> Result { +) -> datafusion_common::Result { if keys.null_count() > 0 { return exec_err!("map key cannot be null"); } @@ -150,7 +161,7 @@ impl ScalarUDFImpl for MapFunc { &self.signature } - fn return_type(&self, arg_types: &[DataType]) -> Result { + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { if arg_types.len() % 2 != 0 { return exec_err!( "map requires an even number of arguments, got {} instead", @@ -175,12 +186,12 @@ impl ScalarUDFImpl for MapFunc { )) } - fn invoke(&self, args: &[ColumnarValue]) -> Result { + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { make_map_batch(args) } } -fn get_element_type(data_type: &DataType) -> Result<&DataType> { +fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> { match data_type { DataType::List(element) => Ok(element.data_type()), DataType::LargeList(element) => Ok(element.data_type()), diff --git a/datafusion/functions-array/src/planner.rs b/datafusion/functions-array/src/planner.rs index fbb541d9b151e..c63c2c83e66e8 100644 --- a/datafusion/functions-array/src/planner.rs +++ b/datafusion/functions-array/src/planner.rs @@ -27,6 +27,7 @@ use datafusion_expr::{ use datafusion_functions::expr_fn::get_field; use datafusion_functions_aggregate::nth_value::nth_value_udaf; +use crate::map::map_udf; use crate::{ array_has::array_has_all, expr_fn::{array_append, array_concat, array_prepend}, @@ -111,10 +112,7 @@ impl ExprPlanner for ArrayFunctionPlanner { let values = make_array(values.into_iter().map(|(_, e)| e).collect()); Ok(PlannerResult::Planned(Expr::ScalarFunction( - ScalarFunction::new_udf( - datafusion_functions::core::map(), - vec![keys, values], - ), + ScalarFunction::new_udf(map_udf(), vec![keys, values]), ))) } } diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index b143080b19626..0281676cabf2d 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -141,8 +141,3 @@ required-features = ["string_expressions"] harness = false name = "upper" required-features = ["string_expressions"] - -[[bench]] -harness = false -name = "map" -required-features = ["core_expressions"] diff --git a/datafusion/functions/benches/map.rs b/datafusion/functions/benches/map.rs deleted file mode 100644 index 811c21a41b46d..0000000000000 --- a/datafusion/functions/benches/map.rs +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -extern crate criterion; - -use arrow::array::{Int32Array, ListArray, StringArray}; -use arrow::datatypes::{DataType, Field}; -use arrow_buffer::{OffsetBuffer, ScalarBuffer}; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use datafusion_common::ScalarValue; -use datafusion_expr::ColumnarValue; -use datafusion_functions::core::map; -use rand::prelude::ThreadRng; -use rand::Rng; -use std::sync::Arc; - -fn keys(rng: &mut ThreadRng) -> Vec { - let mut keys = vec![]; - for _ in 0..1000 { - keys.push(rng.gen_range(0..9999).to_string()); - } - keys -} - -fn values(rng: &mut ThreadRng) -> Vec { - let mut values = vec![]; - for _ in 0..1000 { - values.push(rng.gen_range(0..9999)); - } - values -} - -fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("map_1000", |b| { - let mut rng = rand::thread_rng(); - let field = Arc::new(Field::new("item", DataType::Utf8, true)); - let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])); - let key_list = ListArray::new( - field, - offsets, - Arc::new(StringArray::from(keys(&mut rng))), - None, - ); - let field = Arc::new(Field::new("item", DataType::Int32, true)); - let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])); - let value_list = ListArray::new( - field, - offsets, - Arc::new(Int32Array::from(values(&mut rng))), - None, - ); - let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list))); - let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list))); - - b.iter(|| { - black_box( - map() - .invoke(&[keys.clone(), values.clone()]) - .expect("map should work on valid values"), - ); - }); - }); -} - -criterion_group!(benches, criterion_benchmark); -criterion_main!(benches); diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index ee0309e593820..8c51213972843 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -25,7 +25,6 @@ pub mod arrowtypeof; pub mod coalesce; pub mod expr_ext; pub mod getfield; -pub mod map; pub mod named_struct; pub mod nullif; pub mod nvl; @@ -43,7 +42,6 @@ make_udf_function!(r#struct::StructFunc, STRUCT, r#struct); make_udf_function!(named_struct::NamedStructFunc, NAMED_STRUCT, named_struct); make_udf_function!(getfield::GetFieldFunc, GET_FIELD, get_field); make_udf_function!(coalesce::CoalesceFunc, COALESCE, coalesce); -make_udf_function!(map::MapFunc, MAP, map); pub mod expr_fn { use datafusion_expr::{Expr, Literal}; @@ -80,10 +78,6 @@ pub mod expr_fn { coalesce, "Returns `coalesce(args...)`, which evaluates to the value of the first expr which is not NULL", args, - ),( - map, - "Returns a map created from a key list and a value list", - args, )); #[doc = "Returns the value of the field with the given name from the struct"] @@ -101,6 +95,5 @@ pub fn functions() -> Vec> { arrow_typeof(), named_struct(), coalesce(), - map(), ] } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index b125b5f664238..f722d4cd86f85 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -43,6 +43,7 @@ use datafusion::functions_aggregate::expr_fn::{ count_distinct, covar_pop, covar_samp, first_value, grouping, median, stddev, stddev_pop, sum, var_pop, var_sample, }; +use datafusion::functions_array::map::map; use datafusion::prelude::*; use datafusion::test_util::{TestTableFactory, TestTableProvider}; use datafusion_common::config::TableOptions; @@ -727,6 +728,10 @@ async fn roundtrip_expr_api() -> Result<()> { bool_or(lit(true)), array_agg(lit(1)), array_agg(lit(1)).distinct().build().unwrap(), + map( + vec![lit(1), lit(2), lit(3)], + vec![lit(10), lit(20), lit(30)], + ), ]; // ensure expressions created with the expr api can be round tripped