feat(expr): streaming generate_series ends with now() (#17371)
Signed-off-by: Richard Chien <[email protected]>
stdrc authored Jun 24, 2024
1 parent cdbd982 commit da9bd03
Showing 27 changed files with 838 additions and 139 deletions.
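For context, the user-facing feature here is that a streaming query may now use generate_series with now() as its upper bound. A minimal sketch of the newly supported pattern, adapted from the planner and e2e tests added in this commit (the view name ticks is illustrative only):

-- Streaming materialized view over a series that ends at the current time.
-- Start bound and step are constants; a plain timestamp (without time zone) start is rejected.
CREATE MATERIALIZED VIEW ticks AS
SELECT *
FROM generate_series(
    '2024-06-21 17:36:00'::timestamptz,  -- constant start
    now(),                               -- upper bound tracks the current time
    interval '1 hour'                    -- constant step
);

In streaming mode this is planned as a StreamNow source rather than a batch table function, so the view keeps emitting new rows as time advances (see the planner-test data and e2e_test/streaming/now.slt below).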
8 changes: 4 additions & 4 deletions ci/scripts/e2e-test-parallel.sh
@@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i
echo "--- e2e, ci-3streaming-2serving-3fe, streaming"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel"

kill_cluster

echo "--- e2e, ci-3streaming-2serving-3fe, batch"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel"
sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel"

kill_cluster

echo "--- e2e, ci-3streaming-2serving-3fe, generated"
RUST_LOG=$RUST_LOG \
risedev ci-start ci-3streaming-2serving-3fe
sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}"
sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel"

kill_cluster
48 changes: 48 additions & 0 deletions e2e_test/streaming/now.slt
@@ -0,0 +1,48 @@
# In madsim test, we cannot spawn process.
skipif madsim
# In parallel test, we cannot get the DB name.
skipif parallel
# TODO: Later if we introduce a new `now()`-like function that returns the time of statement execution,
# we'll be able to directly create MV without `./risedev psql` and so that we can remove these `skipif`.
system ok
./risedev psql -c "
create materialized view mv as
select * from generate_series(
to_timestamp($(date +%s)) - interval '10 second',
now(),
interval '1 second'
);
"

skipif madsim
skipif parallel
statement ok
flush;

skipif madsim
skipif parallel
query I
select count(*) >= 10 from mv;
----
t

skipif madsim
skipif parallel
sleep 2s

skipif madsim
skipif parallel
statement ok
flush;

skipif madsim
skipif parallel
query I
select count(*) >= 12 from mv;
----
t

skipif madsim
skipif parallel
statement ok
drop materialized view mv;
12 changes: 12 additions & 0 deletions proto/stream_plan.proto
@@ -725,9 +725,21 @@ message RowIdGenNode {
uint64 row_id_index = 1;
}

message NowModeUpdateCurrent {}

message NowModeGenerateSeries {
data.Datum start_timestamp = 1;
data.Datum interval = 2;
}

message NowNode {
// Persists emitted 'now'.
catalog.Table state_table = 1;

oneof mode {
NowModeUpdateCurrent update_current = 101;
NowModeGenerateSeries generate_series = 102;
}
}

message ValuesNode {
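As a reading aid (my interpretation of the diff, not wording from the commit), the two NowNode modes above roughly correspond to the following SQL shapes; the events table and event_ts column in the first query are hypothetical:

-- NowModeUpdateCurrent: the pre-existing behavior, where the Now executor maintains a single
-- "current time" value that is updated as the epoch advances, e.g. behind a temporal filter.
SELECT * FROM events WHERE event_ts > now() - interval '1 hour';

-- NowModeGenerateSeries: the new mode, carrying a constant start_timestamp and interval so the
-- executor can emit every tick from the start bound up to the current time, appending new rows
-- as time passes.
SELECT * FROM generate_series('2024-06-21 17:36:00'::timestamptz, now(), interval '1 hour');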
5 changes: 5 additions & 0 deletions src/common/src/array/stream_chunk_builder.rs
@@ -104,6 +104,11 @@ impl StreamChunkBuilder {
}
}

/// Get the current number of rows in the builder.
pub fn size(&self) -> usize {
self.size
}

/// Append an iterator of output index and datum to the builder, return a chunk if the builder
/// is full.
///
7 changes: 4 additions & 3 deletions src/common/src/util/epoch.rs
@@ -174,10 +174,11 @@ impl EpochPair {
Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
}
}
/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.

/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.
/// This method is to turn a a random epoch into a well shifted value.
pub const fn test_epoch(value: u64) -> u64 {
value << EPOCH_AVAILABLE_BITS
pub const fn test_epoch(value_millis: u64) -> u64 {
value_millis << EPOCH_AVAILABLE_BITS
}

/// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch.
30 changes: 30 additions & 0 deletions (new planner test input file)
@@ -0,0 +1,30 @@
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamptz,
now(),
interval '1 hour'
);
expected_outputs:
- logical_plan
- optimized_logical_plan_for_stream
- stream_plan
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported
now(),
interval '1 hour'
);
expected_outputs:
- binder_error
- sql: |
select * from generate_series(
now() - interval '1 hour',
now(),
interval '1 hour'
);
expected_outputs:
- stream_error
- sql: |
select * from unnest(array[now(), now()]);
expected_outputs:
- stream_error
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
@@ -543,7 +543,7 @@
Failed to bind expression: v1 >= now()
Caused by:
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: forbid now in select for stream
sql: |
create table t (v1 timestamp with time zone, v2 timestamp with time zone);
@@ -552,7 +552,7 @@
Failed to bind expression: now()
Caused by:
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: forbid now in agg filter for stream
sql: |
create table t (v1 timestamp with time zone, v2 int);
@@ -561,7 +561,7 @@
Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now())
Caused by:
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information
- name: typo pg_teminate_backend
sql: |
select pg_teminate_backend(1);
35 changes: 35 additions & 0 deletions (new planner test output file)
@@ -0,0 +1,35 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamptz,
now(),
interval '1 hour'
);
logical_plan: |-
LogicalProject { exprs: [generate_series] }
└─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) }
optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }'
stream_plan: |-
StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] }
└─StreamNow { output: [ts] }
- sql: |
select * from generate_series(
'2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported
now(),
interval '1 hour'
);
binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist
- sql: |
select * from generate_series(
now() - interval '1 hour',
now(),
interval '1 hour'
);
stream_error: |-
Not supported: General `now()` function in streaming queries
HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns.
- sql: |
select * from unnest(array[now(), now()]);
stream_error: |-
Not supported: General `now()` function in streaming queries
HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns.
8 changes: 6 additions & 2 deletions src/frontend/src/binder/expr/function.rs
@@ -1572,11 +1572,15 @@ impl Binder {
if self.is_for_stream()
&& !matches!(
self.context.clause,
Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn)
Some(Clause::Where)
| Some(Clause::Having)
| Some(Clause::JoinOn)
| Some(Clause::From)
)
{
return Err(ErrorCode::InvalidInputSyntax(format!(
"For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information",
"For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \
Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information",
self.context.clause
))
.into());
2 changes: 1 addition & 1 deletion src/frontend/src/expr/mod.rs
@@ -413,7 +413,7 @@ macro_rules! impl_has_variant {
};
}

impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction}
impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InequalityInputPair {
54 changes: 47 additions & 7 deletions src/frontend/src/expr/type_inference/func.rs
@@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::aggregate::AggKind;
pub use risingwave_expr::sig::*;
use risingwave_pb::expr::table_function::PbType as PbTableFuncType;

use super::{align_types, cast_ok_base, CastContext};
use crate::error::{ErrorCode, Result};
@@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap(
sig_map: &FunctionRegistry,
) -> Result<DataType> {
// special cases
if let FuncName::Scalar(func_type) = func_name
&& let Some(res) = infer_type_for_special(func_type, inputs).transpose()
{
return res;
}
if let FuncName::Aggregate(AggKind::Grouping) = func_name {
return Ok(DataType::Int32);
match &func_name {
FuncName::Scalar(func_type) => {
if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() {
return res;
}
}
FuncName::Table(func_type) => {
if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose()
{
return res;
}
}
FuncName::Aggregate(agg_kind) => {
if *agg_kind == AggKind::Grouping {
return Ok(DataType::Int32);
}
}
_ => {}
}

let actuals = inputs
@@ -634,6 +646,34 @@ fn infer_type_for_special(
}
}

fn infer_type_for_special_table_function(
func_type: PbTableFuncType,
inputs: &mut [ExprImpl],
) -> Result<Option<DataType>> {
match func_type {
PbTableFuncType::GenerateSeries => {
if inputs.len() < 3 {
// let signature map handle this
return Ok(None);
}
match (
inputs[0].return_type(),
inputs[1].return_type(),
inputs[2].return_type(),
) {
(DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => {
// This is to allow `generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`,
// which in streaming mode will be further converted to `StreamNow`.
Ok(Some(DataType::Timestamptz))
}
// let signature map handle the rest
_ => Ok(None),
}
}
_ => Ok(None),
}
}

/// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the
/// provided `func_name` and `inputs`. This not only support exact function signature match, but can
/// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar,
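To restate the type-inference special case above in SQL terms (these examples mirror the planner-test data added earlier in this commit; they are illustrative, not exhaustive):

-- Accepted: (timestamptz, timestamptz, interval) is special-cased to return timestamptz without
-- consulting the signature map, so the call can later be lowered to StreamNow in streaming mode.
SELECT * FROM generate_series('2024-06-21 17:36:00'::timestamptz, now(), interval '1 hour');

-- Rejected at bind time: a timestamp (without time zone) start falls through to the signature map,
-- which has no generate_series(timestamp, timestamptz, interval) overload.
SELECT * FROM generate_series('2024-06-21 17:36:00'::timestamp, now(), interval '1 hour');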
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
@@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock<OptimizationStage> = LazyLock::new(|| {
)
});

static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert GENERATE_SERIES Ends With NOW",
vec![GenerateSeriesWithNowRule::create()],
ApplyOrder::TopDown,
)
});

static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Table Function To Project Set",
@@ -572,6 +580,9 @@ impl LogicalOptimizer {
}
plan = plan.optimize_by_rules(&SET_OPERATION_MERGE);
plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN);
// Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode.
// Should be applied before converting table function to project set.
plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW);
// In order to unnest a table function, we need to convert it into a `project_set` first.
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET);

5 changes: 0 additions & 5 deletions src/frontend/src/optimizer/plan_node/convert.rs
@@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement(
}
Ok(StreamEowcSort::new(plan, watermark_col_idx).into())
}
} else if !emit_on_window_close && plan.emit_on_window_close() {
Err(ErrorCode::InternalError(
"Some bad thing happened, the generated plan is not correct.".to_string(),
)
.into())
} else {
Ok(plan)
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
@@ -78,6 +78,8 @@ mod cte_ref;
pub use cte_ref::*;
mod recursive_union;
pub use recursive_union::*;
mod now;
pub use now::*;

pub trait DistillUnit {
fn distill_with_name<'a>(&self, name: impl Into<Cow<'a, str>>) -> XmlNode<'a>;