Skip to content

Commit

Permalink
Fix logical vs physical schema mismatch for aliased now() (#12951)
Browse files Browse the repository at this point in the history
* test: reproducer of error with aliased now field

* fix: now() UDF is not nullable

* fix: when extracting metadata from expr, handle CastExpr

* test: update tests now that the fixes are working
  • Loading branch information
wiedld authored Oct 16, 2024
1 parent 44127ec commit 875aaa6
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
6 changes: 5 additions & 1 deletion datafusion/functions/src/datetime/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Timestamp;
use arrow::datatypes::TimeUnit::Nanosecond;

use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_common::{internal_err, ExprSchema, Result, ScalarValue};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};

Expand Down Expand Up @@ -84,4 +84,8 @@ impl ScalarUDFImpl for NowFunc {
ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())),
)))
}

fn is_nullable(&self, _args: &[Expr], _schema: &dyn ExprSchema) -> bool {
false
}
}
6 changes: 5 additions & 1 deletion datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use datafusion_common::stats::Precision;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::expressions::{CastExpr, Literal};

use crate::execution_plan::CardinalityEffect;
use futures::stream::{Stream, StreamExt};
Expand Down Expand Up @@ -246,6 +246,10 @@ pub(crate) fn get_field_metadata(
e: &Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> Option<HashMap<String, String>> {
if let Some(cast) = e.as_any().downcast_ref::<CastExpr>() {
return get_field_metadata(cast.expr(), input_schema);
}

// Look up field by index in schema (not NAME as there can be more than one
// column with the same name)
e.as_any()
Expand Down
19 changes: 15 additions & 4 deletions datafusion/sqllogictest/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,18 +319,29 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
String::from("metadata_key"),
String::from("the l_name field"),
)]));
let ts = Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false)
.with_metadata(HashMap::from([(
String::from("metadata_key"),
String::from("ts non-nullable field"),
)]));

let schema = Schema::new(vec![id, name, l_name]).with_metadata(HashMap::from([(
String::from("metadata_key"),
String::from("the entire schema"),
)]));
let schema =
Schema::new(vec![id, name, l_name, ts]).with_metadata(HashMap::from([(
String::from("metadata_key"),
String::from("the entire schema"),
)]));

let batch = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _,
Arc::new(StringArray::from(vec![None, Some("bar"), Some("baz")])) as _,
Arc::new(StringArray::from(vec![None, Some("l_bar"), Some("l_baz")])) as _,
Arc::new(TimestampNanosecondArray::from(vec![
1599572549190855123,
1599572549190855123,
1599572549190855123,
])) as _,
],
)
.unwrap();
Expand Down
16 changes: 16 additions & 0 deletions datafusion/sqllogictest/test_files/metadata.slt
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,21 @@ NULL NULL l_bar



query P rowsort
SELECT ts
FROM ((
SELECT now() AS ts
FROM table_with_metadata
) UNION ALL (
SELECT ts
FROM table_with_metadata
))
GROUP BY ts
ORDER BY ts
LIMIT 1;
----
2020-09-08T13:42:29.190855123Z


statement ok
drop table table_with_metadata;

0 comments on commit 875aaa6

Please sign in to comment.