diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 0d47f777b4a5f..c7ed6c8449270 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -299,8 +299,10 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { json!(v.to_text()) } - (DataType::Timestamptz, ScalarRefImpl::Timestamp(v)) => { - json!(v.0.and_local_timezone(chrono::Utc).unwrap().to_rfc3339()) + (DataType::Timestamptz, ScalarRefImpl::Int64(v)) => { + // risingwave's timestamp with timezone is stored in UTC and does not maintain the + // timezone info and the time is in microsecond. + json!(v) } (DataType::Time, ScalarRefImpl::Time(v)) => { // todo: just ignore the nanos part to avoid leap second complex @@ -341,11 +343,10 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { - return Err(ArrayError::internal(format!( - "datum_to_json_object: unsupported data type {:?}, {:?}", - field.data_type, scalar_ref, - ))); + (data_type, scalar_ref) => { + return Err(ArrayError::internal( + format!("datum_to_json_object: unsupported data type: field name: {:?}, logical type: {:?}, physical type: {:?}", field.name, data_type, scalar_ref), + )); } }; @@ -356,6 +357,7 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult