Skip to content

Commit

Permalink
perf(expr): new interface for expression directly returning scalar (#…
Browse files Browse the repository at this point in the history
…9049)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Apr 11, 2023
1 parent 27c767d commit 5b5b2e6
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 67 deletions.
9 changes: 9 additions & 0 deletions src/expr/benches/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ fn bench_expr(c: &mut Criterion) {
.to_async(FuturesExecutor)
.iter(|| constant.eval(&input))
});
c.bench_function("extract(constant)", |bencher| {
let extract = build_from_pretty(format!(
"(extract:decimal HOUR:varchar ${}:timestamp)",
input_index_for_type(DataType::Timestamp)
));
bencher
.to_async(FuturesExecutor)
.iter(|| extract.eval(&input))
});

let sigs = func_sigs();
let sigs = sigs.sorted_by_cached_key(|sig| format!("{sig:?}"));
Expand Down
41 changes: 9 additions & 32 deletions src/expr/src/expr/expr_literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@
// limitations under the License.

use std::convert::TryFrom;
use std::sync::Arc;

use risingwave_common::array::{ArrayBuilder, ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::for_all_variants;
use risingwave_common::array::DataChunk;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{literal_type_match, DataType, Datum, Scalar, ScalarImpl};
use risingwave_common::types::{literal_type_match, DataType, Datum};
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::expr::expr_node::{RexNode, Type};
use risingwave_pb::expr::ExprNode;

use super::ValueImpl;
use crate::expr::Expression;
use crate::{bail, ensure, ExprError, Result};

Expand All @@ -39,33 +38,11 @@ impl Expression for LiteralExpression {
self.return_type.clone()
}

async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
let mut array_builder = self.return_type.create_array_builder(input.capacity());
let capacity = input.capacity();
let builder = &mut array_builder;
let literal = &self.literal;

macro_rules! array_impl_literal_append {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
match (builder, literal) {
$(
(ArrayBuilderImpl::$variant_name(inner), Some(ScalarImpl::$variant_name(v))) => {
inner.append_n(capacity, Some(v.as_scalar_ref()));
}
(ArrayBuilderImpl::$variant_name(inner), None) => {
inner.append_n(capacity, None);
}
)*
(_, _) => $crate::bail!(
"Do not support values in insert values executor".to_string()
),
}
};
}

for_all_variants! { array_impl_literal_append }

Ok(Arc::new(array_builder.finish()))
async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
Ok(ValueImpl::Scalar {
value: self.literal.clone(),
capacity: input.capacity(),
})
}

async fn eval_row(&self, _input: &OwnedRow) -> Result<Datum> {
Expand Down Expand Up @@ -126,7 +103,7 @@ mod tests {
use risingwave_common::array::{I32Array, StructValue};
use risingwave_common::array_nonnull;
use risingwave_common::types::test_utils::IntervalTestExt;
use risingwave_common::types::{Decimal, Interval, IntoOrdered};
use risingwave_common::types::{Decimal, Interval, IntoOrdered, Scalar, ScalarImpl};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_pb::data::data_type::{IntervalType, TypeName};
use risingwave_pb::data::{PbDataType, PbDatum};
Expand Down
35 changes: 29 additions & 6 deletions src/expr/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ pub(crate) mod data_types;
pub(crate) mod template;
pub(crate) mod template_fast;
pub mod test_utils;
mod value;

use std::sync::Arc;

use futures_util::TryFutureExt;
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
Expand All @@ -76,9 +78,14 @@ pub use self::agg::AggKind;
pub use self::build::*;
pub use self::expr_input_ref::InputRefExpression;
pub use self::expr_literal::LiteralExpression;
pub use self::value::{ValueImpl, ValueRef};
use super::{ExprError, Result};

/// Instance of an expression
/// Interface of an expression.
///
/// There're two functions to evaluate an expression: `eval` and `eval_v2`, exactly one of them
/// should be implemented. Prefer calling and implementing `eval_v2` instead of `eval` if possible,
/// to gain the performance benefit of scalar expression.
#[async_trait::async_trait]
pub trait Expression: std::fmt::Debug + Sync + Send {
/// Get the return data type.
Expand All @@ -94,14 +101,30 @@ pub trait Expression: std::fmt::Debug + Sync + Send {
Ok(res)
}

/// Evaluate the expression
/// Evaluate the expression in vectorized execution. Returns an array.
///
/// # Arguments
/// The default implementation calls `eval_v2` and always converts the result to an array.
async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
let value = self.eval_v2(input).await?;
Ok(match value {
ValueImpl::Array(array) => array,
ValueImpl::Scalar { value, capacity } => {
let mut builder = self.return_type().create_array_builder(capacity);
builder.append_datum_n(capacity, value);
builder.finish().into()
}
})
}

/// Evaluate the expression in vectorized execution. Returns a value that can be either an
/// array, or a scalar if all values in the array are the same.
///
/// * `input` - input data of the Project Executor
async fn eval(&self, input: &DataChunk) -> Result<ArrayRef>;
/// The default implementation calls `eval` and puts the result into the `Array` variant.
async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
self.eval(input).map_ok(ValueImpl::Array).await
}

/// Evaluate the expression in row-based execution.
/// Evaluate the expression in row-based execution. Returns a nullable scalar.
async fn eval_row(&self, input: &OwnedRow) -> Result<Datum>;

/// Evaluate if the expression is constant.
Expand Down
81 changes: 52 additions & 29 deletions src/expr/src/expr/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,72 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use itertools::multizip;
use itertools::{multizip, Itertools};
use paste::paste;
use risingwave_common::array::{Array, ArrayBuilder, ArrayImpl, ArrayRef, DataChunk, Utf8Array};
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Utf8Array};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{option_as_scalar_ref, DataType, Datum, Scalar};
use risingwave_common::util::iter_util::ZipEqDebug;

