From cdaa8cf63d75498aef56a4b5a94b40aebed8b011 Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Mon, 13 Mar 2023 16:59:20 +0800 Subject: [PATCH] fix(common): interval should have microsecond precision (#8501) --- dashboard/proto/gen/data.ts | 10 +- e2e_test/batch/types/interval.slt.part | 9 +- .../batch/types/temporal_arithmetic.slt.part | 6 +- proto/data.proto | 2 +- src/batch/src/executor/order_by.rs | 8 +- src/common/src/array/arrow.rs | 5 +- src/common/src/array/column_proto_readers.rs | 4 +- src/common/src/hash/key.rs | 4 +- src/common/src/row/owned_row.rs | 12 +- src/common/src/test_utils/rand_array.rs | 4 +- src/common/src/types/chrono_wrapper.rs | 2 +- src/common/src/types/interval.rs | 317 ++++++++++-------- src/common/src/types/mod.rs | 2 +- src/common/src/util/value_encoding/mod.rs | 6 +- src/connector/src/parser/avro/parser.rs | 6 +- src/connector/src/parser/avro/util.rs | 4 +- src/expr/src/vector_op/arithmetic_op.rs | 18 +- src/expr/src/vector_op/cast.rs | 14 +- src/expr/src/vector_op/tumble.rs | 4 +- .../optimizer/plan_node/logical_hop_window.rs | 8 +- src/stream/src/executor/watermark_filter.rs | 4 +- 21 files changed, 258 insertions(+), 191 deletions(-) diff --git a/dashboard/proto/gen/data.ts b/dashboard/proto/gen/data.ts index 1b865bc65a17a..f6d42c26fa32d 100644 --- a/dashboard/proto/gen/data.ts +++ b/dashboard/proto/gen/data.ts @@ -190,7 +190,7 @@ export function opToJSON(object: Op): string { export interface IntervalUnit { months: number; days: number; - ms: number; + usecs: number; } export interface DataType { @@ -513,7 +513,7 @@ export interface Terminate { } function createBaseIntervalUnit(): IntervalUnit { - return { months: 0, days: 0, ms: 0 }; + return { months: 0, days: 0, usecs: 0 }; } export const IntervalUnit = { @@ -521,7 +521,7 @@ export const IntervalUnit = { return { months: isSet(object.months) ? Number(object.months) : 0, days: isSet(object.days) ? Number(object.days) : 0, - ms: isSet(object.ms) ? Number(object.ms) : 0, + usecs: isSet(object.usecs) ? Number(object.usecs) : 0, }; }, @@ -529,7 +529,7 @@ export const IntervalUnit = { const obj: any = {}; message.months !== undefined && (obj.months = Math.round(message.months)); message.days !== undefined && (obj.days = Math.round(message.days)); - message.ms !== undefined && (obj.ms = Math.round(message.ms)); + message.usecs !== undefined && (obj.usecs = Math.round(message.usecs)); return obj; }, @@ -537,7 +537,7 @@ export const IntervalUnit = { const message = createBaseIntervalUnit(); message.months = object.months ?? 0; message.days = object.days ?? 0; - message.ms = object.ms ?? 0; + message.usecs = object.usecs ?? 0; return message; }, }; diff --git a/e2e_test/batch/types/interval.slt.part b/e2e_test/batch/types/interval.slt.part index b0d5b59d9bc59..16c688707485a 100644 --- a/e2e_test/batch/types/interval.slt.part +++ b/e2e_test/batch/types/interval.slt.part @@ -109,8 +109,13 @@ select interval '1 year 1 month 1 day 1:1:1.009'; ---- 1 year 1 mon 1 day 01:01:01.009 -# issue#7059, if we improve precision, then this should be removed. -query TTTTTT +# issue #7059 +query T select '1 mons 1 days 00:00:00.000001'::INTERVAL; ---- +1 mon 1 day 00:00:00.000001 + +query T +select '1 mons 1 days 00:00:00.0000001'::INTERVAL; +---- 1 mon 1 day diff --git a/e2e_test/batch/types/temporal_arithmetic.slt.part b/e2e_test/batch/types/temporal_arithmetic.slt.part index a07c029ae31b7..34e4f40ecc19c 100644 --- a/e2e_test/batch/types/temporal_arithmetic.slt.part +++ b/e2e_test/batch/types/temporal_arithmetic.slt.part @@ -66,7 +66,7 @@ select interval '20' / float '12.5'; query T select interval '12 days' / 4.2; ---- -2 days 20:34:17.143 +2 days 20:34:17.142857 query T SELECT interval '1 month' / 2000; @@ -136,7 +136,7 @@ select real '6.1' * interval '1' second; query T select real '61.1' * interval '1' second; ---- -00:01:01.1 +00:01:01.099998 query T select real '0.0' * interval '1' second; @@ -161,7 +161,7 @@ select interval '1' second * real '6.1'; query T select interval '1' second * real '61.1'; ---- -00:01:01.1 +00:01:01.099998 query T select interval '1' second * real '0.0'; diff --git a/proto/data.proto b/proto/data.proto index fba6ec6f4c1c0..1544130320211 100644 --- a/proto/data.proto +++ b/proto/data.proto @@ -10,7 +10,7 @@ option optimize_for = SPEED; message IntervalUnit { int32 months = 1; int32 days = 2; - int64 ms = 3; + int64 usecs = 3; } message DataType { diff --git a/src/batch/src/executor/order_by.rs b/src/batch/src/executor/order_by.rs index ad7fe212feab2..3d02af44e7cb9 100644 --- a/src/batch/src/executor/order_by.rs +++ b/src/batch/src/executor/order_by.rs @@ -464,9 +464,9 @@ mod tests { None] }, column! { IntervalArray, [ None, - Some(IntervalUnit::new(1, 2, 3)), + Some(IntervalUnit::from_month_day_usec(1, 2, 3)), None, - Some(IntervalUnit::new(4, 5, 6)), + Some(IntervalUnit::from_month_day_usec(4, 5, 6)), None] }, ], 5, @@ -491,8 +491,8 @@ mod tests { Some(NaiveDateTimeWrapper::with_secs_nsecs(1, 23).unwrap()), Some(NaiveDateTimeWrapper::with_secs_nsecs(7, 89).unwrap())] }, column! { IntervalArray, [ - Some(IntervalUnit::new(4, 5, 6)), - Some(IntervalUnit::new(1, 2, 3)), + Some(IntervalUnit::from_month_day_usec(4, 5, 6)), + Some(IntervalUnit::from_month_day_usec(1, 2, 3)), None, None, None] }, diff --git a/src/common/src/array/arrow.rs b/src/common/src/array/arrow.rs index 851bb93df2077..1a4c5099e31a6 100644 --- a/src/common/src/array/arrow.rs +++ b/src/common/src/array/arrow.rs @@ -348,14 +348,15 @@ impl FromIntoArrow for IntervalUnit { fn from_arrow(value: Self::ArrowType) -> Self { let (months, days, ns) = arrow_array::types::IntervalMonthDayNanoType::to_parts(value); - IntervalUnit::new(months, days, ns / 1_000_000) + IntervalUnit::from_month_day_usec(months, days, ns / 1000) } fn into_arrow(self) -> Self::ArrowType { arrow_array::types::IntervalMonthDayNanoType::make_value( self.get_months(), self.get_days(), - self.get_ms() * 1_000_000, + // TODO: this may overflow and we need `try_into` + self.get_usecs() * 1000, ) } } diff --git a/src/common/src/array/column_proto_readers.rs b/src/common/src/array/column_proto_readers.rs index f1e85f2e40d22..65345dce6efe2 100644 --- a/src/common/src/array/column_proto_readers.rs +++ b/src/common/src/array/column_proto_readers.rs @@ -101,9 +101,9 @@ pub fn read_interval_unit(cursor: &mut Cursor<&[u8]>) -> ArrayResult()?; let days = cursor.read_i32::()?; - let ms = cursor.read_i64::()?; + let usecs = cursor.read_i64::()?; - Ok::<_, std::io::Error>(IntervalUnit::new(months, days, ms)) + Ok::<_, std::io::Error>(IntervalUnit::from_month_day_usec(months, days, usecs)) }; match read() { diff --git a/src/common/src/hash/key.rs b/src/common/src/hash/key.rs index 0856bda158daa..95a6d6c58189d 100644 --- a/src/common/src/hash/key.rs +++ b/src/common/src/hash/key.rs @@ -365,14 +365,14 @@ impl HashKeySerDe<'_> for IntervalUnit { let mut ret = [0; 16]; ret[0..4].copy_from_slice(&self.get_months().to_ne_bytes()); ret[4..8].copy_from_slice(&self.get_days().to_ne_bytes()); - ret[8..16].copy_from_slice(&self.get_ms().to_ne_bytes()); + ret[8..16].copy_from_slice(&self.get_usecs().to_ne_bytes()); ret } fn deserialize(source: &mut R) -> Self { let value = Self::read_fixed_size_bytes::(source); - IntervalUnit::new( + IntervalUnit::from_month_day_usec( i32::from_ne_bytes(value[0..4].try_into().unwrap()), i32::from_ne_bytes(value[4..8].try_into().unwrap()), i64::from_ne_bytes(value[8..16].try_into().unwrap()), diff --git a/src/common/src/row/owned_row.rs b/src/common/src/row/owned_row.rs index 7d0029af70821..e929176dc8727 100644 --- a/src/common/src/row/owned_row.rs +++ b/src/common/src/row/owned_row.rs @@ -228,7 +228,9 @@ mod tests { Some(ScalarImpl::Float32(4.0.into())), Some(ScalarImpl::Float64(5.0.into())), Some(ScalarImpl::Decimal("-233.3".parse().unwrap())), - Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))), + Some(ScalarImpl::Interval(IntervalUnit::from_month_day_usec( + 7, 8, 9, + ))), ]); let value_indices = (0..9).collect_vec(); let bytes = (&row).project(&value_indices).value_serialize(); @@ -261,10 +263,14 @@ mod tests { Some(ScalarImpl::Float32(4.0.into())), Some(ScalarImpl::Float64(5.0.into())), Some(ScalarImpl::Decimal("-233.3".parse().unwrap())), - Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))), + Some(ScalarImpl::Interval(IntervalUnit::from_month_day_usec( + 7, 8, 9, + ))), ]); let row2 = OwnedRow::new(vec![ - Some(ScalarImpl::Interval(IntervalUnit::new(7, 8, 9))), + Some(ScalarImpl::Interval(IntervalUnit::from_month_day_usec( + 7, 8, 9, + ))), Some(ScalarImpl::Utf8("string".into())), Some(ScalarImpl::Bool(true)), Some(ScalarImpl::Int16(1)), diff --git a/src/common/src/test_utils/rand_array.rs b/src/common/src/test_utils/rand_array.rs index 6aa42d32acdbc..4341067cce8ad 100644 --- a/src/common/src/test_utils/rand_array.rs +++ b/src/common/src/test_utils/rand_array.rs @@ -78,8 +78,8 @@ impl RandValue for IntervalUnit { fn rand_value(rand: &mut R) -> Self { let months = rand.gen_range(0..100); let days = rand.gen_range(0..200); - let ms = rand.gen_range(0..100_000); - IntervalUnit::new(months, days, ms) + let usecs = rand.gen_range(0..100_000); + IntervalUnit::from_month_day_usec(months, days, usecs) } } diff --git a/src/common/src/types/chrono_wrapper.rs b/src/common/src/types/chrono_wrapper.rs index 2dd08c3a2dc35..03cd602744a6a 100644 --- a/src/common/src/types/chrono_wrapper.rs +++ b/src/common/src/types/chrono_wrapper.rs @@ -565,7 +565,7 @@ impl CheckedAdd for NaiveDateTimeWrapper { } let mut datetime = NaiveDateTime::new(date, self.0.time()); datetime = datetime.checked_add_signed(Duration::days(rhs.get_days().into()))?; - datetime = datetime.checked_add_signed(Duration::milliseconds(rhs.get_ms()))?; + datetime = datetime.checked_add_signed(Duration::microseconds(rhs.get_usecs()))?; Some(NaiveDateTimeWrapper::new(datetime)) } diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs index b42a27e3e86c3..153eadc9076ab 100644 --- a/src/common/src/types/interval.rs +++ b/src/common/src/types/interval.rs @@ -33,7 +33,7 @@ use crate::error::{ErrorCode, Result, RwError}; /// Every interval can be represented by a `IntervalUnit`. /// Note that the difference between Interval and Instant. /// For example, `5 yrs 1 month 25 days 23:22:57` is a interval (Can be interpreted by Interval Unit -/// with month = 61, days = 25, seconds = (57 + 23 * 3600 + 22 * 60) * 1000), +/// with months = 61, days = 25, usecs = (57 + 23 * 3600 + 22 * 60) * 1000000), /// `1970-01-01 04:05:06` is a Instant or Timestamp /// One month may contain 28/31 days. One day may contain 23/25 hours. /// This internals is learned from PG: @@ -42,22 +42,27 @@ use crate::error::{ErrorCode, Result, RwError}; pub struct IntervalUnit { months: i32, days: i32, - ms: i64, + usecs: i64, } -const DAY_MS: i64 = 86400000; -const MONTH_MS: i64 = 30 * DAY_MS; +const USECS_PER_SEC: i64 = 1_000_000; +const USECS_PER_DAY: i64 = 86400 * USECS_PER_SEC; +const USECS_PER_MONTH: i64 = 30 * USECS_PER_DAY; impl IntervalUnit { /// Smallest interval value. pub const MIN: Self = Self { months: i32::MIN, days: i32::MIN, - ms: i64::MIN, + usecs: i64::MIN, }; - pub fn new(months: i32, days: i32, ms: i64) -> Self { - IntervalUnit { months, days, ms } + pub fn from_month_day_usec(months: i32, days: i32, usecs: i64) -> Self { + IntervalUnit { + months, + days, + usecs, + } } pub fn get_days(&self) -> i32 { @@ -68,24 +73,24 @@ impl IntervalUnit { self.months } - pub fn get_ms(&self) -> i64 { - self.ms + pub fn get_usecs(&self) -> i64 { + self.usecs } - pub fn get_ms_of_day(&self) -> u64 { - self.ms.rem_euclid(DAY_MS) as u64 + pub fn get_usecs_of_day(&self) -> u64 { + self.usecs.rem_euclid(USECS_PER_DAY) as u64 } - /// Justify interval, convert 1 month to 30 days and 86400 ms to 1 day. - /// If day is positive, complement the ms negative value. + /// Justify interval, convert 1 month to 30 days and 86400 s to 1 day. + /// If day is positive, complement the seconds negative value. /// These rules only use in interval comparison. pub fn justify_interval(&mut self) { #[expect(deprecated)] - let total_ms = self.as_ms_i64(); + let total_usecs = self.as_usecs_i64(); *self = Self { months: 0, - days: (total_ms / DAY_MS) as i32, - ms: total_ms % DAY_MS, + days: (total_usecs / USECS_PER_DAY) as i32, + usecs: total_usecs % USECS_PER_DAY, } } @@ -96,23 +101,23 @@ impl IntervalUnit { } #[deprecated] - fn from_total_ms(ms: i64) -> Self { - let mut remaining_ms = ms; - let months = remaining_ms / MONTH_MS; - remaining_ms -= months * MONTH_MS; - let days = remaining_ms / DAY_MS; - remaining_ms -= days * DAY_MS; + fn from_total_usecs(usecs: i64) -> Self { + let mut remaining_usecs = usecs; + let months = remaining_usecs / USECS_PER_MONTH; + remaining_usecs -= months * USECS_PER_MONTH; + let days = remaining_usecs / USECS_PER_DAY; + remaining_usecs -= days * USECS_PER_DAY; IntervalUnit { months: (months as i32), days: (days as i32), - ms: remaining_ms, + usecs: remaining_usecs, } } pub fn to_protobuf(self, output: &mut T) -> ArrayResult { output.write_i32::(self.months)?; output.write_i32::(self.days)?; - output.write_i64::(self.ms)?; + output.write_i64::(self.usecs)?; Ok(16) } @@ -124,9 +129,13 @@ impl IntervalUnit { let rhs = rhs.try_into().ok()?; let months = self.months.checked_mul(rhs)?; let days = self.days.checked_mul(rhs)?; - let ms = self.ms.checked_mul(rhs as i64)?; + let usecs = self.usecs.checked_mul(rhs as i64)?; - Some(IntervalUnit { months, days, ms }) + Some(IntervalUnit { + months, + days, + usecs, + }) } /// Divides [`IntervalUnit`] by an integer/float with zero check. @@ -142,14 +151,16 @@ impl IntervalUnit { } #[expect(deprecated)] - let ms = self.as_ms_i64(); + let usecs = self.as_usecs_i64(); #[expect(deprecated)] - Some(IntervalUnit::from_total_ms((ms as f64 / rhs).round() as i64)) + Some(IntervalUnit::from_total_usecs( + (usecs as f64 / rhs).round() as i64 + )) } #[deprecated] - fn as_ms_i64(&self) -> i64 { - self.months as i64 * MONTH_MS + self.days as i64 * DAY_MS + self.ms + fn as_usecs_i64(&self) -> i64 { + self.months as i64 * USECS_PER_MONTH + self.days as i64 * USECS_PER_DAY + self.usecs } /// times [`IntervalUnit`] with an integer/float. @@ -161,9 +172,11 @@ impl IntervalUnit { let rhs = rhs.0; #[expect(deprecated)] - let ms = self.as_ms_i64(); + let usecs = self.as_usecs_i64(); #[expect(deprecated)] - Some(IntervalUnit::from_total_ms((ms as f64 * rhs).round() as i64)) + Some(IntervalUnit::from_total_usecs( + (usecs as f64 * rhs).round() as i64 + )) } /// Performs an exact division, returns [`None`] if for any unit, lhs % rhs != 0. @@ -193,14 +206,14 @@ impl IntervalUnit { check_unit(self.months as i64, rhs.months as i64)?; check_unit(self.days as i64, rhs.days as i64)?; - check_unit(self.ms, rhs.ms)?; + check_unit(self.usecs, rhs.usecs)?; res } /// Checks if [`IntervalUnit`] is positive. pub fn is_positive(&self) -> bool { - self > &Self::new(0, 0, 0) + self > &Self::from_month_day_usec(0, 0, 0) } /// Truncate the interval to the precision of milliseconds. @@ -215,13 +228,10 @@ impl IntervalUnit { /// ); /// ``` pub const fn truncate_millis(self) -> Self { - // for now it's just an identity function. - // it will take effect once microseconds precision is supported. - // https://github.com/risingwavelabs/risingwave/issues/4514 IntervalUnit { months: self.months, days: self.days, - ms: self.ms, + usecs: self.usecs / 1000 * 1000, } } @@ -240,7 +250,7 @@ impl IntervalUnit { IntervalUnit { months: self.months, days: self.days, - ms: self.ms / 1000 * 1000, + usecs: self.usecs / USECS_PER_SEC * USECS_PER_SEC, } } @@ -259,7 +269,7 @@ impl IntervalUnit { IntervalUnit { months: self.months, days: self.days, - ms: self.ms / 1000 / 60 * 1000 * 60, + usecs: self.usecs / USECS_PER_SEC / 60 * USECS_PER_SEC * 60, } } @@ -278,7 +288,7 @@ impl IntervalUnit { IntervalUnit { months: self.months, days: self.days, - ms: self.ms / 1000 / 60 / 60 * 1000 * 60 * 60, + usecs: self.usecs / USECS_PER_SEC / 60 / 60 * USECS_PER_SEC * 60 * 60, } } @@ -294,7 +304,7 @@ impl IntervalUnit { IntervalUnit { months: self.months, days: self.days, - ms: 0, + usecs: 0, } } @@ -310,7 +320,7 @@ impl IntervalUnit { IntervalUnit { months: self.months, days: 0, - ms: 0, + usecs: 0, } } @@ -326,7 +336,7 @@ impl IntervalUnit { IntervalUnit { months: self.months / 3 * 3, days: 0, - ms: 0, + usecs: 0, } } @@ -342,7 +352,7 @@ impl IntervalUnit { IntervalUnit { months: self.months / 12 * 12, days: 0, - ms: 0, + usecs: 0, } } @@ -358,7 +368,7 @@ impl IntervalUnit { IntervalUnit { months: self.months / 12 / 10 * 12 * 10, days: 0, - ms: 0, + usecs: 0, } } @@ -374,7 +384,7 @@ impl IntervalUnit { IntervalUnit { months: self.months / 12 / 100 * 12 * 100, days: 0, - ms: 0, + usecs: 0, } } @@ -390,7 +400,7 @@ impl IntervalUnit { IntervalUnit { months: self.months / 12 / 1000 * 12 * 1000, days: 0, - ms: 0, + usecs: 0, } } } @@ -414,8 +424,12 @@ pub mod test_utils { fn from_ymd(year: i32, month: i32, days: i32) -> Self { let months = year * 12 + month; let days = days; - let ms = 0; - IntervalUnit { months, days, ms } + let usecs = 0; + IntervalUnit { + months, + days, + usecs, + } } #[must_use] @@ -437,7 +451,7 @@ pub mod test_utils { #[must_use] fn from_millis(ms: i64) -> Self { Self { - ms, + usecs: ms * 1000, ..Default::default() } } @@ -445,7 +459,7 @@ pub mod test_utils { #[must_use] fn from_minutes(minutes: i64) -> Self { Self { - ms: 1000 * 60 * minutes, + usecs: USECS_PER_SEC * 60 * minutes, ..Default::default() } } @@ -478,9 +492,13 @@ impl Serialize for IntervalUnit { where S: serde::Serializer, { - let IntervalUnit { months, days, ms } = self.justified(); + let IntervalUnit { + months, + days, + usecs, + } = self.justified(); // serialize the `IntervalUnit` as a tuple - (months, days, ms).serialize(serializer) + (months, days, usecs).serialize(serializer) } } @@ -489,8 +507,12 @@ impl<'de> Deserialize<'de> for IntervalUnit { where D: serde::Deserializer<'de>, { - let (months, days, ms) = <(i32, i32, i64)>::deserialize(deserializer)?; - Ok(Self { months, days, ms }) + let (months, days, usecs) = <(i32, i32, i64)>::deserialize(deserializer)?; + Ok(Self { + months, + days, + usecs, + }) } } @@ -501,7 +523,7 @@ impl Into for IntervalUnit { IntervalUnitProto { months: self.months, days: self.days, - ms: self.ms, + usecs: self.usecs, } } } @@ -511,19 +533,19 @@ impl From<&'_ IntervalUnitProto> for IntervalUnit { Self { months: p.months, days: p.days, - ms: p.ms, + usecs: p.usecs, } } } impl From for IntervalUnit { fn from(time: NaiveTimeWrapper) -> Self { - let mut ms: i64 = (time.0.num_seconds_from_midnight() * 1000) as i64; - ms += (time.0.nanosecond() / 1_000_000) as i64; + let mut usecs: i64 = (time.0.num_seconds_from_midnight() as i64) * USECS_PER_SEC; + usecs += (time.0.nanosecond() / 1000) as i64; Self { months: 0, days: 0, - ms, + usecs, } } } @@ -534,8 +556,12 @@ impl Add for IntervalUnit { fn add(self, rhs: Self) -> Self { let months = self.months + rhs.months; let days = self.days + rhs.days; - let ms = self.ms + rhs.ms; - IntervalUnit { months, days, ms } + let usecs = self.usecs + rhs.usecs; + IntervalUnit { + months, + days, + usecs, + } } } @@ -546,7 +572,7 @@ impl PartialOrd for IntervalUnit { } else { let diff = *self - *other; let days = diff.months as i64 * 30 + diff.days as i64; - Some((days * DAY_MS + diff.ms).cmp(&0)) + Some((days * USECS_PER_DAY + diff.usecs).cmp(&0)) } } } @@ -555,7 +581,7 @@ impl Hash for IntervalUnit { fn hash(&self, state: &mut H) { let interval = self.justified(); interval.months.hash(state); - interval.ms.hash(state); + interval.usecs.hash(state); interval.days.hash(state); } } @@ -564,7 +590,7 @@ impl PartialEq for IntervalUnit { fn eq(&self, other: &Self) -> bool { let interval = self.justified(); let other = other.justified(); - interval.days == other.days && interval.ms == other.ms + interval.days == other.days && interval.usecs == other.usecs } } @@ -580,8 +606,12 @@ impl CheckedNeg for IntervalUnit { fn checked_neg(&self) -> Option { let months = self.months.checked_neg()?; let days = self.days.checked_neg()?; - let ms = self.ms.checked_neg()?; - Some(IntervalUnit { months, days, ms }) + let usecs = self.usecs.checked_neg()?; + Some(IntervalUnit { + months, + days, + usecs, + }) } } @@ -589,8 +619,12 @@ impl CheckedAdd for IntervalUnit { fn checked_add(&self, other: &Self) -> Option { let months = self.months.checked_add(other.months)?; let days = self.days.checked_add(other.days)?; - let ms = self.ms.checked_add(other.ms)?; - Some(IntervalUnit { months, days, ms }) + let usecs = self.usecs.checked_add(other.usecs)?; + Some(IntervalUnit { + months, + days, + usecs, + }) } } @@ -600,8 +634,12 @@ impl Sub for IntervalUnit { fn sub(self, rhs: Self) -> Self { let months = self.months - rhs.months; let days = self.days - rhs.days; - let ms = self.ms - rhs.ms; - IntervalUnit { months, days, ms } + let usecs = self.usecs - rhs.usecs; + IntervalUnit { + months, + days, + usecs, + } } } @@ -609,25 +647,31 @@ impl CheckedSub for IntervalUnit { fn checked_sub(&self, other: &Self) -> Option { let months = self.months.checked_sub(other.months)?; let days = self.days.checked_sub(other.days)?; - let ms = self.ms.checked_sub(other.ms)?; - Some(IntervalUnit { months, days, ms }) + let usecs = self.usecs.checked_sub(other.usecs)?; + Some(IntervalUnit { + months, + days, + usecs, + }) } } impl Zero for IntervalUnit { fn zero() -> Self { - Self::new(0, 0, 0) + Self::from_month_day_usec(0, 0, 0) } fn is_zero(&self) -> bool { - self.months == 0 && self.days == 0 && self.ms == 0 + self.months == 0 && self.days == 0 && self.usecs == 0 } } impl IsNegative for IntervalUnit { fn is_negative(&self) -> bool { let i = self.justified(); - i.months < 0 || (i.months == 0 && i.days < 0) || (i.months == 0 && i.days == 0 && i.ms < 0) + i.months < 0 + || (i.months == 0 && i.days < 0) + || (i.months == 0 && i.days == 0 && i.usecs < 0) } } @@ -638,7 +682,7 @@ impl Neg for IntervalUnit { Self { months: -self.months, days: -self.days, - ms: -self.ms, + usecs: -self.usecs, } } } @@ -685,21 +729,22 @@ impl Display for IntervalUnit { } else if days != 0 { write(format_args!("{days} days"))?; } - if self.ms != 0 || self.months == 0 && self.days == 0 { - let ms = self.ms.abs(); + if self.usecs != 0 || self.months == 0 && self.days == 0 { + let usecs = self.usecs.abs(); + let ms = usecs / 1000; let hours = ms / 1000 / 3600; let minutes = (ms / 1000 / 60) % 60; let seconds = ms % 60000 / 1000; - let secs_fract = ms % 1000; + let secs_fract = usecs % USECS_PER_SEC; - if self.ms < 0 { + if self.usecs < 0 { write(format_args!("-{hours:0>2}:{minutes:0>2}:{seconds:0>2}"))?; } else { write(format_args!("{hours:0>2}:{minutes:0>2}:{seconds:0>2}"))?; } if secs_fract != 0 { - let mut buf = [0u8; 4]; - write!(buf.as_mut_slice(), ".{:03}", secs_fract).unwrap(); + let mut buf = [0u8; 7]; + write!(buf.as_mut_slice(), ".{:06}", secs_fract).unwrap(); write!( f, "{}", @@ -720,7 +765,7 @@ impl ToSql for IntervalUnit { out: &mut BytesMut, ) -> std::result::Result> { // refer: https://github.com/postgres/postgres/blob/517bf2d91/src/backend/utils/adt/timestamp.c#L1008 - out.put_i64(self.ms * 1000); + out.put_i64(self.usecs); out.put_i32(self.days); out.put_i32(self.months); Ok(IsNull::No) @@ -736,12 +781,10 @@ impl<'a> FromSql<'a> for IntervalUnit { _: &Type, mut raw: &'a [u8], ) -> std::result::Result> { - let micros = raw.read_i64::()?; + let usecs = raw.read_i64::()?; let days = raw.read_i32::()?; let months = raw.read_i32::()?; - // TODO: https://github.com/risingwavelabs/risingwave/issues/4514 - // Only support ms now. - Ok(IntervalUnit::new(months, days, micros / 1000)) + Ok(IntervalUnit::from_month_day_usec(months, days, usecs)) } fn accepts(ty: &Type) -> bool { @@ -943,21 +986,21 @@ impl IntervalUnit { (|| match leading_field { Year => { let months = num.checked_mul(12)?; - Some(IntervalUnit::new(months as i32, 0, 0)) + Some(IntervalUnit::from_month_day_usec(months as i32, 0, 0)) } - Month => Some(IntervalUnit::new(num as i32, 0, 0)), - Day => Some(IntervalUnit::new(0, num as i32, 0)), + Month => Some(IntervalUnit::from_month_day_usec(num as i32, 0, 0)), + Day => Some(IntervalUnit::from_month_day_usec(0, num as i32, 0)), Hour => { - let ms = num.checked_mul(3600 * 1000)?; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = num.checked_mul(3600 * USECS_PER_SEC)?; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } Minute => { - let ms = num.checked_mul(60 * 1000)?; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = num.checked_mul(60 * USECS_PER_SEC)?; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } Second => { - let ms = num.checked_mul(1000)?; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = num.checked_mul(USECS_PER_SEC)?; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } })() .ok_or_else(|| ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)).into()) @@ -973,28 +1016,28 @@ impl IntervalUnit { return Err(ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", &s)).into()); } let mut token_iter = tokens.into_iter(); - let mut result = IntervalUnit::new(0, 0, 0); + let mut result = IntervalUnit::from_month_day_usec(0, 0, 0); while let Some(num) = token_iter.next() && let Some(interval_unit) = token_iter.next() { match (num, interval_unit) { (TimeStrToken::Num(num), TimeStrToken::TimeUnit(interval_unit)) => { result = result + (|| match interval_unit { Year => { let months = num.checked_mul(12)?; - Some(IntervalUnit::new(months as i32, 0, 0)) + Some(IntervalUnit::from_month_day_usec(months as i32, 0, 0)) } - Month => Some(IntervalUnit::new(num as i32, 0, 0)), - Day => Some(IntervalUnit::new(0, num as i32, 0)), + Month => Some(IntervalUnit::from_month_day_usec(num as i32, 0, 0)), + Day => Some(IntervalUnit::from_month_day_usec(0, num as i32, 0)), Hour => { - let ms = num.checked_mul(3600 * 1000)?; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = num.checked_mul(3600 * USECS_PER_SEC)?; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } Minute => { - let ms = num.checked_mul(60 * 1000)?; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = num.checked_mul(60 * USECS_PER_SEC)?; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } Second => { - let ms = num.checked_mul(1000)?; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = num.checked_mul(USECS_PER_SEC)?; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } })() .ok_or_else(|| ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)))?; @@ -1002,11 +1045,9 @@ impl IntervalUnit { (TimeStrToken::Second(second), TimeStrToken::TimeUnit(interval_unit)) => { result = result + match interval_unit { Second => { - // Currently our precision is millisecond, we should consider to refactor it to microseconds later (#4514). // If unsatisfied precision is passed as input, we should not return None (Error). - // TODO: IntervalUnit only support millisecond precision so the part smaller than millisecond will be truncated. - let ms = (second.into_inner() * 1000_f64).round() as i64; - Some(IntervalUnit::new(0, 0, ms)) + let usecs = (second.into_inner() * (USECS_PER_SEC as f64)).round() as i64; + Some(IntervalUnit::from_month_day_usec(0, 0, usecs)) } _ => None, } @@ -1121,23 +1162,27 @@ mod tests { #[test] fn test_to_string() { assert_eq!( - IntervalUnit::new(-14, 3, 11 * 3600 * 1000 + 45 * 60 * 1000 + 14 * 1000 + 233) - .to_string(), - "-1 years -2 mons 3 days 11:45:14.233" + IntervalUnit::from_month_day_usec( + -14, + 3, + (11 * 3600 + 45 * 60 + 14) * USECS_PER_SEC + 233 + ) + .to_string(), + "-1 years -2 mons 3 days 11:45:14.000233" ); assert_eq!( - IntervalUnit::new(-14, 3, 0).to_string(), + IntervalUnit::from_month_day_usec(-14, 3, 0).to_string(), "-1 years -2 mons 3 days" ); assert_eq!(IntervalUnit::default().to_string(), "00:00:00"); assert_eq!( - IntervalUnit::new( + IntervalUnit::from_month_day_usec( -14, 3, - -(11 * 3600 * 1000 + 45 * 60 * 1000 + 14 * 1000 + 233) + -((11 * 3600 + 45 * 60 + 14) * USECS_PER_SEC + 233) ) .to_string(), - "-1 years -2 mons 3 days -11:45:14.233" + "-1 years -2 mons 3 days -11:45:14.000233" ); } @@ -1157,8 +1202,8 @@ mod tests { ]; for (lhs, rhs, expected) in cases { - let lhs = IntervalUnit::new(lhs.0, lhs.1, lhs.2 as i64); - let rhs = IntervalUnit::new(rhs.0, rhs.1, rhs.2 as i64); + let lhs = IntervalUnit::from_month_day_usec(lhs.0, lhs.1, lhs.2 as i64); + let rhs = IntervalUnit::from_month_day_usec(rhs.0, rhs.1, rhs.2 as i64); let result = std::panic::catch_unwind(|| { let actual = lhs.exact_div(&rhs); assert_eq!(actual, expected); @@ -1174,21 +1219,21 @@ mod tests { fn test_div_float() { let cases_int = [ ((10, 8, 6), 2, Some((5, 4, 3))), - ((1, 2, 33), 3, Some((0, 10, 57600011))), + ((1, 2, 33), 3, Some((0, 10, 57600000011i64))), ((1, 0, 11), 10, Some((0, 3, 1))), ((5, 6, 7), 0, None), ]; let cases_float = [ ((10, 8, 6), 2.0f32, Some((5, 4, 3))), - ((1, 2, 33), 3.0f32, Some((0, 10, 57600011))), + ((1, 2, 33), 3.0f32, Some((0, 10, 57600000011i64))), ((10, 15, 100), 2.5f32, Some((4, 6, 40))), ((5, 6, 7), 0.0f32, None), ]; for (lhs, rhs, expected) in cases_int { - let lhs = IntervalUnit::new(lhs.0, lhs.1, lhs.2 as i64); - let expected = expected.map(|x| IntervalUnit::new(x.0, x.1, x.2 as i64)); + let lhs = IntervalUnit::from_month_day_usec(lhs.0, lhs.1, lhs.2 as i64); + let expected = expected.map(|x| IntervalUnit::from_month_day_usec(x.0, x.1, x.2)); let actual = lhs.div_float(rhs as i16); assert_eq!(actual, expected); @@ -1201,8 +1246,8 @@ mod tests { } for (lhs, rhs, expected) in cases_float { - let lhs = IntervalUnit::new(lhs.0, lhs.1, lhs.2 as i64); - let expected = expected.map(|x| IntervalUnit::new(x.0, x.1, x.2 as i64)); + let lhs = IntervalUnit::from_month_day_usec(lhs.0, lhs.1, lhs.2 as i64); + let expected = expected.map(|x| IntervalUnit::from_month_day_usec(x.0, x.1, x.2)); let actual = lhs.div_float(OrderedFloat::(rhs)); assert_eq!(actual, expected); @@ -1215,7 +1260,7 @@ mod tests { #[test] fn test_serialize_deserialize() { let mut serializer = memcomparable::Serializer::new(vec![]); - let a = IntervalUnit::new(123, 456, 789); + let a = IntervalUnit::from_month_day_usec(123, 456, 789); a.serialize(&mut serializer).unwrap(); let buf = serializer.into_inner(); let mut deserializer = memcomparable::Deserializer::new(&buf[..]); @@ -1227,22 +1272,26 @@ mod tests { let cases = [ ((1, 2, 3), (4, 5, 6), Ordering::Less), ((0, 31, 0), (1, 0, 0), Ordering::Greater), - ((1, 0, 0), (0, 0, MONTH_MS + 1), Ordering::Less), - ((0, 1, 0), (0, 0, DAY_MS + 1), Ordering::Less), - ((2, 3, 4), (1, 2, 4 + DAY_MS + MONTH_MS), Ordering::Equal), + ((1, 0, 0), (0, 0, USECS_PER_MONTH + 1), Ordering::Less), + ((0, 1, 0), (0, 0, USECS_PER_DAY + 1), Ordering::Less), + ( + (2, 3, 4), + (1, 2, 4 + USECS_PER_DAY + USECS_PER_MONTH), + Ordering::Equal, + ), ]; - for ((lhs_months, lhs_days, lhs_ms), (rhs_months, rhs_days, rhs_ms), order) in cases { + for ((lhs_months, lhs_days, lhs_usecs), (rhs_months, rhs_days, rhs_usecs), order) in cases { let lhs = { let mut serializer = memcomparable::Serializer::new(vec![]); - IntervalUnit::new(lhs_months, lhs_days, lhs_ms) + IntervalUnit::from_month_day_usec(lhs_months, lhs_days, lhs_usecs) .serialize(&mut serializer) .unwrap(); serializer.into_inner() }; let rhs = { let mut serializer = memcomparable::Serializer::new(vec![]); - IntervalUnit::new(rhs_months, rhs_days, rhs_ms) + IntervalUnit::from_month_day_usec(rhs_months, rhs_days, rhs_usecs) .serialize(&mut serializer) .unwrap(); serializer.into_inner() diff --git a/src/common/src/types/mod.rs b/src/common/src/types/mod.rs index ff44b5b9c419e..8ba856d20f3ec 100644 --- a/src/common/src/types/mod.rs +++ b/src/common/src/types/mod.rs @@ -1390,7 +1390,7 @@ mod tests { ), DataTypeName::Timestamptz => (ScalarImpl::Int64(233333333), DataType::Timestamptz), DataTypeName::Interval => ( - ScalarImpl::Interval(IntervalUnit::new(2, 3, 3333)), + ScalarImpl::Interval(IntervalUnit::from_month_day_usec(2, 3, 3333)), DataType::Interval, ), DataTypeName::Jsonb => (ScalarImpl::Jsonb(JsonbVal::dummy()), DataType::Jsonb), diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index 29ec11e8e0a2a..c5e72b0d43f8c 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -254,7 +254,7 @@ fn serialize_str(bytes: &[u8], buf: &mut impl BufMut) { fn serialize_interval(interval: &IntervalUnit, buf: &mut impl BufMut) { buf.put_i32_le(interval.get_months()); buf.put_i32_le(interval.get_days()); - buf.put_i64_le(interval.get_ms()); + buf.put_i64_le(interval.get_usecs()); } fn serialize_naivedate(days: i32, buf: &mut impl BufMut) { @@ -348,8 +348,8 @@ fn deserialize_bool(data: &mut impl Buf) -> Result { fn deserialize_interval(data: &mut impl Buf) -> Result { let months = data.get_i32_le(); let days = data.get_i32_le(); - let ms = data.get_i64_le(); - Ok(IntervalUnit::new(months, days, ms)) + let usecs = data.get_i64_le(); + Ok(IntervalUnit::from_month_day_usec(months, days, usecs)) } fn deserialize_naivetime(data: &mut impl Buf) -> Result { diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 0118e7a8d6472..c4cccdc9800dd 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -447,9 +447,9 @@ mod test { Value::Duration(duration) => { let months = u32::from(duration.months()) as i32; let days = u32::from(duration.days()) as i32; - let millis = u32::from(duration.millis()) as i64; - let duration = Some(ScalarImpl::Interval(IntervalUnit::new( - months, days, millis, + let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows + let duration = Some(ScalarImpl::Interval(IntervalUnit::from_month_day_usec( + months, days, usecs, ))); assert_eq!(row[i], duration); } diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 4761855bd7970..33937e62282ad 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -310,8 +310,8 @@ pub(crate) fn from_avro_value(value: Value, value_schema: &Schema) -> Result { let months = u32::from(duration.months()) as i32; let days = u32::from(duration.days()) as i32; - let millis = u32::from(duration.millis()) as i64; - ScalarImpl::Interval(IntervalUnit::new(months, days, millis)) + let usecs = (u32::from(duration.millis()) as i64) * 1000; // never overflows + ScalarImpl::Interval(IntervalUnit::from_month_day_usec(months, days, usecs)) } Value::Enum(_, symbol) => ScalarImpl::Utf8(symbol.into_boxed_str()), Value::Record(descs) => { diff --git a/src/expr/src/vector_op/arithmetic_op.rs b/src/expr/src/vector_op/arithmetic_op.rs index 6275c74b811c5..13be501bbea5d 100644 --- a/src/expr/src/vector_op/arithmetic_op.rs +++ b/src/expr/src/vector_op/arithmetic_op.rs @@ -137,8 +137,10 @@ pub fn timestamp_timestamp_sub( ) -> Result { let tmp = l.0 - r.0; // this does not overflow or underflow let days = tmp.num_days(); - let ms = (tmp - Duration::days(tmp.num_days())).num_milliseconds(); - Ok(IntervalUnit::new(0, days as i32, ms)) + let usecs = (tmp - Duration::days(tmp.num_days())) + .num_microseconds() + .ok_or_else(|| ExprError::NumericOutOfRange)?; + Ok(IntervalUnit::from_month_day_usec(0, days as i32, usecs)) } #[inline(always)] @@ -258,7 +260,7 @@ fn timestamptz_interval_inner( } let result: Option = try { - let delta_usecs = r.get_ms().checked_mul(1000)?; + let delta_usecs = r.get_usecs(); f(l, delta_usecs)? }; @@ -301,8 +303,10 @@ pub fn time_date_add( #[inline(always)] pub fn time_time_sub(l: NaiveTimeWrapper, r: NaiveTimeWrapper) -> Result { let tmp = l.0 - r.0; // this does not overflow or underflow - let ms = tmp.num_milliseconds(); - Ok(IntervalUnit::new(0, 0, ms)) + let usecs = tmp + .num_microseconds() + .ok_or_else(|| ExprError::NumericOutOfRange)?; + Ok(IntervalUnit::from_month_day_usec(0, 0, usecs)) } #[inline(always)] @@ -311,7 +315,7 @@ pub fn time_interval_sub( r: IntervalUnit, ) -> Result { let time = l.0; - let (new_time, ignored) = time.overflowing_sub_signed(Duration::milliseconds(r.get_ms())); + let (new_time, ignored) = time.overflowing_sub_signed(Duration::microseconds(r.get_usecs())); if ignored == 0 { Ok(NaiveTimeWrapper::new(new_time)) } else { @@ -325,7 +329,7 @@ pub fn time_interval_add( r: IntervalUnit, ) -> Result { let time = l.0; - let (new_time, ignored) = time.overflowing_add_signed(Duration::milliseconds(r.get_ms())); + let (new_time, ignored) = time.overflowing_add_signed(Duration::microseconds(r.get_usecs())); if ignored == 0 { Ok(NaiveTimeWrapper::new(new_time)) } else { diff --git a/src/expr/src/vector_op/cast.rs b/src/expr/src/vector_op/cast.rs index 7ab68cca3b271..26f955773efae 100644 --- a/src/expr/src/vector_op/cast.rs +++ b/src/expr/src/vector_op/cast.rs @@ -368,9 +368,9 @@ pub fn timestamp_to_time(elem: NaiveDateTimeWrapper) -> NaiveTimeWrapper { /// In `PostgreSQL`, casting from interval to time discards the days part. #[inline(always)] pub fn interval_to_time(elem: IntervalUnit) -> NaiveTimeWrapper { - let ms = elem.get_ms_of_day(); - let secs = (ms / 1000) as u32; - let nano = (ms % 1000 * 1_000_000) as u32; + let usecs = elem.get_usecs_of_day(); + let secs = (usecs / 1_000_000) as u32; + let nano = (usecs % 1_000_000 * 1000) as u32; NaiveTimeWrapper::from_num_seconds_from_midnight_uncheck(secs, nano) } @@ -854,12 +854,12 @@ mod tests { str_to_time("04:02").unwrap(), ); assert_eq!( - interval_to_time(IntervalUnit::new(1, 2, 61003)), - str_to_time("00:01:01.003").unwrap(), + interval_to_time(IntervalUnit::from_month_day_usec(1, 2, 61000003)), + str_to_time("00:01:01.000003").unwrap(), ); assert_eq!( - interval_to_time(IntervalUnit::new(0, 0, -61003)), - str_to_time("23:58:58.997").unwrap(), + interval_to_time(IntervalUnit::from_month_day_usec(0, 0, -61000003)), + str_to_time("23:58:58.999997").unwrap(), ); } diff --git a/src/expr/src/vector_op/tumble.rs b/src/expr/src/vector_op/tumble.rs index d5381b00f76d3..d8eee168eaaef 100644 --- a/src/expr/src/vector_op/tumble.rs +++ b/src/expr/src/vector_op/tumble.rs @@ -55,7 +55,7 @@ fn tm_diff_bin(diff_usecs: i64, window: IntervalUnit) -> Result { reason: "unimplemented: tumble_start only support days or milliseconds".to_string(), }); } - let window_usecs = window.get_days() as i64 * 24 * 60 * 60 * 1_000_000 + window.get_ms() * 1000; + let window_usecs = window.get_days() as i64 * 24 * 60 * 60 * 1_000_000 + window.get_usecs(); if window_usecs <= 0 { return Err(ExprError::InvalidParam { @@ -78,7 +78,7 @@ mod tests { #[test] fn test_tumble_start_date_time() { let dt = NaiveDateWrapper::from_ymd_uncheck(2022, 2, 22).and_hms_uncheck(22, 22, 22); - let interval = IntervalUnit::new(0, 0, 30 * 60 * 1000); + let interval = IntervalUnit::from_month_day_usec(0, 0, 30 * 60 * 1_000_000); let w = tumble_start_date_time(dt, interval).unwrap().0; assert_eq!(w.year(), 2022); assert_eq!(w.month(), 2); diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 85efe484429f2..d95de46748975 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -442,8 +442,8 @@ mod test { let hop_window: PlanRef = LogicalHopWindow::new( values.into(), InputRef::new(0, DataType::Date), - IntervalUnit::new(0, 1, 0), - IntervalUnit::new(0, 3, 0), + IntervalUnit::from_month_day_usec(0, 1, 0), + IntervalUnit::from_month_day_usec(0, 3, 0), None, ) .into(); @@ -497,8 +497,8 @@ mod test { let hop_window: PlanRef = LogicalHopWindow::new( values.into(), InputRef::new(0, DataType::Date), - IntervalUnit::new(0, 1, 0), - IntervalUnit::new(0, 3, 0), + IntervalUnit::from_month_day_usec(0, 1, 0), + IntervalUnit::from_month_day_usec(0, 3, 0), None, ) .into(); diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 44b2a5c02f228..98e9fc82d4c89 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -337,7 +337,9 @@ mod tests { InputRefExpression::new(WATERMARK_TYPE.clone(), 1).boxed(), LiteralExpression::new( interval_type, - Some(ScalarImpl::Interval(IntervalUnit::new(0, 1, 0))), + Some(ScalarImpl::Interval(IntervalUnit::from_month_day_usec( + 0, 1, 0, + ))), ) .boxed(), )