Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(expr): streaming generate_series ends with now() #17371

Merged
merged 14 commits into from
Jun 24, 2024
27 changes: 27 additions & 0 deletions e2e_test/streaming/now.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
system ok
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c "
create materialized view mv as
select * from generate_series(
to_timestamp($(date +%s) - 10),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clever but it would be better if we can support specifying NOW() - 10 second here. 😄

Copy link
Contributor

@st1page st1page Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not exactly same semantic.
generate_series(now() - 10 sec, now(), step) is equal to the statment with a temporal filter

create materialized view mv as
select * from (generate_series(${now_when_create_statment} - 10 sec, now(), step))
where generate_series > now() - 10 sec

So we do not have a current_now() or now_at_the_statment() function 😆

Copy link
Contributor

@xiangjinwu xiangjinwu Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we do not have a current_now() or now_at_the_statment() function 😆

PostgreSQL has 4 different "now"s: #2870

  • For start, if we really want time of create, it shall be timestamptz 'now'.
  • For stop, if we really want endless stream, it shall be timestamptz 'infinity'.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • For stop, if we really want endless stream, it shall be timestamptz 'infinity'.

But seems infinity cannot clearly convey the idea that the timestamp is emitted along with the current epoch🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we should've introduced something like streaming_now() for the use of dynamically updated now in streaming mode. So that now() won't be so confusing when it appears with 2 semantics in one statement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we use now() to express the semantics of a temporal filter, now() represents a constantly changing timestamp within a streaming query. Therefore, it becomes difficult to express a semantics of the current value of now() that does not change later.

now(),
interval '1 second'
);
"

statement ok
flush;

query I
select count(*) > 10 from mv;
----
t

sleep 2s

query I
select count(*) > 12 from mv;
----
t

statement ok
drop materialized view mv;
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,21 @@ message RowIdGenNode {
uint64 row_id_index = 1;
}

message NowModeUpdateCurrent {}

message NowModeGenerateSeries {
data.Datum start_timestamp = 1;
data.Datum interval = 2;
}

message NowNode {
// Persists emitted 'now'.
catalog.Table state_table = 1;

oneof mode {
NowModeUpdateCurrent update_current = 101;
NowModeGenerateSeries generate_series = 102;
}
}

message ValuesNode {
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/array/stream_chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ impl StreamChunkBuilder {
}
}

/// Get the current number of rows in the builder.
pub fn size(&self) -> usize {
self.size
}

/// Append an iterator of output index and datum to the builder, return a chunk if the builder
/// is full.
///
Expand Down
7 changes: 4 additions & 3 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ impl EpochPair {
Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
}
}
/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.

/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.
/// This method is to turn a a random epoch into a well shifted value.
pub const fn test_epoch(value: u64) -> u64 {
value << EPOCH_AVAILABLE_BITS
pub const fn test_epoch(value_millis: u64) -> u64 {
value_millis << EPOCH_AVAILABLE_BITS
}

/// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamptz,
now(),
interval '1 hour'
);
expected_outputs:
- logical_plan
- optimized_logical_plan_for_stream
- stream_plan
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported
now(),
interval '1 hour'
);
expected_outputs:
- binder_error
- sql: |
select * from generate_series(
now() - interval '1 hour',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Shouldn't this be able to fold into constant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the now() can be even more confusing🤣

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need something like session_now().

now(),
interval '1 hour'
);
expected_outputs:
- stream_error
- sql: |
select * from unnest(array[now(), now()]);
expected_outputs:
- stream_error
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@
Failed to bind expression: v1 >= now()
Caused by:
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: forbid now in select for stream
sql: |
create table t (v1 timestamp with time zone, v2 timestamp with time zone);
Expand All @@ -552,7 +552,7 @@
Failed to bind expression: now()
Caused by:
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: forbid now in agg filter for stream
sql: |
create table t (v1 timestamp with time zone, v2 int);
Expand All @@ -561,7 +561,7 @@
Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now())
Caused by:
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: typo pg_teminate_backend
sql: |
select pg_teminate_backend(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamptz,
now(),
interval '1 hour'
);
logical_plan: |-
LogicalProject { exprs: [generate_series] }
└─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) }
optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }'
stream_plan: |-
StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] }
└─StreamNow { output: [ts] }
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported
now(),
interval '1 hour'
);
binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist
- sql: |
select * from generate_series(
now() - interval '1 hour',
now(),
interval '1 hour'
);
stream_error: |-
Not supported: General `now()` function
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message does not seem to be good enough. 😕

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestions?

