From da4d0483dd89099ba175c86c7390148966d56a48 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 20 Jun 2024 15:38:41 +0800 Subject: [PATCH 01/13] support `GenerateSeries` mode in NowExecutor Signed-off-by: Richard Chien --- src/common/src/array/stream_chunk_builder.rs | 5 + src/common/src/util/epoch.rs | 7 +- src/stream/src/executor/mod.rs | 10 + src/stream/src/executor/now.rs | 224 +++++++++++++++++-- src/stream/src/lib.rs | 1 + 5 files changed, 223 insertions(+), 24 deletions(-) diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index a13cc3676792a..c44d313fffffe 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -104,6 +104,11 @@ impl StreamChunkBuilder { } } + /// Get the current number of rows in the builder. + pub fn size(&self) -> usize { + self.size + } + /// Append an iterator of output index and datum to the builder, return a chunk if the builder /// is full. /// diff --git a/src/common/src/util/epoch.rs b/src/common/src/util/epoch.rs index a067c689f669d..56dbdf6c54da9 100644 --- a/src/common/src/util/epoch.rs +++ b/src/common/src/util/epoch.rs @@ -174,10 +174,11 @@ impl EpochPair { Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST) } } -/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. + +/// As most unit tests initialize a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0. /// This method is to turn a a random epoch into a well shifted value. -pub const fn test_epoch(value: u64) -> u64 { - value << EPOCH_AVAILABLE_BITS +pub const fn test_epoch(value_millis: u64) -> u64 { + value_millis << EPOCH_AVAILABLE_BITS } /// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch. diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 6d5c5867060de..40ddf5b91c48b 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -329,6 +329,16 @@ impl Barrier { } } + pub fn new_initial_for_test(epoch: u64) -> Self { + Self { + epoch: EpochPair::new_test_epoch(epoch), + kind: BarrierKind::Initial, + tracing_context: TracingContext::none(), + mutation: Default::default(), + passed_actors: Default::default(), + } + } + #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { Self { diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 43316fef71094..be21cbad32e33 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -15,8 +15,16 @@ use std::ops::Bound; use std::ops::Bound::Unbounded; +use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row; +use risingwave_common::types::test_utils::IntervalTestExt; +use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef}; +use risingwave_expr::capture_context; +use risingwave_expr::expr::{ + build_func, BoxedExpression, ExpressionBoxExt, InputRefExpression, LiteralExpression, +}; +use risingwave_expr::expr_context::TIME_ZONE; use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -25,12 +33,31 @@ use crate::executor::prelude::*; pub struct NowExecutor { data_types: Vec, + mode: Mode, + max_chunk_size: usize, + /// Receiver of barrier channel. barrier_receiver: UnboundedReceiver, state_table: StateTable, } +enum Mode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + add_interval_expr: BoxedExpression, + }, +} + +enum ModeVars { + UpdateCurrent, + GenerateSeries { chunk_builder: StreamChunkBuilder }, +} + impl NowExecutor { pub fn new( data_types: Vec, @@ -39,6 +66,12 @@ impl NowExecutor { ) -> Self { Self { data_types, + // TODO(): only for dev + mode: Mode::GenerateSeries { + start_timestamp: Timestamptz::from_secs(1617235190).unwrap(), /* 2021-03-31 23:59:50 UTC */ + add_interval_expr: build_add_expr_captured(Interval::from_millis(1000)).unwrap(), + }, + max_chunk_size: 1024, barrier_receiver, state_table, } @@ -48,6 +81,8 @@ impl NowExecutor { async fn execute_inner(self) { let Self { data_types, + mode, + max_chunk_size, barrier_receiver, mut state_table, } = self; @@ -56,8 +91,15 @@ impl NowExecutor { let mut paused = false; // The last timestamp **sent** to the downstream. let mut last_timestamp: Datum = None; - // Whether the first barrier is handled and `last_timestamp` is initialized. - let mut initialized = false; + + let mut mode_vars = match &mode { + Mode::UpdateCurrent => ModeVars::UpdateCurrent, + Mode::GenerateSeries { .. } => { + // in most cases there won't be more than one row except for the first time + let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); + ModeVars::GenerateSeries { chunk_builder } + } + }; const MAX_MERGE_BARRIER_SIZE: usize = 64; @@ -65,7 +107,7 @@ impl NowExecutor { for barriers in UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE) { - let mut timestamp = None; + let mut curr_timestamp = None; if barriers.len() > 1 { warn!( "handle multiple barriers at once in now executor: {}", @@ -73,8 +115,8 @@ impl NowExecutor { ); } for barrier in barriers { - if !initialized { - // Handle the first barrier. + if barrier.kind.is_initial() { + // Handle the initial barrier. state_table.init_epoch(barrier.epoch); let state_row = { let sub_range: &(Bound, Bound) = @@ -91,13 +133,12 @@ impl NowExecutor { }; last_timestamp = state_row.and_then(|row| row[0].clone()); paused = barrier.is_pause_on_startup(); - initialized = true; } else { state_table.commit(barrier.epoch).await?; } // Extract timestamp from the current epoch. - timestamp = Some(barrier.get_curr_epoch().as_scalar()); + curr_timestamp = Some(barrier.get_curr_epoch().as_scalar()); // Update paused state. if let Some(mutation) = barrier.mutation.as_deref() { @@ -116,28 +157,92 @@ impl NowExecutor { continue; } - let stream_chunk = if last_timestamp.is_some() { - let last_row = row::once(&last_timestamp); - let row = row::once(×tamp); - state_table.update(last_row, row); + match (&mode, &mut mode_vars) { + (Mode::UpdateCurrent, ModeVars::UpdateCurrent) => { + let chunk = if last_timestamp.is_some() { + let last_row = row::once(&last_timestamp); + let row = row::once(&curr_timestamp); + state_table.update(last_row, row); + + StreamChunk::from_rows( + &[(Op::Delete, last_row), (Op::Insert, row)], + &data_types, + ) + } else { + let row = row::once(&curr_timestamp); + state_table.insert(row); + + StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) + }; - StreamChunk::from_rows(&[(Op::Delete, last_row), (Op::Insert, row)], &data_types) - } else { - let row = row::once(×tamp); - state_table.insert(row); + yield Message::Chunk(chunk); + last_timestamp = curr_timestamp.clone(); + } + ( + Mode::GenerateSeries { + start_timestamp, + add_interval_expr, + }, + ModeVars::GenerateSeries { chunk_builder }, + ) => { + if last_timestamp.is_none() { + // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. + let first = Some(start_timestamp.clone().into()); + let first_row = row::once(&first); + let _ = chunk_builder.append_row(Op::Insert, first_row); + state_table.insert(first_row); + last_timestamp = first; + } - StreamChunk::from_rows(&[(Op::Insert, row)], &data_types) - }; + // Now let's step through the timestamps from the last timestamp to the current timestamp. + // We use `last_row` as a temporary cursor to track the progress, and won't touch `last_timestamp` + // until the end of the loop, so that `last_timestamp` is always synced with the state table. + let mut last_row = OwnedRow::new(vec![last_timestamp.clone()]); + + loop { + if chunk_builder.size() >= max_chunk_size { + // Manually yield the chunk when size exceeds the limit. We don't want to use chunk builder + // with limited size here because the initial capacity can be too large for most cases. + // Basically only the first several chunks can potentially exceed the `max_chunk_size`. + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } + } - yield Message::Chunk(stream_chunk); + let next = add_interval_expr.eval_row(&last_row).await?; + if DefaultOrdered(next.to_datum_ref()) + > DefaultOrdered(curr_timestamp.to_datum_ref()) + { + // We only increase the timestamp to the current timestamp. + break; + } + + let next_row = OwnedRow::new(vec![next]); + let _ = chunk_builder.append_row(Op::Insert, &next_row); + last_row = next_row; + } + + if let Some(chunk) = chunk_builder.take() { + yield Message::Chunk(chunk); + } + + // Update the last timestamp. + state_table.update(row::once(&last_timestamp), &last_row); + last_timestamp = last_row + .into_inner() + .into_vec() + .into_iter() + .exactly_one() + .unwrap(); + } + _ => unreachable!(), + } yield Message::Watermark(Watermark::new( 0, DataType::Timestamptz, - timestamp.clone().unwrap(), + curr_timestamp.unwrap(), )); - - last_timestamp = timestamp; } } } @@ -148,12 +253,32 @@ impl Execute for NowExecutor { } } +#[capture_context(TIME_ZONE)] +fn build_add_expr(time_zone: &str, interval: Interval) -> risingwave_expr::Result { + let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0); + let interval = LiteralExpression::new(DataType::Interval, Some(interval.into())); + let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into())); + + use risingwave_pb::expr::expr_node::PbType as PbExprType; + build_func( + PbExprType::AddWithTimeZone, + DataType::Timestamptz, + vec![ + timestamptz_input.boxed(), + interval.boxed(), + time_zone.boxed(), + ], + ) +} + #[cfg(test)] mod tests { use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::util::epoch::test_epoch; + use risingwave_expr::expr_context::TIME_ZONE; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -364,6 +489,63 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_now_generate_series() -> StreamExecutorResult<()> { + TIME_ZONE::scope("UTC".to_string(), test_now_generate_series_inner()).await + } + + async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let state_store = create_state_store(); + let (tx, mut now) = create_executor(&state_store).await; + + // Init barrier + tx.send(Barrier::new_initial_for_test(test_epoch(1000))) + .unwrap(); + now.next_unwrap_ready_barrier()?; + + // Initial timestamps + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!(chunk.cardinality(), 12); // seconds from 23:59:50 to 00:00:01 (inclusive) + + assert_eq!( + now.next_unwrap_ready_watermark()?, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:01.000Z".parse().unwrap()) + ) + ); + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2000), + test_epoch(1000), + )) + .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3000), + test_epoch(2000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!(chunk.cardinality(), 2); // seconds from 00:00:02 to 00:00:03 (inclusive) + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:03.000Z".parse().unwrap()) + ) + ); + + Ok(()) + } + fn create_state_store() -> MemoryStateStore { MemoryStateStore::new() } diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index 51a735af0f5a7..a199e4c8acb9f 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -40,6 +40,7 @@ #![feature(btree_cursors)] #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(result_flattening)] // required by `capture_context` use std::sync::Arc; From 2dcf92aa915ae7e25834d308615db3e0fb791fb9 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 20 Jun 2024 17:48:24 +0800 Subject: [PATCH 02/13] support `mode` in Now plan node Signed-off-by: Richard Chien --- proto/stream_plan.proto | 13 +++ .../src/optimizer/plan_node/generic/mod.rs | 2 + .../src/optimizer/plan_node/generic/now.rs | 103 ++++++++++++++++++ .../src/optimizer/plan_node/logical_now.rs | 26 +---- .../src/optimizer/plan_node/stream_now.rs | 47 ++++---- .../stream/filter_with_now_to_join_rule.rs | 4 +- src/stream/src/executor/mod.rs | 2 +- src/stream/src/executor/now.rs | 81 ++++++++------ src/stream/src/from_proto/now.rs | 41 ++++++- 9 files changed, 238 insertions(+), 81 deletions(-) create mode 100644 src/frontend/src/optimizer/plan_node/generic/now.rs diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 223a23813f68d..e8438794a7548 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -725,9 +725,22 @@ message RowIdGenNode { uint64 row_id_index = 1; } +message NowModeUpdateCurrent { +} + +message NowModeGenerateSeries { + data.Datum start_timestamp = 1; + data.Datum interval = 2; +} + message NowNode { // Persists emitted 'now'. catalog.Table state_table = 1; + + oneof mode { + NowModeUpdateCurrent update_current = 101; + NowModeGenerateSeries generate_series = 102; + } } message ValuesNode { diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 00db5730e8038..392f073371843 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,6 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; +mod now; +pub use now::*; pub trait DistillUnit { fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a>; diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs new file mode 100644 index 0000000000000..5e0be9c38dc98 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -0,0 +1,103 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use educe::Educe; +use enum_as_inner::EnumAsInner; +use pretty_xmlish::{Pretty, Str, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::{DataType, Interval, Timestamptz}; + +use super::{DistillUnit, GenericPlanNode}; +use crate::optimizer::plan_node::utils::childless_record; +use crate::optimizer::property::FunctionalDependencySet; +use crate::OptimizerContextRef; + +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct Now { + #[educe(PartialEq(ignore))] + #[educe(Hash(ignore))] + pub ctx: OptimizerContextRef, + + pub mode: Mode, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumAsInner)] +pub enum Mode { + /// Emit current timestamp on startup, update it on barrier. + UpdateCurrent, + /// Generate a series of timestamps starting from `start_timestamp` with `interval`. + /// Keep generating new timestamps on barrier. + GenerateSeries { + start_timestamp: Timestamptz, + interval: Interval, + }, +} + +impl GenericPlanNode for Now { + fn functional_dependency(&self) -> crate::optimizer::property::FunctionalDependencySet { + FunctionalDependencySet::new(1) // only one column and no dependency + } + + fn schema(&self) -> risingwave_common::catalog::Schema { + Schema::new(vec![Field { + data_type: DataType::Timestamptz, + name: String::from(if self.mode.is_update_current() { + "now" + } else { + "ts" + }), + sub_fields: vec![], + type_name: String::default(), + }]) + } + + fn stream_key(&self) -> Option> { + Some(vec![]) + } + + fn ctx(&self) -> OptimizerContextRef { + self.ctx.clone() + } +} + +impl Now { + pub fn update_current(ctx: OptimizerContextRef) -> Self { + Self::new_inner(ctx, Mode::UpdateCurrent) + } + + pub fn generate_series( + ctx: OptimizerContextRef, + start_timestamp: Timestamptz, + interval: Interval, + ) -> Self { + Self::new_inner( + ctx, + Mode::GenerateSeries { + start_timestamp, + interval, + }, + ) + } + + fn new_inner(ctx: OptimizerContextRef, mode: Mode) -> Self { + Self { ctx, mode } + } +} + +impl DistillUnit for Now { + fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { + childless_record(name, vec![("mode", Pretty::debug(&self.mode))]) + } +} diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index f9c33eb3d9cc1..a40cfc5cbe7fb 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -14,10 +14,8 @@ use pretty_xmlish::XmlNode; use risingwave_common::bail; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; -use super::generic::GenericPlanRef; +use super::generic::{self, GenericPlanRef}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef, @@ -26,30 +24,18 @@ use super::{ use crate::error::Result; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::ColIndexMapping; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct LogicalNow { pub base: PlanBase, + core: generic::Now, } impl LogicalNow { - pub fn new(ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); - let base = PlanBase::new_logical( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), - ); - Self { base } + pub fn new(core: generic::Now) -> Self { + let base = PlanBase::new_logical_with_core(&core); + Self { base, core } } } @@ -91,7 +77,7 @@ impl ToStream for LogicalNow { /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)` fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result { - Ok(StreamNow::new(self.clone(), self.ctx()).into()) + Ok(StreamNow::new(self.core.clone()).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index d27321c08d06b..22a0d2c5fb0fb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -14,46 +14,39 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; -use risingwave_common::catalog::{Field, Schema}; -use risingwave_common::types::DataType; +use risingwave_common::types::Datum; +use risingwave_common::util::value_encoding::DatumToProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; use risingwave_pb::stream_plan::stream_node::NodeBody; -use risingwave_pb::stream_plan::NowNode; +use risingwave_pb::stream_plan::{PbNowModeGenerateSeries, PbNowModeUpdateCurrent, PbNowNode}; +use super::generic::Mode; use super::stream::prelude::*; use super::utils::{childless_record, Distill, TableCatalogBuilder}; -use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; +use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::OptimizerContextRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamNow { pub base: PlanBase, + core: generic::Now, } impl StreamNow { - pub fn new(_logical: LogicalNow, ctx: OptimizerContextRef) -> Self { - let schema = Schema::new(vec![Field { - data_type: DataType::Timestamptz, - name: String::from("now"), - sub_fields: vec![], - type_name: String::default(), - }]); + pub fn new(core: generic::Now) -> Self { let mut watermark_columns = FixedBitSet::with_capacity(1); watermark_columns.set(0, true); - let base = PlanBase::new_stream( - ctx, - schema, - Some(vec![]), - FunctionalDependencySet::default(), + let base = PlanBase::new_stream_with_core( + &core, Distribution::Single, - false, - false, // TODO(rc): derive EOWC property from input + core.mode.is_generate_series(), // append only + core.mode.is_generate_series(), // emit on window close watermark_columns, ); - Self { base } + Self { base, core } } } @@ -83,8 +76,18 @@ impl StreamNode for StreamNow { let table_catalog = internal_table_catalog_builder .build(dist_keys, 0) .with_id(state.gen_table_id_wrapped()); - NodeBody::Now(NowNode { + NodeBody::Now(PbNowNode { state_table: Some(table_catalog.to_internal_table_prost()), + mode: Some(match &self.core.mode { + Mode::UpdateCurrent => PbNowMode::UpdateCurrent(PbNowModeUpdateCurrent {}), + Mode::GenerateSeries { + start_timestamp, + interval, + } => PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp: Some(Datum::Some((*start_timestamp).into()).to_protobuf()), + interval: Some(Datum::Some((*interval).into()).to_protobuf()), + }), + }), }) } } diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index 498696589c81b..cbdb65b4528a5 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -18,7 +18,7 @@ use risingwave_pb::plan_common::JoinType; use crate::expr::{ try_derive_watermark, ExprRewriter, FunctionCall, InputRef, WatermarkDerivation, }; -use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::generic::{self, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow}; use crate::optimizer::rule::{BoxedRule, Rule}; use crate::optimizer::PlanRef; @@ -63,7 +63,7 @@ impl Rule for FilterWithNowToJoinRule { for now_filter in now_filters { new_plan = LogicalJoin::new( new_plan, - LogicalNow::new(plan.ctx()).into(), + LogicalNow::new(generic::Now::update_current(plan.ctx())).into(), JoinType::LeftSemi, Condition { conjunctions: vec![now_filter.into()], diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 40ddf5b91c48b..9dba4e9ce457c 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -129,7 +129,7 @@ pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; pub use mview::*; pub use no_op::NoOpExecutor; -pub use now::NowExecutor; +pub use now::*; pub use over_window::*; pub use project::ProjectExecutor; pub use project_set::*; diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index be21cbad32e33..53d214dd6550a 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -18,11 +18,11 @@ use std::ops::Bound::Unbounded; use itertools::Itertools; use risingwave_common::array::Op; use risingwave_common::row; -use risingwave_common::types::test_utils::IntervalTestExt; use risingwave_common::types::{DefaultOrdered, Interval, Timestamptz, ToDatumRef}; use risingwave_expr::capture_context; use risingwave_expr::expr::{ - build_func, BoxedExpression, ExpressionBoxExt, InputRefExpression, LiteralExpression, + build_func_non_strict, EvalErrorReport, ExpressionBoxExt, InputRefExpression, + LiteralExpression, NonStrictExpression, }; use risingwave_expr::expr_context::TIME_ZONE; use tokio::sync::mpsc::UnboundedReceiver; @@ -33,7 +33,7 @@ use crate::executor::prelude::*; pub struct NowExecutor { data_types: Vec, - mode: Mode, + mode: NowMode, max_chunk_size: usize, /// Receiver of barrier channel. @@ -42,14 +42,14 @@ pub struct NowExecutor { state_table: StateTable, } -enum Mode { +pub enum NowMode { /// Emit current timestamp on startup, update it on barrier. UpdateCurrent, /// Generate a series of timestamps starting from `start_timestamp` with `interval`. /// Keep generating new timestamps on barrier. GenerateSeries { start_timestamp: Timestamptz, - add_interval_expr: BoxedExpression, + add_interval_expr: NonStrictExpression, }, } @@ -61,16 +61,13 @@ enum ModeVars { impl NowExecutor { pub fn new( data_types: Vec, + mode: NowMode, barrier_receiver: UnboundedReceiver, state_table: StateTable, ) -> Self { Self { data_types, - // TODO(): only for dev - mode: Mode::GenerateSeries { - start_timestamp: Timestamptz::from_secs(1617235190).unwrap(), /* 2021-03-31 23:59:50 UTC */ - add_interval_expr: build_add_expr_captured(Interval::from_millis(1000)).unwrap(), - }, + mode, max_chunk_size: 1024, barrier_receiver, state_table, @@ -93,8 +90,8 @@ impl NowExecutor { let mut last_timestamp: Datum = None; let mut mode_vars = match &mode { - Mode::UpdateCurrent => ModeVars::UpdateCurrent, - Mode::GenerateSeries { .. } => { + NowMode::UpdateCurrent => ModeVars::UpdateCurrent, + NowMode::GenerateSeries { .. } => { // in most cases there won't be more than one row except for the first time let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); ModeVars::GenerateSeries { chunk_builder } @@ -158,7 +155,7 @@ impl NowExecutor { } match (&mode, &mut mode_vars) { - (Mode::UpdateCurrent, ModeVars::UpdateCurrent) => { + (NowMode::UpdateCurrent, ModeVars::UpdateCurrent) => { let chunk = if last_timestamp.is_some() { let last_row = row::once(&last_timestamp); let row = row::once(&curr_timestamp); @@ -176,10 +173,10 @@ impl NowExecutor { }; yield Message::Chunk(chunk); - last_timestamp = curr_timestamp.clone(); + last_timestamp.clone_from(&curr_timestamp); } ( - Mode::GenerateSeries { + NowMode::GenerateSeries { start_timestamp, add_interval_expr, }, @@ -187,7 +184,7 @@ impl NowExecutor { ) => { if last_timestamp.is_none() { // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. - let first = Some(start_timestamp.clone().into()); + let first = Some((*start_timestamp).into()); let first_row = row::once(&first); let _ = chunk_builder.append_row(Op::Insert, first_row); state_table.insert(first_row); @@ -209,7 +206,7 @@ impl NowExecutor { } } - let next = add_interval_expr.eval_row(&last_row).await?; + let next = add_interval_expr.eval_row_infallible(&last_row).await; if DefaultOrdered(next.to_datum_ref()) > DefaultOrdered(curr_timestamp.to_datum_ref()) { @@ -254,13 +251,17 @@ impl Execute for NowExecutor { } #[capture_context(TIME_ZONE)] -fn build_add_expr(time_zone: &str, interval: Interval) -> risingwave_expr::Result { +pub fn build_add_interval_expr( + time_zone: &str, + interval: Interval, + eval_error_report: impl EvalErrorReport + 'static, +) -> risingwave_expr::Result { let timestamptz_input = InputRefExpression::new(DataType::Timestamptz, 0); let interval = LiteralExpression::new(DataType::Interval, Some(interval.into())); let time_zone = LiteralExpression::new(DataType::Varchar, Some(time_zone.into())); use risingwave_pb::expr::expr_node::PbType as PbExprType; - build_func( + build_func_non_strict( PbExprType::AddWithTimeZone, DataType::Timestamptz, vec![ @@ -268,31 +269,27 @@ fn build_add_expr(time_zone: &str, interval: Interval) -> risingwave_expr::Resul interval.boxed(), time_zone.boxed(), ], + eval_error_report, ) } #[cfg(test)] mod tests { - use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId}; use risingwave_common::test_prelude::StreamChunkTestExt; - use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::types::test_utils::IntervalTestExt; use risingwave_common::util::epoch::test_epoch; - use risingwave_expr::expr_context::TIME_ZONE; use risingwave_storage::memory::MemoryStateStore; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; - use super::NowExecutor; - use crate::common::table::state_table::StateTable; + use super::*; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::executor::{ - Barrier, BoxedMessageStream, Execute, Mutation, StreamExecutorResult, Watermark, - }; + use crate::task::ActorEvalErrorReport; #[tokio::test] async fn test_now() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) @@ -359,7 +356,7 @@ mod tests { // Recovery drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; tx.send(Barrier::with_prev_epoch_for_test(3 << 16, 1 << 16)) .unwrap(); @@ -391,7 +388,7 @@ mod tests { // Recovery with paused drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; tx.send(Barrier::new_test_barrier(4 << 16).with_mutation(Mutation::Pause)) .unwrap(); @@ -439,7 +436,7 @@ mod tests { #[tokio::test] async fn test_now_start_with_paused() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(&state_store).await; + let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1).with_mutation(Mutation::Pause)) @@ -495,8 +492,19 @@ mod tests { } async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let eval_error_report = ActorEvalErrorReport { + actor_context: ActorContext::for_test(123), + identity: "NowExecutor".into(), + }; + let state_store = create_state_store(); - let (tx, mut now) = create_executor(&state_store).await; + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp: Timestamptz::from_secs(1617235190).unwrap(), /* 2021-03-31 23:59:50 UTC */ + add_interval_expr: build_add_interval_expr_captured(Interval::from_millis(1000), eval_error_report).unwrap(), // 1s interval + }, + &state_store + ).await; // Init barrier tx.send(Barrier::new_initial_for_test(test_epoch(1000))) @@ -551,6 +559,7 @@ mod tests { } async fn create_executor( + mode: NowMode, state_store: &MemoryStateStore, ) -> (UnboundedSender, BoxedMessageStream) { let table_id = TableId::new(1); @@ -566,8 +575,12 @@ mod tests { let (sender, barrier_receiver) = unbounded_channel(); - let now_executor = - NowExecutor::new(vec![DataType::Timestamptz], barrier_receiver, state_table); + let now_executor = NowExecutor::new( + vec![DataType::Timestamptz], + mode, + barrier_receiver, + state_table, + ); (sender, now_executor.boxed().execute()) } } diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index a917cd136f3b4..cd693214a29ce 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_pb::stream_plan::NowNode; +use anyhow::Context; +use risingwave_common::types::{DataType, Datum}; +use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_pb::stream_plan::now_node::PbMode as PbNowMode; +use risingwave_pb::stream_plan::{NowNode, PbNowModeGenerateSeries}; use risingwave_storage::StateStore; use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; -use crate::executor::{Executor, NowExecutor}; +use crate::executor::{build_add_interval_expr_captured, Executor, NowExecutor, NowMode}; use crate::task::ExecutorParams; pub struct NowExecutorBuilder; @@ -37,11 +41,44 @@ impl ExecutorBuilder for NowExecutorBuilder { .create_actor_context .register_sender(params.actor_context.id, sender); + let mode = if let Ok(pb_mode) = node.get_mode() { + match pb_mode { + PbNowMode::UpdateCurrent(_) => NowMode::UpdateCurrent, + PbNowMode::GenerateSeries(PbNowModeGenerateSeries { + start_timestamp, + interval, + }) => { + let start_timestamp = Datum::from_protobuf( + start_timestamp.as_ref().unwrap(), + &DataType::Timestamptz, + ) + .context("`start_timestamp` field is not decodable")? + .context("`start_timestamp` field should not be NULL")? + .into_timestamptz(); + let interval = + Datum::from_protobuf(interval.as_ref().unwrap(), &DataType::Interval) + .context("`interval` field is not decodable")? + .context("`interval` field should not be NULL")? + .into_interval(); + NowMode::GenerateSeries { + start_timestamp, + add_interval_expr: build_add_interval_expr_captured( + interval, + params.eval_error_report, + )?, + } + } + } + } else { + NowMode::UpdateCurrent + }; + let state_table = StateTable::from_table_catalog(node.get_state_table()?, store, None).await; let exec = NowExecutor::new( params.info.schema.data_types(), + mode, barrier_receiver, state_table, ); From 60a7371a9c5312ef6bc7ab416b5d42278c81d05e Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 20 Jun 2024 19:42:01 +0800 Subject: [PATCH 03/13] fixup Signed-off-by: Richard Chien --- src/stream/src/from_proto/now.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index cd693214a29ce..f3babb6b372e5 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -70,6 +70,7 @@ impl ExecutorBuilder for NowExecutorBuilder { } } } else { + // default to `UpdateCurrent` for backward-compatibility NowMode::UpdateCurrent }; From ae7d3c12d66b962bb373788eccdb727e6beb6cbf Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 21 Jun 2024 15:18:52 +0800 Subject: [PATCH 04/13] frontend support Signed-off-by: Richard Chien --- src/frontend/src/binder/expr/function.rs | 8 +- src/frontend/src/expr/mod.rs | 2 +- src/frontend/src/expr/type_inference/func.rs | 54 ++++++++++-- .../src/optimizer/logical_optimization.rs | 11 +++ .../src/optimizer/plan_node/convert.rs | 5 -- .../plan_node/logical_project_set.rs | 13 ++- src/frontend/src/optimizer/rule/mod.rs | 2 + .../stream/generate_series_with_now_rule.rs | 86 +++++++++++++++++++ src/frontend/src/optimizer/rule/stream/mod.rs | 1 + 9 files changed, 166 insertions(+), 16 deletions(-) create mode 100644 src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 09c05a29695dc..b9a3b27825abf 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -1572,11 +1572,15 @@ impl Binder { if self.is_for_stream() && !matches!( self.context.clause, - Some(Clause::Where) | Some(Clause::Having) | Some(Clause::JoinOn) + Some(Clause::Where) + | Some(Clause::Having) + | Some(Clause::JoinOn) + | Some(Clause::From) ) { return Err(ErrorCode::InvalidInputSyntax(format!( - "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: {:?}. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", + "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \ + Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information", self.context.clause )) .into()); diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index d14d99766bcc4..a5d3d5bae0441 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -399,7 +399,7 @@ macro_rules! impl_has_variant { }; } -impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction} +impl_has_variant! {InputRef, Literal, FunctionCall, FunctionCallWithLambda, AggCall, Subquery, TableFunction, WindowFunction, Now} #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InequalityInputPair { diff --git a/src/frontend/src/expr/type_inference/func.rs b/src/frontend/src/expr/type_inference/func.rs index 9f28dfeb74c8c..0fecee8ab45c0 100644 --- a/src/frontend/src/expr/type_inference/func.rs +++ b/src/frontend/src/expr/type_inference/func.rs @@ -20,6 +20,7 @@ use risingwave_common::types::{DataType, StructType}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_expr::aggregate::AggKind; pub use risingwave_expr::sig::*; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; use super::{align_types, cast_ok_base, CastContext}; use crate::error::{ErrorCode, Result}; @@ -36,13 +37,24 @@ pub fn infer_type_with_sigmap( sig_map: &FunctionRegistry, ) -> Result { // special cases - if let FuncName::Scalar(func_type) = func_name - && let Some(res) = infer_type_for_special(func_type, inputs).transpose() - { - return res; - } - if let FuncName::Aggregate(AggKind::Grouping) = func_name { - return Ok(DataType::Int32); + match &func_name { + FuncName::Scalar(func_type) => { + if let Some(res) = infer_type_for_special(*func_type, inputs).transpose() { + return res; + } + } + FuncName::Table(func_type) => { + if let Some(res) = infer_type_for_special_table_function(*func_type, inputs).transpose() + { + return res; + } + } + FuncName::Aggregate(agg_kind) => { + if *agg_kind == AggKind::Grouping { + return Ok(DataType::Int32); + } + } + _ => {} } let actuals = inputs @@ -634,6 +646,34 @@ fn infer_type_for_special( } } +fn infer_type_for_special_table_function( + func_type: PbTableFuncType, + inputs: &mut [ExprImpl], +) -> Result> { + match func_type { + PbTableFuncType::GenerateSeries => { + if inputs.len() < 3 { + // let signature map handle this + return Ok(None); + } + match ( + inputs[0].return_type(), + inputs[1].return_type(), + inputs[2].return_type(), + ) { + (DataType::Timestamptz, DataType::Timestamptz, DataType::Interval) => { + // This is to allow `generate_series('2024-06-20 00:00:00'::timestamptz, now(), interval '1 day')`, + // which in streaming mode will be further converted to `StreamNow`. + Ok(Some(DataType::Timestamptz)) + } + // let signature map handle the rest + _ => Ok(None), + } + } + _ => Ok(None), + } +} + /// From all available functions in `sig_map`, find and return the best matching `FuncSign` for the /// provided `func_name` and `inputs`. This not only support exact function signature match, but can /// also match `substr(varchar, smallint)` or even `substr(varchar, unknown)` to `substr(varchar, diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index d452626bb9418..931a645b3d680 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -118,6 +118,14 @@ static DAG_TO_TREE: LazyLock = LazyLock::new(|| { ) }); +static STREAM_GENERATE_SERIES_WITH_NOW: LazyLock = LazyLock::new(|| { + OptimizationStage::new( + "Convert GENERATE_SERIES Ends With NOW", + vec![GenerateSeriesWithNowRule::create()], + ApplyOrder::TopDown, + ) +}); + static TABLE_FUNCTION_TO_PROJECT_SET: LazyLock = LazyLock::new(|| { OptimizationStage::new( "Table Function To Project Set", @@ -572,6 +580,9 @@ impl LogicalOptimizer { } plan = plan.optimize_by_rules(&SET_OPERATION_MERGE); plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN); + // Convert `generate_series` ends with `now()` to a `Now` source. Only for streaming mode. + // Should be applied before converting table function to project set. + plan = plan.optimize_by_rules(&STREAM_GENERATE_SERIES_WITH_NOW); // In order to unnest a table function, we need to convert it into a `project_set` first. plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_PROJECT_SET); diff --git a/src/frontend/src/optimizer/plan_node/convert.rs b/src/frontend/src/optimizer/plan_node/convert.rs index db961b3b1e20a..6f98073304d8b 100644 --- a/src/frontend/src/optimizer/plan_node/convert.rs +++ b/src/frontend/src/optimizer/plan_node/convert.rs @@ -84,11 +84,6 @@ pub fn stream_enforce_eowc_requirement( } Ok(StreamEowcSort::new(plan, watermark_col_idx).into()) } - } else if !emit_on_window_close && plan.emit_on_window_close() { - Err(ErrorCode::InternalError( - "Some bad thing happened, the generated plan is not correct.".to_string(), - ) - .into()) } else { Ok(plan) } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 6f31b35a21ee0..f4d3f4f81cee1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -22,7 +22,7 @@ use super::{ LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamProjectSet, ToBatch, ToStream, }; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::expr::{ collect_input_refs, Expr, ExprImpl, ExprRewriter, ExprVisitor, FunctionCall, InputRef, TableFunction, @@ -400,6 +400,17 @@ impl ToStream for LogicalProjectSet { // TODO: implement to_stream_with_dist_required like LogicalProject fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + if self.select_list().iter().any(|item| item.has_now()) { + // User may use `now()` in table function in a wrong way, because we allow `now()` in `FROM` clause. + return Err(ErrorCode::NotSupported( + "General `now()` function".to_string(), + "If you are trying to use `generate_series` with `now()`, please kindly check \ + whether you are using it in a correct way." + .to_string(), + ) + .into()); + } + let new_input = self.input().to_stream(ctx)?; let mut new_logical = self.core.clone(); new_logical.input = new_input; diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 9364a6f2b7f5b..fd06402c7497f 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -92,6 +92,7 @@ pub use top_n_on_index_rule::*; mod stream; pub use stream::bushy_tree_join_ordering_rule::*; pub use stream::filter_with_now_to_join_rule::*; +pub use stream::generate_series_with_now_rule::*; pub use stream::split_now_and_rule::*; pub use stream::split_now_or_rule::*; pub use stream::stream_project_merge_rule::*; @@ -203,6 +204,7 @@ macro_rules! for_all_rules { , { SplitNowAndRule } , { SplitNowOrRule } , { FilterWithNowToJoinRule } + , { GenerateSeriesWithNowRule } , { TopNOnIndexRule } , { TrivialProjectToValuesRule } , { UnionInputValuesMergeRule } diff --git a/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs new file mode 100644 index 0000000000000..665967f6660b0 --- /dev/null +++ b/src/frontend/src/optimizer/rule/stream/generate_series_with_now_rule.rs @@ -0,0 +1,86 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; +use risingwave_pb::expr::table_function::PbType as PbTableFuncType; + +use crate::expr::{Expr, ExprRewriter}; +use crate::optimizer::plan_node::{generic, LogicalNow}; +use crate::optimizer::rule::{BoxedRule, Rule}; +use crate::PlanRef; + +pub struct GenerateSeriesWithNowRule {} +impl Rule for GenerateSeriesWithNowRule { + fn apply(&self, plan: PlanRef) -> Option { + let ctx = plan.ctx(); + let table_func = plan.as_logical_table_function()?.table_function(); + + if !table_func.args.iter().any(|arg| arg.has_now()) { + return None; + } + + if !(table_func.function_type == PbTableFuncType::GenerateSeries + && table_func.args.len() == 3 + && table_func.args[0].return_type() == DataType::Timestamptz + && table_func.args[1].is_now() + && table_func.args[2].return_type() == DataType::Interval) + { + // only convert `generate_series(const timestamptz, now(), const interval)` + ctx.warn_to_user( + "`now()` is currently only supported in `generate_series(timestamptz, timestamptz, interval)` function as `stop`. \ + You may not using it correctly. Please kindly check the document." + ); + return None; + } + + let start_timestamp = ctx + .session_timezone() + .rewrite_expr(table_func.args[0].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + let interval = ctx + .session_timezone() + .rewrite_expr(table_func.args[2].clone()) + .try_fold_const() + .transpose() + .ok() + .flatten() + .flatten(); + + if start_timestamp.is_none() || interval.is_none() { + ctx.warn_to_user( + "When using `generate_series` with `now()`, the `start` and `step` must be non-NULL constants", + ); + return None; + } + + Some( + LogicalNow::new(generic::Now::generate_series( + ctx, + start_timestamp.unwrap().into_timestamptz(), + interval.unwrap().into_interval(), + )) + .into(), + ) + } +} + +impl GenerateSeriesWithNowRule { + pub fn create() -> BoxedRule { + Box::new(Self {}) + } +} diff --git a/src/frontend/src/optimizer/rule/stream/mod.rs b/src/frontend/src/optimizer/rule/stream/mod.rs index cc86298e766e8..539d9048cff60 100644 --- a/src/frontend/src/optimizer/rule/stream/mod.rs +++ b/src/frontend/src/optimizer/rule/stream/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod bushy_tree_join_ordering_rule; pub(crate) mod filter_with_now_to_join_rule; +pub(crate) mod generate_series_with_now_rule; pub(crate) mod split_now_and_rule; pub(crate) mod split_now_or_rule; pub(crate) mod stream_project_merge_rule; From 5f94b1acd1182489b646b668a45514438e073c9a Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 21 Jun 2024 16:44:53 +0800 Subject: [PATCH 05/13] fix timezone context Signed-off-by: Richard Chien --- src/stream/src/executor/now.rs | 54 +++++++++++++++++++++----------- src/stream/src/from_proto/now.rs | 8 ++--- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 53d214dd6550a..568287c62e589 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -29,12 +29,13 @@ use tokio::sync::mpsc::UnboundedReceiver; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::executor::prelude::*; +use crate::task::ActorEvalErrorReport; pub struct NowExecutor { data_types: Vec, mode: NowMode, - max_chunk_size: usize, + eval_error_report: ActorEvalErrorReport, /// Receiver of barrier channel. barrier_receiver: UnboundedReceiver, @@ -49,26 +50,30 @@ pub enum NowMode { /// Keep generating new timestamps on barrier. GenerateSeries { start_timestamp: Timestamptz, - add_interval_expr: NonStrictExpression, + interval: Interval, }, } enum ModeVars { UpdateCurrent, - GenerateSeries { chunk_builder: StreamChunkBuilder }, + GenerateSeries { + chunk_builder: StreamChunkBuilder, + add_interval_expr: NonStrictExpression, + }, } impl NowExecutor { pub fn new( data_types: Vec, mode: NowMode, + eval_error_report: ActorEvalErrorReport, barrier_receiver: UnboundedReceiver, state_table: StateTable, ) -> Self { Self { data_types, mode, - max_chunk_size: 1024, + eval_error_report, barrier_receiver, state_table, } @@ -79,22 +84,32 @@ impl NowExecutor { let Self { data_types, mode, - max_chunk_size, + eval_error_report, barrier_receiver, mut state_table, } = self; + let max_chunk_size = crate::config::chunk_size(); + // Whether the executor is paused. let mut paused = false; // The last timestamp **sent** to the downstream. let mut last_timestamp: Datum = None; + // Whether the first barrier is handled and `last_timestamp` is initialized. + let mut initialized = false; + let mut mode_vars = match &mode { NowMode::UpdateCurrent => ModeVars::UpdateCurrent, - NowMode::GenerateSeries { .. } => { + NowMode::GenerateSeries { interval, .. } => { // in most cases there won't be more than one row except for the first time let chunk_builder = StreamChunkBuilder::unlimited(data_types.clone(), Some(1)); - ModeVars::GenerateSeries { chunk_builder } + let add_interval_expr = + build_add_interval_expr_captured(*interval, eval_error_report)?; + ModeVars::GenerateSeries { + chunk_builder, + add_interval_expr, + } } }; @@ -112,7 +127,7 @@ impl NowExecutor { ); } for barrier in barriers { - if barrier.kind.is_initial() { + if !initialized { // Handle the initial barrier. state_table.init_epoch(barrier.epoch); let state_row = { @@ -130,6 +145,7 @@ impl NowExecutor { }; last_timestamp = state_row.and_then(|row| row[0].clone()); paused = barrier.is_pause_on_startup(); + initialized = true; } else { state_table.commit(barrier.epoch).await?; } @@ -173,14 +189,16 @@ impl NowExecutor { }; yield Message::Chunk(chunk); - last_timestamp.clone_from(&curr_timestamp); + last_timestamp = curr_timestamp.clone(); } ( NowMode::GenerateSeries { - start_timestamp, - add_interval_expr, + start_timestamp, .. + }, + ModeVars::GenerateSeries { + chunk_builder, + ref add_interval_expr, }, - ModeVars::GenerateSeries { chunk_builder }, ) => { if last_timestamp.is_none() { // We haven't emit any timestamp yet. Let's emit the first one and populate the state table. @@ -492,16 +510,11 @@ mod tests { } async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { - let eval_error_report = ActorEvalErrorReport { - actor_context: ActorContext::for_test(123), - identity: "NowExecutor".into(), - }; - let state_store = create_state_store(); let (tx, mut now) = create_executor( NowMode::GenerateSeries { start_timestamp: Timestamptz::from_secs(1617235190).unwrap(), /* 2021-03-31 23:59:50 UTC */ - add_interval_expr: build_add_interval_expr_captured(Interval::from_millis(1000), eval_error_report).unwrap(), // 1s interval + interval: Interval::from_millis(1000), // 1s interval }, &state_store ).await; @@ -575,9 +588,14 @@ mod tests { let (sender, barrier_receiver) = unbounded_channel(); + let eval_error_report = ActorEvalErrorReport { + actor_context: ActorContext::for_test(123), + identity: "NowExecutor".into(), + }; let now_executor = NowExecutor::new( vec![DataType::Timestamptz], mode, + eval_error_report, barrier_receiver, state_table, ); diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs index f3babb6b372e5..9eac7caa13557 100644 --- a/src/stream/src/from_proto/now.rs +++ b/src/stream/src/from_proto/now.rs @@ -23,7 +23,7 @@ use tokio::sync::mpsc::unbounded_channel; use super::ExecutorBuilder; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; -use crate::executor::{build_add_interval_expr_captured, Executor, NowExecutor, NowMode}; +use crate::executor::{Executor, NowExecutor, NowMode}; use crate::task::ExecutorParams; pub struct NowExecutorBuilder; @@ -62,10 +62,7 @@ impl ExecutorBuilder for NowExecutorBuilder { .into_interval(); NowMode::GenerateSeries { start_timestamp, - add_interval_expr: build_add_interval_expr_captured( - interval, - params.eval_error_report, - )?, + interval, } } } @@ -80,6 +77,7 @@ impl ExecutorBuilder for NowExecutorBuilder { let exec = NowExecutor::new( params.info.schema.data_types(), mode, + params.eval_error_report, barrier_receiver, state_table, ); From e355196e29436bf2c1d823a23fc7515b66fe62ec Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 21 Jun 2024 16:45:05 +0800 Subject: [PATCH 06/13] fix max one row check Signed-off-by: Richard Chien --- src/frontend/src/optimizer/plan_node/generic/now.rs | 5 ++++- src/frontend/src/optimizer/plan_node/logical_now.rs | 9 ++++++++- .../src/optimizer/plan_visitor/cardinality_visitor.rs | 8 ++++++-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/now.rs b/src/frontend/src/optimizer/plan_node/generic/now.rs index 5e0be9c38dc98..911217d064214 100644 --- a/src/frontend/src/optimizer/plan_node/generic/now.rs +++ b/src/frontend/src/optimizer/plan_node/generic/now.rs @@ -64,7 +64,10 @@ impl GenericPlanNode for Now { } fn stream_key(&self) -> Option> { - Some(vec![]) + match self.mode { + Mode::UpdateCurrent => Some(vec![]), + Mode::GenerateSeries { .. } => Some(vec![0]), + } } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index a40cfc5cbe7fb..ea34037c8977a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -15,7 +15,7 @@ use pretty_xmlish::XmlNode; use risingwave_common::bail; -use super::generic::{self, GenericPlanRef}; +use super::generic::{self, GenericPlanRef, Mode}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef, @@ -37,6 +37,13 @@ impl LogicalNow { let base = PlanBase::new_logical_with_core(&core); Self { base, core } } + + pub fn max_one_row(&self) -> bool { + match self.core.mode { + Mode::UpdateCurrent => true, + Mode::GenerateSeries { .. } => false, + } + } } impl Distill for LogicalNow { diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index c81bf539a5713..07459b59b1d5f 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -174,8 +174,12 @@ impl PlanVisitor for CardinalityVisitor { } } - fn visit_logical_now(&mut self, _plan: &plan_node::LogicalNow) -> Cardinality { - 1.into() + fn visit_logical_now(&mut self, plan: &plan_node::LogicalNow) -> Cardinality { + if plan.max_one_row() { + 1.into() + } else { + Cardinality::unknown() + } } fn visit_logical_expand(&mut self, plan: &plan_node::LogicalExpand) -> Cardinality { From 26e86347314142613a02f8508a6f515f93214939 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 21 Jun 2024 17:34:26 +0800 Subject: [PATCH 07/13] fix unit tests Signed-off-by: Richard Chien --- src/stream/src/executor/mod.rs | 10 --- src/stream/src/executor/now.rs | 153 +++++++++++++++++++++++---------- 2 files changed, 107 insertions(+), 56 deletions(-) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 9dba4e9ce457c..a6a2152283668 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -329,16 +329,6 @@ impl Barrier { } } - pub fn new_initial_for_test(epoch: u64) -> Self { - Self { - epoch: EpochPair::new_test_epoch(epoch), - kind: BarrierKind::Initial, - tracing_context: TracingContext::none(), - mutation: Default::default(), - passed_actors: Default::default(), - } - } - #[must_use] pub fn with_mutation(self, mutation: Mutation) -> Self { Self { diff --git a/src/stream/src/executor/now.rs b/src/stream/src/executor/now.rs index 568287c62e589..049eee7f8c724 100644 --- a/src/stream/src/executor/now.rs +++ b/src/stream/src/executor/now.rs @@ -189,7 +189,7 @@ impl NowExecutor { }; yield Message::Chunk(chunk); - last_timestamp = curr_timestamp.clone(); + last_timestamp.clone_from(&curr_timestamp) } ( NowMode::GenerateSeries { @@ -302,22 +302,20 @@ mod tests { use super::*; use crate::executor::test_utils::StreamExecutorTestExt; - use crate::task::ActorEvalErrorReport; #[tokio::test] async fn test_now() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1)) - .unwrap(); + tx.send(Barrier::new_test_barrier(test_epoch(1))).unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -328,7 +326,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -339,14 +337,17 @@ mod tests { ) ); - tx.send(Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16)) - .unwrap(); + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(2), + test_epoch(1), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -358,7 +359,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -370,21 +371,25 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Recovery - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; - tx.send(Barrier::with_prev_epoch_for_test(3 << 16, 1 << 16)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(3), + test_epoch(2), + )) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), + // the last chunk was not checkpointed so the deleted old value should be `001` StreamChunk::from_pretty( " TZ - 2021-04-01T00:00:00.001Z @@ -393,7 +398,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -405,28 +410,32 @@ mod tests { ); // Recovery with paused - drop((tx, now_executor)); - let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; - tx.send(Barrier::new_test_barrier(4 << 16).with_mutation(Mutation::Pause)) - .unwrap(); + drop((tx, now)); + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; + tx.send( + Barrier::with_prev_epoch_for_test(test_epoch(4), test_epoch(3)) + .with_mutation(Mutation::Pause), + ) + .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(5 << 16, 4 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(5), test_epoch(4)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), StreamChunk::from_pretty( @@ -437,7 +446,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -454,29 +463,30 @@ mod tests { #[tokio::test] async fn test_now_start_with_paused() -> StreamExecutorResult<()> { let state_store = create_state_store(); - let (tx, mut now_executor) = create_executor(NowMode::UpdateCurrent, &state_store).await; + let (tx, mut now) = create_executor(NowMode::UpdateCurrent, &state_store).await; // Init barrier - tx.send(Barrier::with_prev_epoch_for_test(1 << 16, 1).with_mutation(Mutation::Pause)) + tx.send(Barrier::new_test_barrier(test_epoch(1)).with_mutation(Mutation::Pause)) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // There should be no messages until `Resume` - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); // Resume barrier tx.send( - Barrier::with_prev_epoch_for_test(2 << 16, 1 << 16).with_mutation(Mutation::Resume), + Barrier::with_prev_epoch_for_test(test_epoch(2), test_epoch(1)) + .with_mutation(Mutation::Resume), ) .unwrap(); // Consume the barrier - now_executor.next_unwrap_ready_barrier()?; + now.next_unwrap_ready_barrier()?; // Consume the data chunk - let chunk_msg = now_executor.next_unwrap_ready_chunk()?; + let chunk_msg = now.next_unwrap_ready_chunk()?; assert_eq!( chunk_msg.compact(), @@ -487,7 +497,7 @@ mod tests { ); // Consume the watermark - let watermark = now_executor.next_unwrap_ready_watermark()?; + let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( watermark, @@ -499,7 +509,7 @@ mod tests { ); // No more messages until the next barrier - now_executor.next_unwrap_pending(); + now.next_unwrap_pending(); Ok(()) } @@ -510,17 +520,21 @@ mod tests { } async fn test_now_generate_series_inner() -> StreamExecutorResult<()> { + let start_timestamp = Timestamptz::from_secs(1617235190).unwrap(); // 2021-03-31 23:59:50 UTC + let interval = Interval::from_millis(1000); // 1s interval + let state_store = create_state_store(); let (tx, mut now) = create_executor( NowMode::GenerateSeries { - start_timestamp: Timestamptz::from_secs(1617235190).unwrap(), /* 2021-03-31 23:59:50 UTC */ - interval: Interval::from_millis(1000), // 1s interval + start_timestamp, + interval, }, - &state_store - ).await; + &state_store, + ) + .await; // Init barrier - tx.send(Barrier::new_initial_for_test(test_epoch(1000))) + tx.send(Barrier::new_test_barrier(test_epoch(1000))) .unwrap(); now.next_unwrap_ready_barrier()?; @@ -552,7 +566,14 @@ mod tests { now.next_unwrap_ready_barrier()?; let chunk = now.next_unwrap_ready_chunk()?; - assert_eq!(chunk.cardinality(), 2); // seconds from 00:00:02 to 00:00:03 (inclusive) + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z" + ) + ); let watermark = now.next_unwrap_ready_watermark()?; assert_eq!( @@ -564,6 +585,46 @@ mod tests { ) ); + // Recovery + drop((tx, now)); + let (tx, mut now) = create_executor( + NowMode::GenerateSeries { + start_timestamp, + interval, + }, + &state_store, + ) + .await; + + tx.send(Barrier::with_prev_epoch_for_test( + test_epoch(4000), + test_epoch(3000), + )) + .unwrap(); + + now.next_unwrap_ready_barrier()?; + + let chunk = now.next_unwrap_ready_chunk()?; + assert_eq!( + chunk.compact(), + StreamChunk::from_pretty( + " TZ + + 2021-04-01T00:00:02.000Z + + 2021-04-01T00:00:03.000Z + + 2021-04-01T00:00:04.000Z" + ) + ); + + let watermark = now.next_unwrap_ready_watermark()?; + assert_eq!( + watermark, + Watermark::new( + 0, + DataType::Timestamptz, + ScalarImpl::Timestamptz("2021-04-01T00:00:04.000Z".parse().unwrap()) + ) + ); + Ok(()) } From 556c0aa19bb19172fae8bb493a6b73af7134f514 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 21 Jun 2024 17:43:20 +0800 Subject: [PATCH 08/13] add e2e and planner tests Signed-off-by: Richard Chien --- e2e_test/streaming/now.slt | 27 ++++++++++++++ .../input/generate_series_with_now.yaml | 30 ++++++++++++++++ .../tests/testdata/output/expr.yaml | 6 ++-- .../output/generate_series_with_now.yaml | 35 +++++++++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 e2e_test/streaming/now.slt create mode 100644 src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml create mode 100644 src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt new file mode 100644 index 0000000000000..6ce471cca06de --- /dev/null +++ b/e2e_test/streaming/now.slt @@ -0,0 +1,27 @@ +system ok +psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c " +create materialized view mv as +select * from generate_series( + to_timestamp($(date +%s) - 10), + now(), + interval '1 second' +); +" + +statement ok +flush; + +query I +select count(*) > 10 from mv; +---- +t + +sleep 2s + +query I +select count(*) > 12 from mv; +---- +t + +statement ok +drop materialized view mv; diff --git a/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml new file mode 100644 index 0000000000000..e121aba41ff6f --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/input/generate_series_with_now.yaml @@ -0,0 +1,30 @@ +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + expected_outputs: + - logical_plan + - optimized_logical_plan_for_stream + - stream_plan +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + expected_outputs: + - binder_error +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + expected_outputs: + - stream_error +- sql: | + select * from unnest(array[now(), now()]); + expected_outputs: + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 4ba572a54e600..f88f7c4d69b76 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -543,7 +543,7 @@ Failed to bind expression: v1 >= now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(GroupBy). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in select for stream sql: | create table t (v1 timestamp with time zone, v2 timestamp with time zone); @@ -552,7 +552,7 @@ Failed to bind expression: now() Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: None. Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: forbid now in agg filter for stream sql: | create table t (v1 timestamp with time zone, v2 int); @@ -561,7 +561,7 @@ Failed to bind expression: sum(v2) FILTER(WHERE v1 >= now()) Caused by: - Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING` and `ON`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information + Invalid input syntax: For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: Some(Filter). Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information - name: typo pg_teminate_backend sql: | select pg_teminate_backend(1); diff --git a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml new file mode 100644 index 0000000000000..93277d6148ac2 --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml @@ -0,0 +1,35 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamptz, + now(), + interval '1 hour' + ); + logical_plan: |- + LogicalProject { exprs: [generate_series] } + └─LogicalTableFunction { table_function: GenerateSeries('2024-06-21 17:36:00':Varchar::Timestamptz, Now, '01:00:00':Interval) } + optimized_logical_plan_for_stream: 'LogicalNow { output: [ts] }' + stream_plan: |- + StreamMaterialize { columns: [generate_series], stream_key: [generate_series], pk_columns: [generate_series], pk_conflict: NoCheck, watermark_columns: [generate_series] } + └─StreamNow { output: [ts] } +- sql: | + select * from generate_series( + '2024-06-21 17:36:00'::timestamp, -- `timestamp` type is not supported + now(), + interval '1 hour' + ); + binder_error: function generate_series(timestamp without time zone, timestamp with time zone, interval) does not exist +- sql: | + select * from generate_series( + now() - interval '1 hour', + now(), + interval '1 hour' + ); + stream_error: |- + Not supported: General `now()` function + HINT: If you are trying to use `generate_series` with `now()`, please kindly check whether you are using it in a correct way. +- sql: | + select * from unnest(array[now(), now()]); + stream_error: |- + Not supported: General `now()` function + HINT: If you are trying to use `generate_series` with `now()`, please kindly check whether you are using it in a correct way. From 78bbaac0543dce6a77fa686e6927ae1637ff70c6 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 21 Jun 2024 17:55:51 +0800 Subject: [PATCH 09/13] fix misc check Signed-off-by: Richard Chien --- proto/stream_plan.proto | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e8438794a7548..a43e34bde8df3 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -725,8 +725,7 @@ message RowIdGenNode { uint64 row_id_index = 1; } -message NowModeUpdateCurrent { -} +message NowModeUpdateCurrent {} message NowModeGenerateSeries { data.Datum start_timestamp = 1; From ee269407b34357d96d8dd5acf6a452f6b56e9ccc Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 24 Jun 2024 16:17:20 +0800 Subject: [PATCH 10/13] improve error message Signed-off-by: Richard Chien --- .../tests/testdata/output/generate_series_with_now.yaml | 8 ++++---- .../src/optimizer/plan_node/logical_project_set.rs | 7 +++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml index 93277d6148ac2..4c8d71f987351 100644 --- a/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml +++ b/src/frontend/planner_test/tests/testdata/output/generate_series_with_now.yaml @@ -26,10 +26,10 @@ interval '1 hour' ); stream_error: |- - Not supported: General `now()` function - HINT: If you are trying to use `generate_series` with `now()`, please kindly check whether you are using it in a correct way. + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. - sql: | select * from unnest(array[now(), now()]); stream_error: |- - Not supported: General `now()` function - HINT: If you are trying to use `generate_series` with `now()`, please kindly check whether you are using it in a correct way. + Not supported: General `now()` function in streaming queries + HINT: Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns. diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index f4d3f4f81cee1..8f6966ece6c70 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -403,10 +403,9 @@ impl ToStream for LogicalProjectSet { if self.select_list().iter().any(|item| item.has_now()) { // User may use `now()` in table function in a wrong way, because we allow `now()` in `FROM` clause. return Err(ErrorCode::NotSupported( - "General `now()` function".to_string(), - "If you are trying to use `generate_series` with `now()`, please kindly check \ - whether you are using it in a correct way." - .to_string(), + "General `now()` function in streaming queries".to_string(), + "Streaming `now()` is currently only supported in GenerateSeries and TemporalFilter patterns." + .to_string(), ) .into()); } From e63f93a5b0c745529718fac57000689984243ec8 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 24 Jun 2024 16:21:24 +0800 Subject: [PATCH 11/13] try fix ci Signed-off-by: Richard Chien --- e2e_test/streaming/now.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt index 6ce471cca06de..5fd12fef3ad6d 100644 --- a/e2e_test/streaming/now.slt +++ b/e2e_test/streaming/now.slt @@ -1,8 +1,8 @@ system ok -psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev -c " +./risedev psql -c " create materialized view mv as select * from generate_series( - to_timestamp($(date +%s) - 10), + to_timestamp($(date +%s)) - interval '10 second', now(), interval '1 second' ); From 092c695e6c3e993266c60219fd57a196b9d81d89 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 24 Jun 2024 17:57:08 +0800 Subject: [PATCH 12/13] try try fix ci Signed-off-by: Richard Chien --- ci/scripts/e2e-test-parallel.sh | 8 ++++---- e2e_test/streaming/now.slt | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/ci/scripts/e2e-test-parallel.sh b/ci/scripts/e2e-test-parallel.sh index 5f16a4c817871..c366ef07fc209 100755 --- a/ci/scripts/e2e-test-parallel.sh +++ b/ci/scripts/e2e-test-parallel.sh @@ -38,21 +38,21 @@ RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=i echo "--- e2e, ci-3streaming-2serving-3fe, streaming" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-streaming-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, batch" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" -sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-batch-ddl-${profile}" --label "parallel" +sqllogictest "${host_args[@]}" -d dev './e2e_test/visibility_mode/*.slt' -j 16 --junit "parallel-batch-${profile}" --label "parallel" kill_cluster echo "--- e2e, ci-3streaming-2serving-3fe, generated" RUST_LOG=$RUST_LOG \ risedev ci-start ci-3streaming-2serving-3fe -sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" +sqllogictest "${host_args[@]}" -d dev './e2e_test/generated/**/*.slt' -j 16 --junit "parallel-generated-${profile}" --label "parallel" kill_cluster diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt index 5fd12fef3ad6d..324469e947e7a 100644 --- a/e2e_test/streaming/now.slt +++ b/e2e_test/streaming/now.slt @@ -1,3 +1,7 @@ +# In madsim test, we cannot spawn process. +skipif madsim +# In parallel test, we cannot get the DB name. +skipif parallel system ok ./risedev psql -c " create materialized view mv as @@ -8,20 +12,30 @@ select * from generate_series( ); " +skipif madsim +skipif parallel statement ok flush; +skipif madsim +skipif parallel query I select count(*) > 10 from mv; ---- t +skipif madsim +skipif parallel sleep 2s +skipif madsim +skipif parallel query I select count(*) > 12 from mv; ---- t +skipif madsim +skipif parallel statement ok drop materialized view mv; From 8b77020e6810bb98fa31477e0fff3eb7992af8be Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 24 Jun 2024 19:21:57 +0800 Subject: [PATCH 13/13] try fix ci again Signed-off-by: Richard Chien --- e2e_test/streaming/now.slt | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/e2e_test/streaming/now.slt b/e2e_test/streaming/now.slt index 324469e947e7a..ad086f2202e7a 100644 --- a/e2e_test/streaming/now.slt +++ b/e2e_test/streaming/now.slt @@ -2,6 +2,8 @@ skipif madsim # In parallel test, we cannot get the DB name. skipif parallel +# TODO: Later if we introduce a new `now()`-like function that returns the time of statement execution, +# we'll be able to directly create MV without `./risedev psql` and so that we can remove these `skipif`. system ok ./risedev psql -c " create materialized view mv as @@ -20,7 +22,7 @@ flush; skipif madsim skipif parallel query I -select count(*) > 10 from mv; +select count(*) >= 10 from mv; ---- t @@ -28,10 +30,15 @@ skipif madsim skipif parallel sleep 2s +skipif madsim +skipif parallel +statement ok +flush; + skipif madsim skipif parallel query I -select count(*) > 12 from mv; +select count(*) >= 12 from mv; ---- t