fix: date_trunc support timezone #6818

Closed · wants to merge 2 commits
12 changes: 12 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/timestamps.slt
@@ -1281,7 +1281,19 @@ SELECT
----
true false true true

# date_trunc with timestamptz
statement ok
set timezone to '+08:00';

query P
select date_trunc('hour', timestamptz '2000-01-01T00:00:00');
----
2000-01-01T00:00:00+08:00
waitingkuo (Contributor) commented on lines +1289 to +1291, Jul 5, 2023:
I think we should add a test case that truncates the day.


query P
select date_trunc('day', timestamptz '2000-01-01T00:00:00');
----
2000-01-01T00:00:00+08:00

##########
## Common timestamp data
53 changes: 37 additions & 16 deletions datafusion/expr/src/built_in_function.rs
@@ -563,13 +563,12 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ConcatWithSeparator => Ok(Utf8),
BuiltinScalarFunction::DatePart => Ok(Float64),
BuiltinScalarFunction::DateBin | BuiltinScalarFunction::DateTrunc => {
match input_expr_types[1] {
Timestamp(Nanosecond, _) | Utf8 | Null => {
Ok(Timestamp(Nanosecond, None))
}
Timestamp(Microsecond, _) => Ok(Timestamp(Microsecond, None)),
Timestamp(Millisecond, _) => Ok(Timestamp(Millisecond, None)),
Timestamp(Second, _) => Ok(Timestamp(Second, None)),
match &input_expr_types[1] {
Contributor:
I wonder if any of the calculations actually have to change based on the timezone (wouldn't the end of the day being truncated depend on the timezone)?
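A quick standalone illustration of why the boundary does move with the timezone (plain chrono, not DataFusion code; the instant and offset are arbitrary examples):

```rust
use chrono::{FixedOffset, TimeZone, Utc};

fn main() {
    // 1999-12-31T18:00:00 UTC is already 2000-01-01T02:00:00 at +08:00.
    let instant = Utc.with_ymd_and_hms(1999, 12, 31, 18, 0, 0).unwrap();
    let tz = FixedOffset::east_opt(8 * 3600).unwrap();

    // Truncating the UTC wall clock to midnight lands on 1999-12-31.
    let utc_day = instant.date_naive().and_hms_opt(0, 0, 0).unwrap();

    // Truncating the +08:00 wall clock lands on 2000-01-01 local midnight,
    // which is a different instant from the UTC result above.
    let local_day = instant
        .with_timezone(&tz)
        .date_naive()
        .and_hms_opt(0, 0, 0)
        .unwrap();

    println!("day truncated in UTC:    {utc_day}");   // 1999-12-31 00:00:00
    println!("day truncated at +08:00: {local_day}"); // 2000-01-01 00:00:00
}
```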

Contributor:
https://github.com/apache/arrow-datafusion/blob/02a470f6061cce8ee8e57f7af8a6a0e0ddc1571b/datafusion/physical-expr/src/datetime_expressions.rs#L343-L347

It truncates before applying the timezone offset, which is inconsistent with PostgreSQL:

set timezone to '+08:00';
0 rows in set. Query took 0.026 seconds.

❯ select timestamptz '2000-01-01T00:00:00';
+-----------------------------+
| Utf8("2000-01-01T00:00:00") |
+-----------------------------+
| 2000-01-01T00:00:00+08:00   |
+-----------------------------+
1 row in set. Query took 0.002 seconds.

❯ select date_trunc('day', timestamptz '2000-01-01T00:00:00');
+-----------------------------------------------------+
| date_trunc(Utf8("day"),Utf8("2000-01-01T00:00:00")) |
+-----------------------------------------------------+
| 1999-12-31T00:00:00                                 |
+-----------------------------------------------------+
1 row in set. Query took 0.032 seconds.
willy=# select timestamptz '2000-01-01T00:00:00';
      timestamptz       
------------------------
 2000-01-01 00:00:00+08
(1 row)

willy=# select date_trunc('day', timestamptz '2000-01-01T00:00:00');
       date_trunc       
------------------------
 2000-01-01 00:00:00+08
(1 row)

Utf8 | Null => Ok(Timestamp(Nanosecond, None)),
Timestamp(Nanosecond, tz) => Ok(Timestamp(Nanosecond, tz.clone())),
Timestamp(Microsecond, tz) => Ok(Timestamp(Microsecond, tz.clone())),
Timestamp(Millisecond, tz) => Ok(Timestamp(Millisecond, tz.clone())),
Timestamp(Second, tz) => Ok(Timestamp(Second, tz.clone())),
_ => Err(DataFusionError::Internal(format!(
"The {self} function can only accept timestamp as the second arg."
))),
@@ -945,15 +944,37 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
BuiltinScalarFunction::DateTrunc => Signature::one_of(
vec![
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
Exact(vec![Utf8, Timestamp(Second, None)]),
],
self.volatility(),
),
BuiltinScalarFunction::DateTrunc => {
let time_zones = vec![
Contributor:
Maybe @waitingkuo has some ideas about how to deal with timezones in function signatures.

It seems not quite right to have to list all timezones specially for date_trunc only -- shouldn't the timezone logic apply to any function that takes timezones? 🤔
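One possible direction, sketched here only for illustration (this helper does not exist in the PR or in DataFusion): factor the enumeration into a shared function so any timezone-aware builtin could reuse it instead of repeating the offset table inline.

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, TimeUnit};
use datafusion_expr::TypeSignature;

/// Hypothetical shared helper: (Utf8, Timestamp) signatures for every time unit,
/// both without a timezone and with each offset in `time_zones`.
fn utf8_timestamp_signatures(time_zones: &[&str]) -> Vec<TypeSignature> {
    let units = [
        TimeUnit::Second,
        TimeUnit::Millisecond,
        TimeUnit::Microsecond,
        TimeUnit::Nanosecond,
    ];
    let mut signatures = Vec::new();
    for unit in &units {
        // Timezone-less variant.
        signatures.push(TypeSignature::Exact(vec![
            DataType::Utf8,
            DataType::Timestamp(unit.clone(), None),
        ]));
        // One variant per fixed offset.
        for tz in time_zones {
            signatures.push(TypeSignature::Exact(vec![
                DataType::Utf8,
                DataType::Timestamp(unit.clone(), Some(Arc::from(*tz))),
            ]));
        }
    }
    signatures
}

fn main() {
    let sigs = utf8_timestamp_signatures(&["+00:00", "+08:00"]);
    println!("{} signatures generated", sigs.len());
}
```

DateTrunc (and later DateBin or other timestamp functions) could then build `Signature::one_of` from this list rather than listing the offsets per function, which is the sharing this comment asks about.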

Member (PR author):
I am not sure there is a better way to provide the timezone here; I'll wait for @waitingkuo's suggestion.

Also, it should be extended to other functions that use timezones.

Contributor:
FWIW, this issue also arises for decimal types; I suspect the current pattern of explicitly listing the possible type signatures is going to require revisiting. We probably want to extend the pattern established in #6778 and allow functions to return a signature given a set of input types.
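A rough sketch of that direction, using hypothetical names rather than the actual #6778 API: let the function derive its return type from the concrete argument types, so the timestamp unit and timezone simply flow through (mirroring the return_type change in this PR).

```rust
use arrow::datatypes::{DataType, TimeUnit};

/// Hypothetical hook: compute date_trunc's return type from its argument types
/// instead of enumerating every (unit, timezone) pair up front.
fn date_trunc_return_type(arg_types: &[DataType]) -> Option<DataType> {
    match arg_types.get(1)? {
        // Preserve both the unit and the timezone of the input timestamp.
        DataType::Timestamp(unit, tz) => Some(DataType::Timestamp(unit.clone(), tz.clone())),
        // String / NULL literals fall back to a timezone-less nanosecond timestamp.
        DataType::Utf8 | DataType::Null => Some(DataType::Timestamp(TimeUnit::Nanosecond, None)),
        _ => None,
    }
}

fn main() {
    let args = [
        DataType::Utf8,
        DataType::Timestamp(TimeUnit::Millisecond, Some("+08:00".into())),
    ];
    // Prints: Some(Timestamp(Millisecond, Some("+08:00")))
    println!("{:?}", date_trunc_return_type(&args));
}
```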

Contributor:
Hi @alamb @Weijun-H @tustvold, I don't have a solution; I actually have another PR blocked by this for now.
I wonder whether forcing it to be coerced to a UTC Timestamp(someunit, Some("+00:00".into())) makes sense before we have a better solution along the lines of what @tustvold suggested.
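For concreteness, that coercion might look roughly like the following (a sketch of the idea only, not code from this PR): rewrite a timezone-carrying timestamp type so its timezone becomes "+00:00" before the kernel runs.

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, TimeUnit};

/// Hypothetical coercion: map any timezone-carrying timestamp type to the same
/// unit with a fixed "+00:00" timezone, leaving other types untouched.
fn coerce_timestamp_to_utc(arg_type: &DataType) -> DataType {
    match arg_type {
        DataType::Timestamp(unit, Some(_)) => {
            DataType::Timestamp(unit.clone(), Some(Arc::from("+00:00")))
        }
        other => other.clone(),
    }
}

fn main() {
    let input = DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("+08:00")));
    // Prints: Timestamp(Nanosecond, Some("+00:00"))
    println!("{:?}", coerce_timestamp_to_utc(&input));
}
```

Whether that belongs in type coercion or in the signature machinery is exactly the open question in this thread.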

"-12:00", "-11:00", "-10:00", "-09:30", "-09:00", "-08:00", "-07:00",
"-06:00", "-05:00", "-04:30", "-04:00", "-03:30", "-03:00", "-02:00",
"-01:00", "+00:00", "+01:00", "+02:00", "+03:00", "+03:30", "+04:00",
"+04:30", "+05:00", "+05:30", "+05:45", "+06:00", "+06:30", "+07:00",
"+08:00", "+08:30", "+08:45", "+09:00", "+09:30", "+10:00", "+10:30",
"+11:00", "+11:30", "+12:00", "+12:45", "+13:00", "+14:00",
];
let time_units = vec![
TimeUnit::Second,
TimeUnit::Millisecond,
TimeUnit::Microsecond,
TimeUnit::Nanosecond,
];
let mut signatures = vec![];
for unit in time_units {
signatures.push(TypeSignature::Exact(vec![
Utf8,
Timestamp(unit.clone(), None),
]));
for tz in &time_zones {
signatures.push(TypeSignature::Exact(vec![
Utf8,
Timestamp(unit.clone(), Some((*tz).into())),
]));
}
}

Signature::one_of(signatures, self.volatility())
}
BuiltinScalarFunction::DateBin => {
let base_sig = |array_type: TimeUnit| {
vec![
202 changes: 173 additions & 29 deletions datafusion/physical-expr/src/datetime_expressions.rs
@@ -32,6 +32,7 @@ use arrow::{
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
},
};
use arrow_array::timezone::Tz;
use arrow_array::{
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray,
};
@@ -218,13 +219,28 @@ fn quarter_month(date: &NaiveDateTime) -> u32 {
/// epoch, for granularities greater than 1 second, in taking into
/// account that some granularities are not uniform durations of time
/// (e.g. months are not always the same lengths, leap seconds, etc)
fn date_trunc_coarse(granularity: &str, value: i64) -> Result<i64> {
fn date_trunc_coarse(granularity: &str, value: i64, tz: Option<Arc<str>>) -> Result<i64> {
// Use chrono NaiveDateTime to clear the various fields
// correctly accounting for non uniform granularities
let value = timestamp_ns_to_datetime(value).ok_or_else(|| {
DataFusionError::Execution(format!("Timestamp {value} out of range"))
})?;

// convert to local time without time zone
let value = match tz {
Some(tz) => {
let tz: Tz = tz.parse()?;
tz.offset_from_local_datetime(&value)
.map(|offset| DateTime::<Tz>::from_utc(value + offset.fix(), offset))
.single()
.ok_or_else(|| {
DataFusionError::Execution(format!("Timestamp {value} out of range"))
})?
.naive_utc()
}
None => value,
};

let value = Some(value);

let value = match granularity {
@@ -282,6 +298,7 @@ fn date_trunc_coarse(granularity: &str, value: i64) -> Result<i64> {
// truncates a single value with the given timeunit to the specified granularity
fn _date_trunc(
tu: TimeUnit,
tz: Option<Arc<str>>,
value: &Option<i64>,
granularity: &str,
) -> Result<Option<i64>, DataFusionError> {
@@ -297,33 +314,105 @@ fn _date_trunc(
};

// convert to nanoseconds
let nano = date_trunc_coarse(granularity, scale * value)?;
let nano = date_trunc_coarse(granularity, scale * value, tz.clone())?;

let result = match tu {
TimeUnit::Second => match granularity {
"minute" => Some(nano / 1_000_000_000 / 60 * 60),
_ => Some(nano / 1_000_000_000),
"minute" => nano / 1_000_000_000 / 60 * 60,
_ => nano / 1_000_000_000,
},
TimeUnit::Millisecond => match granularity {
"minute" => Some(nano / 1_000_000 / 1_000 / 60 * 1_000 * 60),
"second" => Some(nano / 1_000_000 / 1_000 * 1_000),
_ => Some(nano / 1_000_000),
"minute" => nano / 1_000_000 / 1_000 / 60 * 1_000 * 60,
"second" => nano / 1_000_000 / 1_000 * 1_000,
_ => nano / 1_000_000,
},
TimeUnit::Microsecond => match granularity {
"minute" => Some(nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000),
"second" => Some(nano / 1_000 / 1_000_000 * 1_000_000),
"millisecond" => Some(nano / 1_000 / 1_000 * 1_000),
_ => Some(nano / 1_000),
"minute" => nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000,
"second" => nano / 1_000 / 1_000_000 * 1_000_000,
"millisecond" => nano / 1_000 / 1_000 * 1_000,
_ => nano / 1_000,
},
_ => match granularity {
"minute" => Some(nano / 1_000_000_000 / 60 * 1_000_000_000 * 60),
"second" => Some(nano / 1_000_000_000 * 1_000_000_000),
"millisecond" => Some(nano / 1_000_000 * 1_000_000),
"microsecond" => Some(nano / 1_000 * 1_000),
_ => Some(nano),
"minute" => nano / 1_000_000_000 / 60 * 1_000_000_000 * 60,
"second" => nano / 1_000_000_000 * 1_000_000_000,
"millisecond" => nano / 1_000_000 * 1_000_000,
"microsecond" => nano / 1_000 * 1_000,
_ => nano,
},
};
Ok(result)

let result = match tz {
Some(tz) => {
let tz: Tz = tz.parse()?;
match tu {
TimeUnit::Second => {
let value = tz.from_local_datetime(
&NaiveDateTime::from_timestamp_opt(result, 0).unwrap(),
);
let value = value
.single()
.ok_or_else(|| {
DataFusionError::Execution("Invalid timestamp".to_string())
})?
.naive_utc();
value.timestamp_nanos() / 1_000_000_000
}

TimeUnit::Millisecond => {
let value = tz.from_local_datetime(
&NaiveDateTime::from_timestamp_opt(
result / 1_000,
(result % 1_000 * 1_000_000) as u32,
)
.unwrap(),
);
let value = value
.single()
.ok_or_else(|| {
DataFusionError::Execution("Invalid timestamp".to_string())
})?
.naive_utc();
value.timestamp_nanos() / 1_000_000
}

TimeUnit::Microsecond => {
let value = tz.from_local_datetime(
&NaiveDateTime::from_timestamp_opt(
result / 1_000_000,
(result % 1_000_000 * 1_000) as u32,
)
.unwrap(),
);
let value = value
.single()
.ok_or_else(|| {
DataFusionError::Execution("Invalid timestamp".to_string())
})?
.naive_utc();
value.timestamp_nanos() / 1_000
}
_ => {
let value = tz.from_local_datetime(
&NaiveDateTime::from_timestamp_opt(
result / 1_000_000_000,
(result % 1_000_000_000) as u32,
)
.unwrap(),
);
let value = value
.single()
.ok_or_else(|| {
DataFusionError::Execution("Invalid timestamp".to_string())
})?
.naive_utc();
value.timestamp_nanos()
}
}
}
None => result,
};

Ok(Some(result))
}

/// date_trunc SQL function
@@ -341,62 +430,117 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Nanosecond, v, granularity.as_str())?;
let value = _date_trunc(
TimeUnit::Nanosecond,
tz_opt.clone(),
v,
granularity.as_str(),
)?;

let value = ScalarValue::TimestampNanosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Microsecond, v, granularity.as_str())?;
let value = _date_trunc(
TimeUnit::Microsecond,
tz_opt.clone(),
v,
granularity.as_str(),
)?;
let value = ScalarValue::TimestampMicrosecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Millisecond, v, granularity.as_str())?;
let value = _date_trunc(
TimeUnit::Millisecond,
tz_opt.clone(),
v,
granularity.as_str(),
)?;
let value = ScalarValue::TimestampMillisecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
let value = _date_trunc(TimeUnit::Second, v, granularity.as_str())?;
let value =
_date_trunc(TimeUnit::Second, tz_opt.clone(), v, granularity.as_str())?;
let value = ScalarValue::TimestampSecond(value, tz_opt.clone());
ColumnarValue::Scalar(value)
}
ColumnarValue::Array(array) => {
let array_type = array.data_type();
match array_type {
DataType::Timestamp(TimeUnit::Second, _) => {
DataType::Timestamp(TimeUnit::Second, tz_opt) => {
let array = as_timestamp_second_array(array)?;
let array = array
.iter()
.map(|x| _date_trunc(TimeUnit::Second, &x, granularity.as_str()))
.map(|x| {
_date_trunc(
TimeUnit::Second,
tz_opt.clone(),
&x,
granularity.as_str(),
)
})
.collect::<Result<TimestampSecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
let array = as_timestamp_millisecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Millisecond, &x, granularity.as_str())
_date_trunc(
TimeUnit::Millisecond,
tz_opt.clone(),
&x,
granularity.as_str(),
)
})
.collect::<Result<TimestampMillisecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
let array = as_timestamp_microsecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Microsecond, &x, granularity.as_str())
_date_trunc(
TimeUnit::Microsecond,
tz_opt.clone(),
&x,
granularity.as_str(),
)
})
.collect::<Result<TimestampMicrosecondArray>>()?;
ColumnarValue::Array(Arc::new(array))
}
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(
TimeUnit::Nanosecond,
tz_opt.clone(),
&x,
granularity.as_str(),
)
})
.collect::<Result<TimestampNanosecondArray>>()?;

ColumnarValue::Array(Arc::new(array))
}
_ => {
let array = as_timestamp_nanosecond_array(array)?;
let array = array
.iter()
.map(|x| {
_date_trunc(TimeUnit::Nanosecond, &x, granularity.as_str())
_date_trunc(
TimeUnit::Nanosecond,
None,
&x,
granularity.as_str(),
)
})
.collect::<Result<TimestampNanosecondArray>>()?;

@@ -990,7 +1134,7 @@ mod tests {
cases.iter().for_each(|(original, granularity, expected)| {
let left = string_to_timestamp_nanos(original).unwrap();
let right = string_to_timestamp_nanos(expected).unwrap();
let result = date_trunc_coarse(granularity, left).unwrap();
let result = date_trunc_coarse(granularity, left, None).unwrap();
assert_eq!(result, right, "{original} = {expected}");
});
}