HINT: If you are trying to use `generate_series` with `now()`, please kindly check whether you are using it in a correct way.
- sql: |
select * from unnest(array[now(), now()]);
stream_error: |-
Not supported: General `now()` function
HINT: If you are trying to use `generate_series` with `now()`, please kindly check whether you are using it in a correct way.
8 changes: 6 additions & 2 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,11 +1572,15 @@ impl Binder {
if self.is_for_stream()
&& !matches!(
self.context.clause,
Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn)
Some(Clause::Where)
| Some(Clause::Having)
| Some(Clause::JoinOn)
| Some(Clause::From)
)
{
return Err(ErrorCode::InvalidInputSyntax(format!(
"For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information",
"For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \
Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information",
self.context.clause
))
.into());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ macro_rules! impl_has_variant {
};
}

impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction}
impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InequalityInputPair {
Expand Down
54 changes: 47 additions & 7 deletions src/frontend/src/expr/type_inference/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::AggKind;
pub use risingwave_expr::sig::*;
use risingwave_pb::expr::table_function::PbType as PbTableFuncType;

use super::{align_types, cast_ok_base, CastContext};
use crate::error::{ErrorCode, Result};
Expand All @@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap(
sig_map: &FunctionRegistry,
) -> Result<DataType> {
// special cases
if let FuncName::Scalar(func_type) = func_name
&& let Some(res) = infer_type_for_special(func_type, inputs).transpose()
{
return res;
}
if let FuncName::Aggregate(AggKind::Grouping) = func_name {
return Ok(DataType::Int32);
match &func_name {
FuncName::Scalar(func_type) => {
if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() {
return res;
}
}
FuncName::Table(func_type) => {
if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose()
{
return res;
}
}
FuncName::Aggregate(agg_kind) => {
if *agg_kind == AggKind::Grouping {
return Ok(DataType::Int32);
}
}
_ => {}
}

let actuals = inputs
Expand Down Expand Up @@ -634,6 +646,34 @@ fn infer_type_for_special(
}
}

fn infer_type_for_special_table_function(
func_type: PbTableFuncType,
inputs: &mut [ExprImpl],
) -> Result<Option<DataType>> {
match func_type {
PbTableFuncType::GenerateSeries => {
if inputs.len() < 3 {
// let signature map handle this
return Ok(None);
}
match (
inputs[0].return_type(),
inputs[1].return_type(),
inputs[2].return_type(),
) {
(DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => {
// This is to allow `generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`,
// which in streaming mode will be further converted to `StreamNow`.
Ok(Some(DataType::Timestamptz))
}
// let signature map handle the rest
_ => Ok(None),
}
}
_ => Ok(None),
}
}

/// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the
/// provided `func_name` and `inputs`. This not only support exact function signature match, but can
/// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar,
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
)
});

static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert GENERATE_SERIES Ends With NOW",
vec![GenerateSeriesWithNowRule::create()],
ApplyOrder::TopDown,
)
});

static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function To Project Set",
Expand Down Expand Up @@ -572,6 +580,9 @@ impl LogicalOptimizer {
}
plan = plan.optimize_by_rules(&SET_OPERATION_MERGE);
plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN);
// Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
// Should be applied before converting table function to project set.
plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW);
// In order to unnest a table function, we need to convert it into a `project_set` first.
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET);

Expand Down
5 changes: 0 additions & 5 deletions src/frontend/src/optimizer/plan_node/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement(
}
Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
}
} else if !emit_on_window_close && plan.emit_on_window_close() {
Err(ErrorCode::InternalError(
"Some bad thing happened, the generated plan is not correct.".to_string(),
)
.into())
} else {
Ok(plan)
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ mod cte_ref;
pub use cte_ref::*;
mod recursive_union;
pub use recursive_union::*;
mod now;
pub use now::*;

pub trait DistillUnit {
fn distill_with_name<'a>(&self, name: impl Into<Cow<'a, str>>) -> XmlNode<'a>;
Expand Down
Loading
Loading