use crate::expr::{BoxedExpression, Expression};
use crate::expr::{BoxedExpression, Expression, ValueImpl, ValueRef};

macro_rules! gen_eval {
{ ($macro:ident, $macro_row:ident), $ty_name:ident, $OA:ty, $($arg:ident,)* } => {
fn eval<'a, 'b, 'async_trait>(&'a self, data_chunk: &'b DataChunk)
-> Pin<Box<dyn Future<Output = $crate::Result<ArrayRef>> + Send + 'async_trait>>
fn eval_v2<'a, 'b, 'async_trait>(&'a self, data_chunk: &'b DataChunk)
-> Pin<Box<dyn Future<Output = $crate::Result<ValueImpl>> + Send + 'async_trait>>
where
'a: 'async_trait,
'b: 'async_trait,
{
Box::pin(async move { paste! {
$(
let [<ret_ $arg:lower>] = self.[<expr_ $arg:lower>].eval_checked(data_chunk).await?;
let [<arr_ $arg:lower>]: &$arg = [<ret_ $arg:lower>].as_ref().into();
let [<ret_ $arg:lower>] = self.[<expr_ $arg:lower>].eval_v2(data_chunk).await?;
let [<val_ $arg:lower>]: ValueRef<'_, $arg> = (&[<ret_ $arg:lower>]).into();
)*

let bitmap = data_chunk.visibility();
let mut output_array = <$OA as Array>::Builder::with_meta(data_chunk.capacity(), (&self.return_type).into());
Ok(Arc::new(match bitmap {
Some(bitmap) => {
for (($([<v_ $arg:lower>], )*), visible) in multizip(($([<arr_ $arg:lower>].iter(), )*)).zip_eq_debug(bitmap.iter()) {
if !visible {
output_array.append_null();
continue;
}
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
Ok(match ($([<val_ $arg:lower>], )*) {
// If all arguments are scalar, we can directly compute the result.
($(ValueRef::Scalar { value: [<scalar_ref_ $arg:lower>], capacity: [<cap_ $arg:lower>] }, )*) => {
let output_scalar = $macro_row!(self, $([<scalar_ref_ $arg:lower>],)*);
let output_datum = output_scalar.map(|s| s.to_scalar_value());
let capacity = data_chunk.capacity();

if cfg!(debug_assertions) {
let all_capacities = [capacity, $([<cap_ $arg:lower>], )*];
assert!(all_capacities.into_iter().all_equal(), "capacities mismatched: {:?}", all_capacities);
}
output_array.finish().into()

ValueImpl::Scalar { value: output_datum, capacity }
}
None => {
for ($([<v_ $arg:lower>], )*) in multizip(($([<arr_ $arg:lower>].iter(), )*)) {
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
}
output_array.finish().into()

// Otherwise, fallback to array computation.
($([<val_ $arg:lower>], )*) => {
let bitmap = data_chunk.visibility();
let mut output_array = <$OA as Array>::Builder::with_meta(data_chunk.capacity(), (&self.return_type).into());
let array = match bitmap {
Some(bitmap) => {
// TODO: use `izip` here.
for (($([<v_ $arg:lower>], )*), visible) in multizip(($([<val_ $arg:lower>].iter(), )*)).zip_eq_debug(bitmap.iter()) {
if !visible {
output_array.append_null();
continue;
}
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
}
output_array.finish().into()
}
None => {
// TODO: use `izip` here.
for ($([<v_ $arg:lower>], )*) in multizip(($([<val_ $arg:lower>].iter(), )*)) {
$macro!(self, output_array, $([<v_ $arg:lower>],)*)
}
output_array.finish().into()
}
};

ValueImpl::Array(Arc::new(array))
}
}))
})
}})
}

Expand Down Expand Up @@ -141,8 +164,8 @@ macro_rules! gen_expr_normal {
F: Fn($($arg::RefItem<'_>, )*) -> $crate::Result<OA::OwnedItem> + Sync + Send,
> Expression for $ty_name<$($arg, )* OA, F>
where
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)*
for<'a> &'a OA: std::convert::From<&'a ArrayImpl>,
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)*
for<'a> ValueRef<'a, OA>: std::convert::From<&'a ValueImpl>,
{
fn return_type(&self) -> DataType {
self.return_type.clone()
Expand Down Expand Up @@ -227,7 +250,7 @@ macro_rules! gen_expr_bytes {
F: Fn($($arg::RefItem<'_>, )* &mut dyn std::fmt::Write) -> $crate::Result<()> + Sync + Send,
> Expression for $ty_name<$($arg, )* F>
where
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)*
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)*
{
fn return_type(&self) -> DataType {
self.return_type.clone()
Expand Down Expand Up @@ -303,8 +326,8 @@ macro_rules! gen_expr_nullable {
F: Fn($(Option<$arg::RefItem<'_>>, )*) -> $crate::Result<Option<OA::OwnedItem>> + Sync + Send,
> Expression for $ty_name<$($arg, )* OA, F>
where
$(for<'a> &'a $arg: std::convert::From<&'a ArrayImpl>,)*
for<'a> &'a OA: std::convert::From<&'a ArrayImpl>,
$(for<'a> ValueRef<'a, $arg>: std::convert::From<&'a ValueImpl>,)*
for<'a> ValueRef<'a, OA>: std::convert::From<&'a ValueImpl>,
{
fn return_type(&self) -> DataType {
self.return_type.clone()
Expand Down
75 changes: 75 additions & 0 deletions src/expr/src/expr/value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use either::Either;
use risingwave_common::array::*;
use risingwave_common::for_all_variants;
use risingwave_common::types::{Datum, Scalar};

/// The type-erased return value of an expression.
///
/// It can be either an array, or a scalar if all values in the array are the same.
#[derive(Debug, Clone)]
pub enum ValueImpl {
Array(ArrayRef),
Scalar { value: Datum, capacity: usize },
}

/// The generic reference type of [`ValueImpl`]. Used as the arguments of expressions.
#[derive(Debug, Clone, Copy)]
pub enum ValueRef<'a, A: Array> {
Array(&'a A),
Scalar {
value: Option<<A as Array>::RefItem<'a>>,
capacity: usize,
},
}

impl<'a, A: Array> ValueRef<'a, A> {
/// Iterates over all scalars in this value.
pub fn iter(self) -> impl Iterator<Item = Option<A::RefItem<'a>>> + 'a {
match self {
Self::Array(array) => Either::Left(array.iter()),
Self::Scalar { value, capacity } => {
Either::Right(std::iter::repeat(value).take(capacity))
}
}
}
}

macro_rules! impl_convert {
($( { $variant_name:ident, $suffix_name:ident, $array:ty, $builder:ty } ),*) => {
$(
paste::paste! {
/// Converts a type-erased value to a reference of a specific array type.
impl<'a> From<&'a ValueImpl> for ValueRef<'a, $array> {
fn from(value: &'a ValueImpl) -> Self {
match value {
ValueImpl::Array(array) => {
let array = array.[<as_ $suffix_name>]();
ValueRef::Array(array)
},
ValueImpl::Scalar { value, capacity } => {
let value = value.as_ref().map(|v| v.[<as_ $suffix_name>]().as_scalar_ref());
ValueRef::Scalar { value, capacity: *capacity }
},
}
}
}
}
)*
};
}

for_all_variants! { impl_convert }

0 comments on commit 5b5b2e6

Please sign in to comment.