diff --git a/ci/scripts/e2e-test-parallel.sh b/ci/scripts/e2e-test-parallel.sh index 5f16a4c817871..c366ef07fc209 100755 --- a/ci/scripts/e2e-test-parallel.sh +++ b/ci/scripts/e2e-test-parallel.sh @@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i echo "--- e2e, ci-3streaming-2serving-3fe, streaming" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, batch" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, generated" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" kill_cluster diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt new file mode 100644 index 0000000000000..ad086f2202e7a --- /dev/null +++ b/e2e_test/streaming/now.slt @@ -0,0 +1,48 @@ +# In madsim test, we cannot spawn process. +skipif madsim +# In parallel test, we cannot get the DB name. +skipif parallel +# TODO: Later if we introduce a new `now()`-like function that returns the time of statement execution, +# we'll be able to directly create MV without `./risedev psql` and so that we can remove these `skipif`. +system ok +./risedev psql -c " +create materialized view mv as +select * from generate_series( + to_timestamp($(date +%s)) - interval '10 second', + now(), + interval '1 second' +); +" + +skipif madsim +skipif parallel +statement ok +flush; + +skipif madsim +skipif parallel +query I +select count(*) >= 10 from mv; +---- +t + +skipif madsim +skipif parallel +sleep 2s + +skipif madsim +skipif parallel +statement ok +flush; + +skipif madsim +skipif parallel +query I +select count(*) >= 12 from mv; +---- +t + +skipif madsim +skipif parallel +statement ok +drop materialized view mv; diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 223a23813f68d..a43e34bde8df3 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -725,9 +725,21 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowModeUpdateCurrent {} + +message NowModeGenerateSeries { + data.Datum start_timestamp = 1; + data.Datum interval = 2; +} + message NowNode { // Persists emitted 'now'. catalog.Table state_table = 1; + + oneof mode { + NowModeUpdateCurrent update_current = 101; + NowModeGenerateSeries generate_series = 102; + } } message ValuesNode { diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index a13cc3676792a..c44d313fffffe 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -104,6 +104,11 @@ impl StreamChunkBuilder { } } + /// Get the current number of rows in the builder. + pub fn size(&self) -> usize { + self.size + } + /// Append an iterator of output index and datum to the builder, return a chunk if the builder /// is full. /// diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index a067c689f669d..56dbdf6c54da9 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -174,10 +174,11 @@ impl EpochPair { Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST) } } -/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. + +/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. /// This method is to turn a a random epoch into a well shifted value. -pub const fn test_epoch(value: u64) -> u64 { - value << EPOCH_AVAILABLE_BITS +pub const fn test_epoch(value_millis: u64) -> u64 { + value_millis << EPOCH_AVAILABLE_BITS } /// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch. diff --git a/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml new file mode 100644 index 0000000000000..e121aba41ff6f --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml @@ -0,0 +1,30 @@ +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + expected_outputs: + - logical_plan + - optimized_logical_plan_for_stream + - stream_plan +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + expected_outputs: + - binder_error +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + expected_outputs: + - stream_error +- sql: | + select * from unnest(array[now(), now()]); + expected_outputs: + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 4ba572a54e600..f88f7c4d69b76 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -543,7 +543,7 @@ Failed to bind expression: v1 >= now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in select for stream sql: | create table t (v1 timestamp with time zone, v2 timestamp with time zone); @@ -552,7 +552,7 @@ Failed to bind expression: now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in agg filter for stream sql: | create table t (v1 timestamp with time zone, v2 int); @@ -561,7 +561,7 @@ Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now()) Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: typo pg_teminate_backend sql: | select pg_teminate_backend(1); diff --git a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml new file mode 100644 index 0000000000000..4c8d71f987351 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml @@ -0,0 +1,35 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + logical_plan: |- + LogicalProject { exprs: [generate_series] } + └─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) } + optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }' + stream_plan: |- + StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] } + └─StreamNow { output: [ts] } +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. +- sql: | + select * from unnest(array[now(), now()]); + stream_error: |- + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 09c05a29695dc..b9a3b27825abf 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1572,11 +1572,15 @@ impl Binder { if self.is_for_stream() && !matches!( self.context.clause, - Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn) + Some(Clause::Where) + | Some(Clause::Having) + | Some(Clause::JoinOn) + | Some(Clause::From) ) { return Err(ErrorCode::InvalidInputSyntax(format!( - "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", + "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \ + Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", self.context.clause )) .into()); diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 9dd5f7be1d53d..89142d0e9b237 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -413,7 +413,7 @@ macro_rules! impl_has_variant { }; } -impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction} +impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InequalityInputPair { diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9f28dfeb74c8c..0fecee8ab45c0 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; pub use risingwave_expr::sig::*; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; use super::{align_types, cast_ok_base, CastContext}; use crate::error::{ErrorCode, Result}; @@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap( sig_map: &FunctionRegistry, ) -> Result { // special cases - if let FuncName::Scalar(func_type) = func_name - && let Some(res) = infer_type_for_special(func_type, inputs).transpose() - { - return res; - } - if let FuncName::Aggregate(AggKind::Grouping) = func_name { - return Ok(DataType::Int32); + match &func_name { + FuncName::Scalar(func_type) => { + if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() { + return res; + } + } + FuncName::Table(func_type) => { + if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose() + { + return res; + } + } + FuncName::Aggregate(agg_kind) => { + if *agg_kind == AggKind::Grouping { + return Ok(DataType::Int32); + } + } + _ => {} } let actuals = inputs @@ -634,6 +646,34 @@ fn infer_type_for_special( } } +fn infer_type_for_special_table_function( + func_type: PbTableFuncType, + inputs: &mut [ExprImpl], +) -> Result> { + match func_type { + PbTableFuncType::GenerateSeries => { + if inputs.len() < 3 { + // let signature map handle this + return Ok(None); + } + match ( + inputs[0].return_type(), + inputs[1].return_type(), + inputs[2].return_type(), + ) { + (DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => { + // This is to allow `generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`, + // which in streaming mode will be further converted to `StreamNow`. + Ok(Some(DataType::Timestamptz)) + } + // let signature map handle the rest + _ => Ok(None), + } + } + _ => Ok(None), + } +} + /// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the /// provided `func_name` and `inputs`. This not only support exact function signature match, but can /// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index d452626bb9418..931a645b3d680 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock = LazyLock::new(|| { ) }); +static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Convert GENERATE_SERIES Ends With NOW", + vec![GenerateSeriesWithNowRule::create()], + ApplyOrder::TopDown, + ) +}); + static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Table Function To Project Set", @@ -572,6 +580,9 @@ impl LogicalOptimizer { } plan = plan.optimize_by_rules(&SET_OPERATION_MERGE); plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN); + // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode. + // Should be applied before converting table function to project set. + plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET); diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index db961b3b1e20a..6f98073304d8b 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement( } Ok(StreamEowcSort::new(plan, watermark_col_idx).into()) } - } else if !emit_on_window_close && plan.emit_on_window_close() { - Err(ErrorCode::InternalError( - "Some bad thing happened, the generated plan is not correct.".to_string(), - ) - .into()) } else { Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 00db5730e8038..392f073371843 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,6 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; +mod now; +pub use now::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs new file mode 100644 index 0000000000000..911217d064214 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -0,0 +1,106 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use enum_as_inner::EnumAsInner; +use pretty_xmlish::{Pretty, Str, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, Interval, Timestamptz}; + +use super::{DistillUnit, GenericPlanNode}; +use crate::optimizer::plan_node::utils::childless_record; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct Now { + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, + + pub mode: Mode, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)] +pub enum Mode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +impl GenericPlanNode for Now { + fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet { + FunctionalDependencySet::new(1) // only one column and no dependency + } + + fn schema(&self) -> risingwave_common::catalog::Schema { + Schema::new(vec![Field { + data_type: DataType::Timestamptz, + name: String::from(if self.mode.is_update_current() { + "now" + } else { + "ts" + }), + sub_fields: vec![], + type_name: String::default(), + }]) + } + + fn stream_key(&self) -> Option> { + match self.mode { + Mode::UpdateCurrent => Some(vec![]), + Mode::GenerateSeries { .. } => Some(vec![0]), + } + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } +} + +impl Now { + pub fn update_current(ctx: OptimizerContextRef) -> Self { + Self::new_inner(ctx, Mode::UpdateCurrent) + } + + pub fn generate_series( + ctx: OptimizerContextRef, + start_timestamp: Timestamptz, + interval: Interval, + ) -> Self { + Self::new_inner( + ctx, + Mode::GenerateSeries { + start_timestamp, + interval, + }, + ) + } + + fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self { + Self { ctx, mode } + } +} + +impl DistillUnit for Now { + fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { + childless_record(name, vec![("mode", Pretty::debug(&self.mode))]) + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index f9c33eb3d9cc1..ea34037c8977a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -14,10 +14,8 @@ use pretty_xmlish::XmlNode; use risingwave_common::bail; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef, Mode}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef, @@ -26,30 +24,25 @@ use super::{ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalNow { pub base: PlanBase, + core: generic::Now, } impl LogicalNow { - pub fn new(ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); - let base = PlanBase::new_logical( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), - ); - Self { base } + pub fn new(core: generic::Now) -> Self { + let base = PlanBase::new_logical_with_core(&core); + Self { base, core } + } + + pub fn max_one_row(&self) -> bool { + match self.core.mode { + Mode::UpdateCurrent => true, + Mode::GenerateSeries { .. } => false, + } } } @@ -91,7 +84,7 @@ impl ToStream for LogicalNow { /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)` fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Ok(StreamNow::new(self.clone(), self.ctx()).into()) + Ok(StreamNow::new(self.core.clone()).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 6f31b35a21ee0..8f6966ece6c70 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -22,7 +22,7 @@ use super::{ LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream, }; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::{ collect_input_refs, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef, TableFunction, @@ -400,6 +400,16 @@ impl ToStream for LogicalProjectSet { // TODO: implement to_stream_with_dist_required like LogicalProject fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + if self.select_list().iter().any(|item| item.has_now()) { + // User may use `now()` in table function in a wrong way, because we allow `now()` in `FROM` clause. + return Err(ErrorCode::NotSupported( + "General `now()` function in streaming queries".to_string(), + "Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns." + .to_string(), + ) + .into()); + } + let new_input = self.input().to_stream(ctx)?; let mut new_logical = self.core.clone(); new_logical.input = new_input; diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index d27321c08d06b..22a0d2c5fb0fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -14,46 +14,39 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; +use risingwave_common::types::Datum; +use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::NowNode; +use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode}; +use super::generic::Mode; use super::stream::prelude::*; use super::utils::{childless_record, Distill, TableCatalogBuilder}; -use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamNow { pub base: PlanBase, + core: generic::Now, } impl StreamNow { - pub fn new(_logical: LogicalNow, ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); + pub fn new(core: generic::Now) -> Self { let mut watermark_columns = FixedBitSet::with_capacity(1); watermark_columns.set(0, true); - let base = PlanBase::new_stream( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), + let base = PlanBase::new_stream_with_core( + &core, Distribution::Single, - false, - false, // TODO(rc): derive EOWC property from input + core.mode.is_generate_series(), // append only + core.mode.is_generate_series(), // emit on window close watermark_columns, ); - Self { base } + Self { base, core } } } @@ -83,8 +76,18 @@ impl StreamNode for StreamNow { let table_catalog = internal_table_catalog_builder .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); - NodeBody::Now(NowNode { + NodeBody::Now(PbNowNode { state_table: Some(table_catalog.to_internal_table_prost()), + mode: Some(match &self.core.mode { + Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}), + Mode::GenerateSeries { + start_timestamp, + interval, + } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()), + interval: Some(Datum::Some((*interval).into()).to_protobuf()), + }), + }), }) } } diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index c81bf539a5713..07459b59b1d5f 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -174,8 +174,12 @@ impl PlanVisitor for CardinalityVisitor { } } - fn visit_logical_now(&mut self, _plan: &plan_node::LogicalNow) -> Cardinality { - 1.into() + fn visit_logical_now(&mut self, plan: &plan_node::LogicalNow) -> Cardinality { + if plan.max_one_row() { + 1.into() + } else { + Cardinality::unknown() + } } fn visit_logical_expand(&mut self, plan: &plan_node::LogicalExpand) -> Cardinality { diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 9364a6f2b7f5b..fd06402c7497f 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -92,6 +92,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::generate_series_with_now_rule::*; pub use stream::split_now_and_rule::*; pub use stream::split_now_or_rule::*; pub use stream::stream_project_merge_rule::*; @@ -203,6 +204,7 @@ macro_rules! for_all_rules { , { SplitNowAndRule } , { SplitNowOrRule } , { FilterWithNowToJoinRule } + , { GenerateSeriesWithNowRule } , { TopNOnIndexRule } , { TrivialProjectToValuesRule } , { UnionInputValuesMergeRule } diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index 498696589c81b..cbdb65b4528a5 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan_common::JoinType; use crate::expr::{ try_derive_watermark, ExprRewriter, FunctionCall, InputRef, WatermarkDerivation, }; -use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::generic::{self, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow}; use crate::optimizer::rule::{BoxedRule, Rule}; use crate::optimizer::PlanRef; @@ -63,7 +63,7 @@ impl Rule for FilterWithNowToJoinRule { for now_filter in now_filters { new_plan = LogicalJoin::new( new_plan, - LogicalNow::new(plan.ctx()).into(), + LogicalNow::new(generic::Now::update_current(plan.ctx())).into(), JoinType::LeftSemi, Condition { conjunctions: vec![now_filter.into()], diff --git a/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs new file mode 100644 index 0000000000000..665967f6660b0 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs @@ -0,0 +1,86 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; + +use crate::expr::{Expr, ExprRewriter}; +use crate::optimizer::plan_node::{generic, LogicalNow}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::PlanRef; + +pub struct GenerateSeriesWithNowRule {} +impl Rule for GenerateSeriesWithNowRule { + fn apply(&self, plan: PlanRef) -> Option { + let ctx = plan.ctx(); + let table_func = plan.as_logical_table_function()?.table_function(); + + if !table_func.args.iter().any(|arg| arg.has_now()) { + return None; + } + + if !(table_func.function_type == PbTableFuncType::GenerateSeries + && table_func.args.len() == 3 + && table_func.args[0].return_type() == DataType::Timestamptz + && table_func.args[1].is_now() + && table_func.args[2].return_type() == DataType::Interval) + { + // only convert `generate_series(const timestamptz, now(), const interval)` + ctx.warn_to_user( + "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \ + You may not using it correctly. Please kindly check the document." + ); + return None; + } + + let start_timestamp = ctx + .session_timezone() + .rewrite_expr(table_func.args[0].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + let interval = ctx + .session_timezone() + .rewrite_expr(table_func.args[2].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + + if start_timestamp.is_none() || interval.is_none() { + ctx.warn_to_user( + "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants", + ); + return None; + } + + Some( + LogicalNow::new(generic::Now::generate_series( + ctx, + start_timestamp.unwrap().into_timestamptz(), + interval.unwrap().into_interval(), + )) + .into(), + ) + } +} + +impl GenerateSeriesWithNowRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index cc86298e766e8..539d9048cff60 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod generate_series_with_now_rule; pub(crate) mod split_now_and_rule; pub(crate) mod split_now_or_rule; pub(crate) mod stream_project_merge_rule; diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 6d5c5867060de..a6a2152283668 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -129,7 +129,7 @@ pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; pub use no_op::NoOpExecutor; -pub use now::NowExecutor; +pub use now::*; pub use over_window::*; pub use project::ProjectExecutor; pub use project_set::*; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 43316fef71094..049eee7f8c724 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -15,30 +15,65 @@ use std::ops::Bound; use std::ops::Bound::Unbounded; +use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row; +use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef}; +use risingwave_expr::capture_context; +use risingwave_expr::expr::{ + build_func_non_strict, EvalErrorReport, ExpressionBoxExt, InputRefExpression, + LiteralExpression, NonStrictExpression, +}; +use risingwave_expr::expr_context::TIME_ZONE; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::executor::prelude::*; +use crate::task::ActorEvalErrorReport; pub struct NowExecutor { data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, + /// Receiver of barrier channel. barrier_receiver: UnboundedReceiver, state_table: StateTable, } +pub enum NowMode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +enum ModeVars { + UpdateCurrent, + GenerateSeries { + chunk_builder: StreamChunkBuilder, + add_interval_expr: NonStrictExpression, + }, +} + impl NowExecutor { pub fn new( data_types: Vec, + mode: NowMode, + eval_error_report: ActorEvalErrorReport, barrier_receiver: UnboundedReceiver, state_table: StateTable, ) -> Self { Self { data_types, + mode, + eval_error_report, barrier_receiver, state_table, } @@ -48,24 +83,43 @@ impl NowExecutor { async fn execute_inner(self) { let Self { data_types, + mode, + eval_error_report, barrier_receiver, mut state_table, } = self; + let max_chunk_size = crate::config::chunk_size(); + // Whether the executor is paused. let mut paused = false; // The last timestamp **sent** to the downstream. let mut last_timestamp: Datum = None; + // Whether the first barrier is handled and `last_timestamp` is initialized. let mut initialized = false; + let mut mode_vars = match &mode { + NowMode::UpdateCurrent => ModeVars::UpdateCurrent, + NowMode::GenerateSeries { interval, .. } => { + // in most cases there won't be more than one row except for the first time + let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); + let add_interval_expr = + build_add_interval_expr_captured(*interval, eval_error_report)?; + ModeVars::GenerateSeries { + chunk_builder, + add_interval_expr, + } + } + }; + const MAX_MERGE_BARRIER_SIZE: usize = 64; #[for_await] for barriers in UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE) { - let mut timestamp = None; + let mut curr_timestamp = None; if barriers.len() > 1 { warn!( "handle multiple barriers at once in now executor: {}", @@ -74,7 +128,7 @@ impl NowExecutor { } for barrier in barriers { if !initialized { - // Handle the first barrier. + // Handle the initial barrier. state_table.init_epoch(barrier.epoch); let state_row = { let sub_range: &(Bound, Bound) = @@ -97,7 +151,7 @@ impl NowExecutor { } // Extract timestamp from the current epoch. - timestamp = Some(barrier.get_curr_epoch().as_scalar()); + curr_timestamp = Some(barrier.get_curr_epoch().as_scalar()); // Update paused state. if let Some(mutation) = barrier.mutation.as_deref() { @@ -116,28 +170,94 @@ impl NowExecutor { continue; } - let stream_chunk = if last_timestamp.is_some() { - let last_row = row::once(&last_timestamp); - let row = row::once(×tamp); - state_table.update(last_row, row); + match (&mode, &mut mode_vars) { + (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => { + let chunk = if last_timestamp.is_some() { + let last_row = row::once(&last_timestamp); + let row = row::once(&curr_timestamp); + state_table.update(last_row, row); + + StreamChunk::from_rows( + &[(Op::Delete, last_row), (Op::Insert, row)], + &data_types, + ) + } else { + let row = row::once(&curr_timestamp); + state_table.insert(row); + + StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) + }; - StreamChunk::from_rows(&[(Op::Delete, last_row), (Op::Insert, row)], &data_types) - } else { - let row = row::once(×tamp); - state_table.insert(row); + yield Message::Chunk(chunk); + last_timestamp.clone_from(&curr_timestamp) + } + ( + NowMode::GenerateSeries { + start_timestamp, .. + }, + ModeVars::GenerateSeries { + chunk_builder, + ref add_interval_expr, + }, + ) => { + if last_timestamp.is_none() { + // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. + let first = Some((*start_timestamp).into()); + let first_row = row::once(&first); + let _ = chunk_builder.append_row(Op::Insert, first_row); + state_table.insert(first_row); + last_timestamp = first; + } - StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) - }; + // Now let's step through the timestamps from the last timestamp to the current timestamp. + // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp` + // until the end of the loop, so that `last_timestamp` is always synced with the state table. + let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]); + + loop { + if chunk_builder.size() >= max_chunk_size { + // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder + // with limited size here because the initial capacity can be too large for most cases. + // Basically only the first several chunks can potentially exceed the `max_chunk_size`. + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } + } + + let next = add_interval_expr.eval_row_infallible(&last_row).await; + if DefaultOrdered(next.to_datum_ref()) + > DefaultOrdered(curr_timestamp.to_datum_ref()) + { + // We only increase the timestamp to the current timestamp. + break; + } + + let next_row = OwnedRow::new(vec![next]); + let _ = chunk_builder.append_row(Op::Insert, &next_row); + last_row = next_row; + } + + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } - yield Message::Chunk(stream_chunk); + // Update the last timestamp. + state_table.update(row::once(&last_timestamp), &last_row); + last_timestamp = last_row + .into_inner() + .into_vec() + .into_iter() + .exactly_one() + .unwrap(); + } + _ => unreachable!(), + } yield Message::Watermark(Watermark::new( 0, DataType::Timestamptz, - timestamp.clone().unwrap(), + curr_timestamp.unwrap(), )); - - last_timestamp = timestamp; } } } @@ -148,36 +268,54 @@ impl Execute for NowExecutor { } } +#[capture_context(TIME_ZONE)] +pub fn build_add_interval_expr( + time_zone: &str, + interval: Interval, + eval_error_report: impl EvalErrorReport + 'static, +) -> risingwave_expr::Result { + let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0); + let interval = LiteralExpression::new(DataType::Interval, Some(interval.into())); + let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into())); + + use risingwave_pb::expr::expr_node::PbType as PbExprType; + build_func_non_strict( + PbExprType::AddWithTimeZone, + DataType::Timestamptz, + vec![ + timestamptz_input.boxed(), + interval.boxed(), + time_zone.boxed(), + ], + eval_error_report, + ) +} + #[cfg(test)] mod tests { - use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::test_utils::IntervalTestExt; + use risingwave_common::util::epoch::test_epoch; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; - use super::NowExecutor; - use crate::common::table::state_table::StateTable; + use super::*; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::executor::{ - Barrier, BoxedMessageStream, Execute, Mutation, StreamExecutorResult, Watermark, - }; #[tokio::test] async fn test_now() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) - .unwrap(); + tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -188,7 +326,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -199,14 +337,17 @@ mod tests { ) ); - tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) - .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2), + test_epoch(1), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -218,7 +359,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -230,21 +371,25 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Recovery - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::with_prev_epoch_for_test(3 << 16, 1 << 16)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3), + test_epoch(2), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), + // the last chunk was not checkpointed so the deleted old value should be `001` StreamChunk::from_pretty( " TZ - 2021-04-01T00:00:00.001Z @@ -253,7 +398,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -265,28 +410,32 @@ mod tests { ); // Recovery with paused - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; - tx.send(Barrier::new_test_barrier(4 << 16).with_mutation(Mutation::Pause)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send( + Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(3)) + .with_mutation(Mutation::Pause), + ) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(5 << 16, 4 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), StreamChunk::from_pretty( @@ -297,7 +446,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -314,29 +463,30 @@ mod tests { #[tokio::test] async fn test_now_start_with_paused() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1).with_mutation(Mutation::Pause)) + tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause)) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -347,7 +497,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -359,7 +509,121 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); + + Ok(()) + } + + #[tokio::test] + async fn test_now_generate_series() -> StreamExecutorResult<()> { + TIME_ZONE::scope("UTC".to_string(), test_now_generate_series_inner()).await + } + + async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC + let interval = Interval::from_millis(1000); // 1s interval + + let state_store = create_state_store(); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + // Init barrier + tx.send(Barrier::new_test_barrier(test_epoch(1000))) + .unwrap(); + now.next_unwrap_ready_barrier()?; + + // Initial timestamps + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive) + + assert_eq!( + now.next_unwrap_ready_watermark()?, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap()) + ) + ); + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2000), + test_epoch(1000), + )) + .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3000), + test_epoch(2000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap()) + ) + ); + + // Recovery + drop((tx, now)); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(4000), + test_epoch(3000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z + + 2021-04-01T00:00:04.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap()) + ) + ); Ok(()) } @@ -369,6 +633,7 @@ mod tests { } async fn create_executor( + mode: NowMode, state_store: &MemoryStateStore, ) -> (UnboundedSender, BoxedMessageStream) { let table_id = TableId::new(1); @@ -384,8 +649,17 @@ mod tests { let (sender, barrier_receiver) = unbounded_channel(); - let now_executor = - NowExecutor::new(vec![DataType::Timestamptz], barrier_receiver, state_table); + let eval_error_report = ActorEvalErrorReport { + actor_context: ActorContext::for_test(123), + identity: "NowExecutor".into(), + }; + let now_executor = NowExecutor::new( + vec![DataType::Timestamptz], + mode, + eval_error_report, + barrier_receiver, + state_table, + ); (sender, now_executor.boxed().execute()) } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index a917cd136f3b4..9eac7caa13557 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::stream_plan::NowNode; +use anyhow::Context; +use risingwave_common::types::{DataType, Datum}; +use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; +use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries}; use risingwave_storage::StateStore; use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; -use crate::executor::{Executor, NowExecutor}; +use crate::executor::{Executor, NowExecutor, NowMode}; use crate::task::ExecutorParams; pub struct NowExecutorBuilder; @@ -37,11 +41,43 @@ impl ExecutorBuilder for NowExecutorBuilder { .create_actor_context .register_sender(params.actor_context.id, sender); + let mode = if let Ok(pb_mode) = node.get_mode() { + match pb_mode { + PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent, + PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp, + interval, + }) => { + let start_timestamp = Datum::from_protobuf( + start_timestamp.as_ref().unwrap(), + &DataType::Timestamptz, + ) + .context("`start_timestamp` field is not decodable")? + .context("`start_timestamp` field should not be NULL")? + .into_timestamptz(); + let interval = + Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval) + .context("`interval` field is not decodable")? + .context("`interval` field should not be NULL")? + .into_interval(); + NowMode::GenerateSeries { + start_timestamp, + interval, + } + } + } + } else { + // default to `UpdateCurrent` for backward-compatibility + NowMode::UpdateCurrent + }; + let state_table = StateTable::from_table_catalog(node.get_state_table()?, store, None).await; let exec = NowExecutor::new( params.info.schema.data_types(), + mode, + params.eval_error_report, barrier_receiver, state_table, ); diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a7..a199e4c8acb9f 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -40,6 +40,7 @@ #![feature(btree_cursors)] #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(result_flattening)] // required by `capture_context` use std::sync::Arc;