From d0a2c8aa4027b209438811fe7f9fa637f3951ad8 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 10 Apr 2023 21:14:04 +0800 Subject: [PATCH 1/4] fix timestamptz type mapping Signed-off-by: tabVersion --- src/connector/src/sink/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index ca83e660b0af0..979e135a3d1cd 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -299,8 +299,10 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { json!(v.to_text()) } - (DataType::Timestamptz, ScalarRefImpl::Timestamp(v)) => { - json!(v.0.and_local_timezone(chrono::Utc).unwrap().to_rfc3339()) + (DataType::Timestamptz, ScalarRefImpl::Int64(v)) => { + // risingwave's timestamp with timezone is stored in UTC and does not maintain the + // timezone info and the time is in microsecond. + json!(v) } (DataType::Time, ScalarRefImpl::Time(v)) => { // todo: just ignore the nanos part to avoid leap second complex From e0f28727bfcfd6900e0186715409183f7477b632 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 10 Apr 2023 21:38:07 +0800 Subject: [PATCH 2/4] fix ut Signed-off-by: tabVersion --- src/connector/src/sink/mod.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 979e135a3d1cd..debbd74e9737a 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -357,6 +357,7 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult Date: Mon, 10 Apr 2023 21:59:27 +0800 Subject: [PATCH 3/4] fix ut Signed-off-by: tabVersion --- src/connector/src/sink/mod.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index debbd74e9737a..35ddf436c0ea9 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -17,7 +17,9 @@ pub mod kafka; pub mod redis; pub mod remote; +use std::any::Any; use std::collections::HashMap; +use std::fmt::Display; use anyhow::anyhow; use async_trait::async_trait; @@ -343,9 +345,9 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { + (data_type, scalar_ref) => { return Err(ArrayError::internal( - "datum_to_json_object: unsupported data type".to_string(), + format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref), )); } }; @@ -409,10 +411,7 @@ mod tests { data_type: DataType::Timestamptz, ..mock_field.clone() }, - Some( - ScalarImpl::Int64(tstz_inner) - .as_scalar_ref_impl(), - ), + Some(ScalarImpl::Int64(tstz_inner).as_scalar_ref_impl()), ) .unwrap(); From e10bd40e5d34366a07999cea28d3a7329f448755 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 10 Apr 2023 22:11:32 +0800 Subject: [PATCH 4/4] style Signed-off-by: tabVersion --- src/connector/src/sink/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 35ddf436c0ea9..c7ed6c8449270 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -17,9 +17,7 @@ pub mod kafka; pub mod redis; pub mod remote; -use std::any::Any; use std::collections::HashMap; -use std::fmt::Display; use anyhow::anyhow; use async_trait::async_trait;