Skip to content

Commit

Permalink
feat: introduce PROCTIME() (#9088)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Apr 12, 2023
1 parent 1cee797 commit fd5334d
Show file tree
Hide file tree
Showing 23 changed files with 267 additions and 44 deletions.
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ message ExprNode {
VNODE = 1101;
// Non-deterministic functions
NOW = 2022;
PROCTIME = 2023;
// User defined functions
UDF = 3000;
}
Expand Down
3 changes: 3 additions & 0 deletions src/expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ pub enum ExprError {

#[error("not a constant")]
NotConstant,

#[error("Context not found")]
Context,
}

impl From<ExprError> for RwError {
Expand Down
2 changes: 2 additions & 0 deletions src/expr/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::expr_regexp::RegexpMatchExpression;
use super::expr_some_all::SomeAllExpression;
use super::expr_udf::UdfExpression;
use super::expr_vnode::VnodeExpression;
use crate::expr::expr_proctime::ProcTimeExpression;
use crate::expr::{BoxedExpression, Expression, InputRefExpression, LiteralExpression};
use crate::sig::func::FUNC_SIG_MAP;
use crate::{bail, ExprError, Result};
Expand Down Expand Up @@ -81,6 +82,7 @@ pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
}
E::Vnode => VnodeExpression::try_from(prost).map(Expression::boxed),
E::Udf => UdfExpression::try_from(prost).map(Expression::boxed),
E::Proctime => ProcTimeExpression::try_from(prost).map(Expression::boxed),
_ => Err(ExprError::UnsupportedFunction(format!(
"{:?}",
prost.get_expr_type()
Expand Down
103 changes: 103 additions & 0 deletions src/expr/src/expr/expr_proctime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::DataChunk;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_pb::expr::expr_node::{RexNode, Type};
use risingwave_pb::expr::ExprNode;

use super::{Expression, ValueImpl, CONTEXT};
use crate::{bail, ensure, ExprError, Result};

#[derive(Debug)]
pub struct ProcTimeExpression;

impl ProcTimeExpression {
pub fn new() -> Self {
ProcTimeExpression
}
}

impl<'a> TryFrom<&'a ExprNode> for ProcTimeExpression {
type Error = ExprError;

fn try_from(prost: &'a ExprNode) -> Result<Self> {
ensure!(prost.get_expr_type().unwrap() == Type::Proctime);
ensure!(DataType::from(prost.get_return_type().unwrap()) == DataType::Timestamptz);
let RexNode::FuncCall(func_call_node) = prost.get_rex_node().unwrap() else {
bail!("Expected RexNode::FuncCall");
};
ensure!(func_call_node.get_children().is_empty());

Ok(ProcTimeExpression::new())
}
}

#[async_trait::async_trait]
impl Expression for ProcTimeExpression {
fn return_type(&self) -> DataType {
DataType::Timestamptz
}

async fn eval_v2(&self, input: &DataChunk) -> Result<ValueImpl> {
let proctime = CONTEXT
.try_with(|context| context.get_physical_time())
.map_err(|_| ExprError::Context)?;
let datum = Some(ScalarImpl::Int64(proctime as i64));

Ok(ValueImpl::Scalar {
value: datum,
capacity: input.capacity(),
})
}

async fn eval_row(&self, _input: &OwnedRow) -> Result<Datum> {
let proctime = CONTEXT
.try_with(|context| context.get_physical_time())
.map_err(|_| ExprError::Context)?;
let datum = Some(ScalarImpl::Int64(proctime as i64));

Ok(datum)
}
}

#[cfg(test)]
mod tests {
use risingwave_common::array::DataChunk;
use risingwave_common::types::ScalarRefImpl;
use risingwave_common::util::epoch::Epoch;

use super::*;
use crate::expr::{ExprContext, CONTEXT};

#[tokio::test]
async fn test_expr_proctime() {
let proctime_expr = ProcTimeExpression::new();
let epoch = Epoch::now();
let time = epoch.physical_time();
let time_datum = Some(ScalarRefImpl::Int64(time as i64));
let context = ExprContext::new(epoch);
let chunk = DataChunk::new_dummy(3);

let array = CONTEXT
.scope(context, proctime_expr.eval(&chunk))
.await
.unwrap();

for datum_ref in array.iter() {
assert_eq!(datum_ref, time_datum)
}
}
}
23 changes: 23 additions & 0 deletions src/expr/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod expr_jsonb_access;
mod expr_literal;
mod expr_nested_construct;
mod expr_now;
mod expr_proctime;
pub mod expr_regexp;
mod expr_some_all;
mod expr_to_char_const_tmpl;
Expand All @@ -72,6 +73,7 @@ use futures_util::TryFutureExt;
use risingwave_common::array::{ArrayRef, DataChunk};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::epoch::Epoch;
use static_assertions::const_assert;

pub use self::agg::AggKind;
Expand Down Expand Up @@ -190,3 +192,24 @@ pub type ExpressionRef = Arc<dyn Expression>;
/// See also <https://github.com/risingwavelabs/risingwave/issues/4625>.
#[allow(dead_code)]
const STRICT_MODE: bool = false;

/// The context used by expressions.
#[derive(Clone)]
pub struct ExprContext {
/// The epoch that an executor currently in.
curr_epoch: Epoch,
}

impl ExprContext {
pub fn new(curr_epoch: Epoch) -> Self {
Self { curr_epoch }
}

pub fn get_physical_time(&self) -> u64 {
self.curr_epoch.physical_time()
}
}

tokio::task_local! {
pub static CONTEXT: ExprContext;
}
2 changes: 1 addition & 1 deletion src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ impl TestCase {
let mut ret = TestCaseResult::default();

let bound = {
let mut binder = Binder::new(&session, vec![]);
let mut binder = Binder::new(&session);
match binder.bind(stmt.clone()) {
Ok(bound) => bound,
Err(err) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
└─StreamSource
- name: source with generated columns
sql: |
create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') ROW FORMAT JSON;;
create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') ROW FORMAT JSON;
select v3 from s1
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] }
└─BatchSource { source: "s1", columns: ["v2", "_row_id"], filter: (None, None) }
- name: select proctime()
sql: |
select proctime();
binder_error: 'Invalid input syntax: Function `PROCTIME()` is only allowed in CREATE TABLE/SOURCE. Is `NOW()` what you want?'
4 changes: 1 addition & 3 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,7 @@
TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime
FROM (SELECT *, PROCTIME() as p_time FROM bid) B
GROUP BY B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);
binder_error: |-
Feature is not yet implemented: unsupported function: "proctime"
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/112
binder_error: 'Invalid input syntax: Function `PROCTIME()` is only allowed in CREATE TABLE/SOURCE. Is `NOW()` what you want?'
- id: nexmark_q13
before:
- create_tables
Expand Down
4 changes: 1 addition & 3 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -852,9 +852,7 @@
TUMBLE_END(B.p_time, INTERVAL '10' SECOND) as endtime
FROM (SELECT *, PROCTIME() as p_time FROM bid) B
GROUP BY B.bidder, TUMBLE(B.p_time, INTERVAL '10' SECOND);
binder_error: |-
Feature is not yet implemented: unsupported function: "proctime"
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/112
binder_error: 'Invalid input syntax: Function `PROCTIME()` is only allowed in CREATE TABLE/SOURCE. Is `NOW()` what you want?'
- id: nexmark_q13
before:
- create_sources
Expand Down
27 changes: 24 additions & 3 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,12 @@ impl Binder {
fn raw_literal(literal: ExprImpl) -> Handle {
Box::new(move |_binder, _inputs| Ok(literal.clone()))
}

fn now() -> Handle {
Box::new(move |binder, mut inputs| {
binder.ensure_now_function_allowed()?;
if !binder.in_streaming {
// `now()` in batch query will be convert to the binder time.
if binder.is_for_batch() {
inputs.push(ExprImpl::from(Literal::new(
Some(ScalarImpl::Int64((binder.bind_timestamp_ms * 1000) as i64)),
DataType::Timestamptz,
Expand All @@ -312,10 +314,18 @@ impl Binder {
raw_call(ExprType::Now)(binder, inputs)
})
}

fn pi() -> Handle {
raw_literal(ExprImpl::literal_f64(std::f64::consts::PI))
}

fn proctime() -> Handle {
Box::new(move |binder, inputs| {
binder.ensure_proctime_function_allowed()?;
raw_call(ExprType::Proctime)(binder, inputs)
})
}

static HANDLES: LazyLock<HashMap<&'static str, Handle>> = LazyLock::new(|| {
[
(
Expand Down Expand Up @@ -559,7 +569,8 @@ impl Binder {
)))),
// non-deterministic
("now", now()),
("current_timestamp", now())
("current_timestamp", now()),
("proctime", proctime())
]
.into_iter()
.collect()
Expand Down Expand Up @@ -665,7 +676,7 @@ impl Binder {
}

fn ensure_now_function_allowed(&self) -> Result<()> {
if self.in_streaming
if self.is_for_stream()
&& !matches!(
self.context.clause,
Some(Clause::Where) | Some(Clause::Having)
Expand All @@ -680,6 +691,16 @@ impl Binder {
Ok(())
}

fn ensure_proctime_function_allowed(&self) -> Result<()> {
if !self.is_for_ddl() {
return Err(ErrorCode::InvalidInputSyntax(
"Function `PROCTIME()` is only allowed in CREATE TABLE/SOURCE. Is `NOW()` what you want?".to_string(),
)
.into());
}
Ok(())
}

fn ensure_aggregate_allowed(&self) -> Result<()> {
if let Some(clause) = self.context.clause {
match clause {
Expand Down
Loading

0 comments on commit fd5334d

Please sign in to comment.