Provide DataFrame API for map and move map to functions-array (#11560)

* move map to `functions-array` and implement dataframe api

* add benchmark for dataframe api

* fix format

* add roundtrip_expr_api test
goldmedal authored Jul 22, 2024
1 parent 5c65efc commit 51da92f
Showing 11 changed files with 189 additions and 109 deletions.
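For orientation, here is a minimal sketch of the DataFrame-API usage this commit enables; the in-memory table, its name "t", and the literal keys/values are illustrative rather than part of the diff:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use datafusion::prelude::*;
use datafusion_functions_array::map::map;

#[tokio::main]
async fn main() -> datafusion_common::Result<()> {
    let ctx = SessionContext::new();
    // Register a tiny one-row table, mirroring the benchmark added below.
    let c1: ArrayRef = Arc::new(Int32Array::from(vec![1]));
    ctx.register_batch("t", RecordBatch::try_from_iter(vec![("c1", c1)])?)?;

    // Build a constant map column from parallel key and value expression lists.
    let df = ctx
        .table("t")
        .await?
        .select(vec![map(
            vec![lit("a"), lit("b"), lit("c")],
            vec![lit(1), lit(2), lit(3)],
        )])?;
    df.show().await?;
    Ok(())
}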
5 changes: 5 additions & 0 deletions datafusion/core/Cargo.toml
@@ -217,3 +217,8 @@ name = "topk_aggregate"
[[bench]]
harness = false
name = "parquet_statistic"

[[bench]]
harness = false
name = "map_query_sql"
required-features = ["array_expressions"]
93 changes: 93 additions & 0 deletions datafusion/core/benches/map_query_sql.rs
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use parking_lot::Mutex;
use rand::prelude::ThreadRng;
use rand::Rng;
use tokio::runtime::Runtime;

use datafusion::prelude::SessionContext;
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datafusion_functions_array::map::map;

mod data_utils;

fn build_keys(rng: &mut ThreadRng) -> Vec<String> {
    let mut keys = vec![];
    for _ in 0..1000 {
        keys.push(rng.gen_range(0..9999).to_string());
    }
    keys
}

fn build_values(rng: &mut ThreadRng) -> Vec<i32> {
    let mut values = vec![];
    for _ in 0..1000 {
        values.push(rng.gen_range(0..9999));
    }
    values
}

fn t_batch(num: i32) -> RecordBatch {
    let value: Vec<i32> = (0..num).collect();
    let c1: ArrayRef = Arc::new(Int32Array::from(value));
    RecordBatch::try_from_iter(vec![("c1", c1)]).unwrap()
}

fn create_context(num: i32) -> datafusion_common::Result<Arc<Mutex<SessionContext>>> {
    let ctx = SessionContext::new();
    ctx.register_batch("t", t_batch(num))?;
    Ok(Arc::new(Mutex::new(ctx)))
}

fn criterion_benchmark(c: &mut Criterion) {
    let ctx = create_context(1).unwrap();
    let rt = Runtime::new().unwrap();
    let df = rt.block_on(ctx.lock().table("t")).unwrap();

    let mut rng = rand::thread_rng();
    let keys = build_keys(&mut rng);
    let values = build_values(&mut rng);
    let mut key_buffer = Vec::new();
    let mut value_buffer = Vec::new();

    for i in 0..1000 {
        key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
        value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
    }
    c.bench_function("map_1000_1", |b| {
        b.iter(|| {
            black_box(
                rt.block_on(
                    df.clone()
                        .select(vec![map(key_buffer.clone(), value_buffer.clone())])
                        .unwrap()
                        .collect(),
                )
                .unwrap(),
            );
        });
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
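To run the new benchmark locally, an invocation along the lines of `cargo bench -p datafusion --bench map_query_sql --features array_expressions` should work; the exact command is an assumption inferred from the `[[bench]]` entry above, not something stated in this commit.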
22 changes: 22 additions & 0 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
@@ -34,6 +34,7 @@ use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::ExprSchemable;
use datafusion_functions_aggregate::expr_fn::{approx_median, approx_percentile_cont};
use datafusion_functions_array::map::map;

fn test_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
@@ -1087,3 +1088,24 @@ async fn test_fn_array_to_string() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn test_fn_map() -> Result<()> {
    let expr = map(
        vec![lit("a"), lit("b"), lit("c")],
        vec![lit(1), lit(2), lit(3)],
    );
    let expected = [
        "+---------------------------------------------------------------------------------------+",
        "| map(make_array(Utf8(\"a\"),Utf8(\"b\"),Utf8(\"c\")),make_array(Int32(1),Int32(2),Int32(3))) |",
        "+---------------------------------------------------------------------------------------+",
        "| {a: 1, b: 2, c: 3}                                                                     |",
        "| {a: 1, b: 2, c: 3}                                                                     |",
        "| {a: 1, b: 2, c: 3}                                                                     |",
        "| {a: 1, b: 2, c: 3}                                                                     |",
        "+---------------------------------------------------------------------------------------+",
    ];
    assert_fn_batches!(expr, expected);

    Ok(())
}
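The long auto-generated column header in the expected output above is simply the display form of the unaliased expression; a caller who wants a shorter name could alias it, for example (a hypothetical variant reusing the same `map` and `lit` imports as this test):

// Hypothetical: the same map expression, but with a short column name "m".
let expr = map(
    vec![lit("a"), lit("b"), lit("c")],
    vec![lit(1), lit(2), lit(3)],
)
.alias("m");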
37 changes: 36 additions & 1 deletion datafusion/functions-array/benches/map.rs
@@ -17,13 +17,18 @@

extern crate criterion;

use arrow_array::{Int32Array, ListArray, StringArray};
use arrow_buffer::{OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use rand::prelude::ThreadRng;
use rand::Rng;
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::Expr;
use datafusion_expr::{ColumnarValue, Expr};
use datafusion_functions_array::map::map_udf;
use datafusion_functions_array::planner::ArrayFunctionPlanner;

fn keys(rng: &mut ThreadRng) -> Vec<String> {
@@ -63,6 +68,36 @@ fn criterion_benchmark(c: &mut Criterion) {
            );
        });
    });

c.bench_function("map_1000", |b| {
let mut rng = rand::thread_rng();
let field = Arc::new(Field::new("item", DataType::Utf8, true));
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
let key_list = ListArray::new(
field,
offsets,
Arc::new(StringArray::from(keys(&mut rng))),
None,
);
let field = Arc::new(Field::new("item", DataType::Int32, true));
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000]));
let value_list = ListArray::new(
field,
offsets,
Arc::new(Int32Array::from(values(&mut rng))),
None,
);
let keys = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
let values = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));

b.iter(|| {
black_box(
map_udf()
.invoke(&[keys.clone(), values.clone()])
.expect("map should work on valid values"),
);
});
});
}

criterion_group!(benches, criterion_benchmark);
3 changes: 3 additions & 0 deletions datafusion/functions-array/src/lib.rs
@@ -41,6 +41,7 @@ pub mod extract;
pub mod flatten;
pub mod length;
pub mod make_array;
pub mod map;
pub mod planner;
pub mod position;
pub mod range;
@@ -53,6 +54,7 @@ pub mod set_ops;
pub mod sort;
pub mod string;
pub mod utils;

use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;
@@ -140,6 +142,7 @@ pub fn all_default_array_functions() -> Vec<Arc<ScalarUDF>> {
        replace::array_replace_n_udf(),
        replace::array_replace_all_udf(),
        replace::array_replace_udf(),
        map::map_udf(),
    ]
}

datafusion/functions/src/core/map.rs → datafusion/functions-array/src/map.rs
@@ -15,17 +15,26 @@
// specific language governing permissions and limitations
// under the License.

use crate::make_array::make_array;
use arrow::array::ArrayData;
use arrow_array::{Array, ArrayRef, MapArray, StructArray};
use arrow_buffer::{Buffer, ToByteSlice};
use arrow_schema::{DataType, Field, SchemaBuilder};
use datafusion_common::{exec_err, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
use arrow::datatypes::{DataType, Field, SchemaBuilder};
use arrow_buffer::{Buffer, ToByteSlice};
/// Returns a map created from a key list and a value list
pub fn map(keys: Vec<Expr>, values: Vec<Expr>) -> Expr {
    let keys = make_array(keys);
    let values = make_array(values);
    Expr::ScalarFunction(ScalarFunction::new_udf(map_udf(), vec![keys, values]))
}

use datafusion_common::Result;
use datafusion_common::{exec_err, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
create_func!(MapFunc, map_udf);

/// Check if we can evaluate the expr to constant directly.
///
@@ -39,7 +48,7 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
        .all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
}

fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
fn make_map_batch(args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
    if args.len() != 2 {
        return exec_err!(
            "make_map requires exactly 2 arguments, got {} instead",
@@ -54,7 +63,9 @@ fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    make_map_batch_internal(key, value, can_evaluate_to_const)
}

fn get_first_array_ref(columnar_value: &ColumnarValue) -> Result<ArrayRef> {
fn get_first_array_ref(
    columnar_value: &ColumnarValue,
) -> datafusion_common::Result<ArrayRef> {
    match columnar_value {
        ColumnarValue::Scalar(value) => match value {
            ScalarValue::List(array) => Ok(array.value(0)),
@@ -70,7 +81,7 @@ fn make_map_batch_internal(
    keys: ArrayRef,
    values: ArrayRef,
    can_evaluate_to_const: bool,
) -> Result<ColumnarValue> {
) -> datafusion_common::Result<ColumnarValue> {
    if keys.null_count() > 0 {
        return exec_err!("map key cannot be null");
    }
@@ -150,7 +161,7 @@ impl ScalarUDFImpl for MapFunc {
        &self.signature
    }

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
    fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
        if arg_types.len() % 2 != 0 {
            return exec_err!(
                "map requires an even number of arguments, got {} instead",
@@ -175,12 +186,12 @@ impl ScalarUDFImpl for MapFunc {
        ))
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
    fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result<ColumnarValue> {
        make_map_batch(args)
    }
}

fn get_element_type(data_type: &DataType) -> Result<&DataType> {
fn get_element_type(data_type: &DataType) -> datafusion_common::Result<&DataType> {
    match data_type {
        DataType::List(element) => Ok(element.data_type()),
        DataType::LargeList(element) => Ok(element.data_type()),
6 changes: 2 additions & 4 deletions datafusion/functions-array/src/planner.rs
@@ -27,6 +27,7 @@ use datafusion_expr::{
use datafusion_functions::expr_fn::get_field;
use datafusion_functions_aggregate::nth_value::nth_value_udaf;

use crate::map::map_udf;
use crate::{
    array_has::array_has_all,
    expr_fn::{array_append, array_concat, array_prepend},
@@ -111,10 +112,7 @@ impl ExprPlanner for ArrayFunctionPlanner {
        let values = make_array(values.into_iter().map(|(_, e)| e).collect());

        Ok(PlannerResult::Planned(Expr::ScalarFunction(
            ScalarFunction::new_udf(
                datafusion_functions::core::map(),
                vec![keys, values],
            ),
            ScalarFunction::new_udf(map_udf(), vec![keys, values]),
        )))
    }
}
5 changes: 0 additions & 5 deletions datafusion/functions/Cargo.toml
@@ -141,8 +141,3 @@ required-features = ["string_expressions"]
harness = false
name = "upper"
required-features = ["string_expressions"]

[[bench]]
harness = false
name = "map"
required-features = ["core_expressions"]