fix(sink): remove unimplemented (risingwavelabs#8622)
Signed-off-by: tabVersion <[email protected]>
Co-authored-by: tabVersion <[email protected]>
fuyufjh and tabVersion authored Mar 17, 2023
1 parent 7cd7c9d commit 8c95702
Showing 3 changed files with 102 additions and 140 deletions.
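The change is mechanical but worth spelling out: the remote sink's private `parse_datum` ended in a catch-all `_ => unimplemented!()`, so a single row containing an unhandled type would panic the sink task at runtime. The commit deletes it and routes the remote sink through the shared `record_to_json`/`datum_to_json_object` helpers (moved from kafka.rs into mod.rs), which report unsupported types as an error instead. A minimal standalone sketch of the difference, using simplified stand-in types rather than RisingWave's own:

use serde_json::{json, Value};

// Simplified stand-in for a datum; not RisingWave's DatumRef.
#[derive(Debug)]
enum Datum {
    Int64(i64),
    Utf8(String),
    Bytea(Vec<u8>), // a variant the old remote-sink helper had no arm for
}

// Old shape: the catch-all arm panics, taking the whole sink task down.
#[allow(dead_code)]
fn parse_datum(datum: &Datum) -> Value {
    match datum {
        Datum::Int64(v) => json!(v),
        Datum::Utf8(v) => json!(v),
        _ => unimplemented!(), // runtime panic on e.g. Bytea
    }
}

// New shape: unsupported types surface as a Result the caller can
// propagate with `?`, as record_to_json does via SinkError::JsonParse.
fn datum_to_json(datum: &Datum) -> Result<Value, String> {
    match datum {
        Datum::Int64(v) => Ok(json!(v)),
        Datum::Utf8(v) => Ok(json!(v)),
        other => Err(format!("unsupported data type: {:?}", other)),
    }
}

fn main() {
    let datum = Datum::Bytea(vec![0xde, 0xad]);
    match datum_to_json(&datum) {
        Ok(v) => println!("{v}"),
        Err(e) => eprintln!("sink error: {e}"), // reported, not a panic
    }
}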
98 changes: 3 additions & 95 deletions src/connector/src/sink/kafka.rs
@@ -23,12 +23,9 @@ use rdkafka::message::ToBytes;
 use rdkafka::producer::{BaseRecord, DefaultProducerContext, Producer, ThreadedProducer};
 use rdkafka::types::RDKafkaErrorCode;
 use rdkafka::ClientConfig;
-use risingwave_common::array::{ArrayError, ArrayResult, Op, RowRef, StreamChunk};
+use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::row::Row;
-use risingwave_common::types::to_text::ToText;
-use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl};
-use risingwave_common::util::iter_util::ZipEqFast;
 use serde_derive::Deserialize;
 use serde_json::{json, Map, Value};
 use tracing::warn;
@@ -37,7 +34,7 @@ use super::{
     Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
 };
 use crate::common::KafkaCommon;
-use crate::sink::Result;
+use crate::sink::{datum_to_json_object, record_to_json, Result};
 use crate::{deserialize_bool_from_string, deserialize_duration_from_string};

 pub const KAFKA_SINK: &str = "kafka";
@@ -386,96 +383,6 @@ impl<const APPEND_ONLY: bool> Debug for KafkaSink<APPEND_ONLY> {
     }
 }

-fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value> {
-    let scalar_ref = match datum {
-        None => return Ok(Value::Null),
-        Some(datum) => datum,
-    };
-
-    let data_type = field.data_type();
-
-    tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
-
-    let value = match (data_type, scalar_ref) {
-        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
-            json!(v)
-        }
-        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
-            json!(v)
-        }
-        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
-            json!(v)
-        }
-        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
-            json!(v)
-        }
-        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
-            json!(f32::from(v))
-        }
-        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
-            json!(f64::from(v))
-        }
-        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
-            json!(v)
-        }
-        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
-            // fixme
-            json!(v.to_text())
-        }
-        (
-            dt @ DataType::Date
-            | dt @ DataType::Time
-            | dt @ DataType::Timestamp
-            | dt @ DataType::Timestamptz
-            | dt @ DataType::Interval
-            | dt @ DataType::Bytea,
-            scalar,
-        ) => {
-            json!(scalar.to_text_with_type(&dt))
-        }
-        (DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => {
-            let mut vec = Vec::with_capacity(list_ref.values_ref().len());
-            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
-            for sub_datum_ref in list_ref.values_ref() {
-                let value = datum_to_json_object(&inner_field, sub_datum_ref)?;
-                vec.push(value);
-            }
-            json!(vec)
-        }
-        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
-            let mut map = Map::with_capacity(st.fields.len());
-            for (sub_datum_ref, sub_field) in struct_ref.fields_ref().into_iter().zip_eq_fast(
-                st.fields
-                    .iter()
-                    .zip_eq_fast(st.field_names.iter())
-                    .map(|(dt, name)| Field::with_name(dt.clone(), name)),
-            ) {
-                let value = datum_to_json_object(&sub_field, sub_datum_ref)?;
-                map.insert(sub_field.name.clone(), value);
-            }
-            json!(map)
-        }
-        _ => {
-            return Err(ArrayError::internal(
-                "datum_to_json_object: unsupported data type".to_string(),
-            ));
-        }
-    };
-
-    Ok(value)
-}
-
-fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result<Map<String, Value>> {
-    let mut mappings = Map::with_capacity(schema.len());
-    for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
-        let key = field.name.clone();
-        let value = datum_to_json_object(field, datum_ref)
-            .map_err(|e| SinkError::JsonParse(e.to_string()))?;
-        mappings.insert(key, value);
-    }
-    Ok(mappings)
-}
-
 fn pk_to_json(
     row: RowRef<'_>,
     schema: &[Field],
@@ -611,6 +518,7 @@ impl KafkaTransactionConductor {
 mod test {
     use maplit::hashmap;
     use risingwave_common::test_prelude::StreamChunkTestExt;
+    use risingwave_common::types::DataType;

     use super::*;
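For context, kafka.rs keeps its behavior and now imports the two helpers from the sink module root. A self-contained sketch (plain serde_json, none of the RisingWave types) of the row-to-JSON step it delegates: pair each schema field name with its encoded datum, then serialize the map into the Kafka message value.

use serde_json::{json, Map, Value};

fn main() {
    // Stand-ins for a schema and one row's converted datums.
    let field_names = ["id", "name", "score"];
    let values = [json!(1), json!("alice"), json!(99.5)];

    // The record_to_json pattern: one map entry per schema field.
    let mut record = Map::new();
    for (name, value) in field_names.iter().zip(values) {
        record.insert(name.to_string(), value);
    }

    // The serialized map is what ends up as the Kafka message payload.
    let payload = serde_json::to_string(&Value::Object(record)).unwrap();
    println!("{payload}"); // {"id":1,"name":"alice","score":99.5}
}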
100 changes: 97 additions & 3 deletions src/connector/src/sink/mod.rs
@@ -23,11 +23,16 @@ use std::collections::HashMap;
 use anyhow::anyhow;
 use async_trait::async_trait;
 use enum_as_inner::EnumAsInner;
-use risingwave_common::array::StreamChunk;
-use risingwave_common::catalog::Schema;
+use risingwave_common::array::{ArrayError, ArrayResult, RowRef, StreamChunk};
+use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::error::{ErrorCode, RwError};
+use risingwave_common::row::Row;
+use risingwave_common::types::to_text::ToText;
+use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl};
+use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_rpc_client::error::RpcError;
 use serde::{Deserialize, Serialize};
+use serde_json::{json, Map, Value};
 use thiserror::Error;
 pub use tracing;

@@ -37,7 +42,6 @@ use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
 use crate::sink::redis::{RedisConfig, RedisSink};
 use crate::sink::remote::{RemoteConfig, RemoteSink};
 use crate::ConnectorParams;
-
 pub const SINK_TYPE_OPTION: &str = "type";
 pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
 pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
@@ -257,3 +261,93 @@ impl From<SinkError> for RwError {
         ErrorCode::SinkError(Box::new(e)).into()
     }
 }
+
+pub fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result<Map<String, Value>> {
+    let mut mappings = Map::with_capacity(schema.len());
+    for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) {
+        let key = field.name.clone();
+        let value = datum_to_json_object(field, datum_ref)
+            .map_err(|e| SinkError::JsonParse(e.to_string()))?;
+        mappings.insert(key, value);
+    }
+    Ok(mappings)
+}
+
+fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult<Value> {
+    let scalar_ref = match datum {
+        None => return Ok(Value::Null),
+        Some(datum) => datum,
+    };
+
+    let data_type = field.data_type();
+
+    tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
+
+    let value = match (data_type, scalar_ref) {
+        (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
+            json!(v)
+        }
+        (DataType::Int16, ScalarRefImpl::Int16(v)) => {
+            json!(v)
+        }
+        (DataType::Int32, ScalarRefImpl::Int32(v)) => {
+            json!(v)
+        }
+        (DataType::Int64, ScalarRefImpl::Int64(v)) => {
+            json!(v)
+        }
+        (DataType::Float32, ScalarRefImpl::Float32(v)) => {
+            json!(f32::from(v))
+        }
+        (DataType::Float64, ScalarRefImpl::Float64(v)) => {
+            json!(f64::from(v))
+        }
+        (DataType::Varchar, ScalarRefImpl::Utf8(v)) => {
+            json!(v)
+        }
+        (DataType::Decimal, ScalarRefImpl::Decimal(v)) => {
+            // fixme
+            json!(v.to_text())
+        }
+        (
+            dt @ DataType::Date
+            | dt @ DataType::Time
+            | dt @ DataType::Timestamp
+            | dt @ DataType::Timestamptz
+            | dt @ DataType::Interval
+            | dt @ DataType::Bytea,
+            scalar,
+        ) => {
+            json!(scalar.to_text_with_type(&dt))
+        }
+        (DataType::List { datatype }, ScalarRefImpl::List(list_ref)) => {
+            let mut vec = Vec::with_capacity(list_ref.values_ref().len());
+            let inner_field = Field::unnamed(Box::<DataType>::into_inner(datatype));
+            for sub_datum_ref in list_ref.values_ref() {
+                let value = datum_to_json_object(&inner_field, sub_datum_ref)?;
+                vec.push(value);
+            }
+            json!(vec)
+        }
+        (DataType::Struct(st), ScalarRefImpl::Struct(struct_ref)) => {
+            let mut map = Map::with_capacity(st.fields.len());
+            for (sub_datum_ref, sub_field) in struct_ref.fields_ref().into_iter().zip_eq_fast(
+                st.fields
+                    .iter()
+                    .zip_eq_fast(st.field_names.iter())
+                    .map(|(dt, name)| Field::with_name(dt.clone(), name)),
+            ) {
+                let value = datum_to_json_object(&sub_field, sub_datum_ref)?;
+                map.insert(sub_field.name.clone(), value);
+            }
+            json!(map)
+        }
+        _ => {
+            return Err(ArrayError::internal(
+                "datum_to_json_object: unsupported data type".to_string(),
+            ));
+        }
+    };
+
+    Ok(value)
+}
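
The moved code is unchanged apart from `record_to_json` becoming `pub`. An illustrative sketch of the JSON shapes `datum_to_json_object` produces (plain serde_json; the exact date/time text rendering below is an assumption about `to_text_with_type`'s output):

use serde_json::json;

fn main() {
    // NULL -> null, scalars -> JSON scalars, decimals and date/time types ->
    // their text form, lists -> arrays, structs -> objects keyed by field
    // name, recursing for nested values.
    let row = json!({
        "active": true,                      // Boolean
        "amount": "12.50",                   // Decimal, rendered via to_text()
        "created_at": "2023-03-17 00:00:00", // Timestamp (assumed rendering)
        "tags": ["a", "b"],                  // List, elements converted recursively
        "address": {                         // Struct, one key per struct field
            "city": "Hangzhou",
            "zip": "310000"
        }
    });
    println!("{}", serde_json::to_string_pretty(&row).unwrap());
}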
44 changes: 2 additions & 42 deletions src/connector/src/sink/remote.rs
@@ -21,12 +21,9 @@ use risingwave_common::array::StreamChunk;
 #[cfg(test)]
 use risingwave_common::catalog::Field;
 use risingwave_common::catalog::Schema;
-use risingwave_common::row::Row;
 #[cfg(test)]
 use risingwave_common::types::DataType;
-use risingwave_common::types::{DatumRef, ScalarRefImpl};
 use risingwave_common::util::addr::HostAddr;
-use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp;
 use risingwave_pb::connector_service::sink_stream_request::write_batch::{JsonPayload, Payload};
 use risingwave_pb::connector_service::sink_stream_request::{
@@ -35,14 +32,12 @@ use risingwave_pb::connector_service::sink_stream_request::{
 use risingwave_pb::connector_service::table_schema::Column;
 use risingwave_pb::connector_service::{SinkResponse, SinkStreamRequest, TableSchema};
 use risingwave_rpc_client::ConnectorClient;
-use serde_json::Value;
-use serde_json::Value::Number;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio_stream::StreamExt;
 use tonic::{Status, Streaming};

 use super::catalog::SinkCatalog;
-use crate::sink::{Result, Sink, SinkError};
+use crate::sink::{record_to_json, Result, Sink, SinkError};
 use crate::ConnectorParams;

 pub const VALID_REMOTE_SINKS: [&str; 3] = ["jdbc", "file", "iceberg"];
@@ -254,13 +249,7 @@ impl<const APPEND_ONLY: bool> Sink for RemoteSink<APPEND_ONLY> {
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
         let mut row_ops = vec![];
         for (op, row_ref) in chunk.rows() {
-            let mut map = serde_json::Map::new();
-            row_ref
-                .iter()
-                .zip_eq_fast(self.schema.fields.iter())
-                .for_each(|(v, f)| {
-                    map.insert(f.name.clone(), parse_datum(v));
-                });
+            let map = record_to_json(row_ref, &self.schema.fields)?;
             let row_op = RowOp {
                 op_type: op.to_protobuf() as i32,
                 line: serde_json::to_string(&map)
@@ -319,35 +308,6 @@ impl<const APPEND_ONLY: bool> Sink for RemoteSink<APPEND_ONLY> {
     }
 }

-fn parse_datum(datum: DatumRef<'_>) -> Value {
-    match datum {
-        None => Value::Null,
-        Some(ScalarRefImpl::Int32(v)) => Value::from(v),
-        Some(ScalarRefImpl::Int64(v)) => Value::from(v),
-        Some(ScalarRefImpl::Float32(v)) => Value::from(v.into_inner()),
-        Some(ScalarRefImpl::Float64(v)) => Value::from(v.into_inner()),
-        Some(ScalarRefImpl::Decimal(v)) => Number(v.to_string().parse().unwrap()),
-        Some(ScalarRefImpl::Utf8(v)) => Value::from(v),
-        Some(ScalarRefImpl::Bool(v)) => Value::from(v),
-        Some(ScalarRefImpl::NaiveDate(v)) => Value::from(v.to_string()),
-        Some(ScalarRefImpl::NaiveTime(v)) => Value::from(v.to_string()),
-        Some(ScalarRefImpl::Interval(v)) => Value::from(v.to_string()),
-        Some(ScalarRefImpl::Struct(v)) => Value::from(
-            v.fields_ref()
-                .iter()
-                .map(|v| parse_datum(*v))
-                .collect::<Vec<_>>(),
-        ),
-        Some(ScalarRefImpl::List(v)) => Value::from(
-            v.values_ref()
-                .iter()
-                .map(|v| parse_datum(*v))
-                .collect::<Vec<_>>(),
-        ),
-        _ => unimplemented!(),
-    }
-}
-
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
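With `parse_datum` gone, the remote sink builds each `RowOp` line from the shared helper's map. A hypothetical stand-alone sketch of that write path — `RowOp` here is a local stand-in mirroring only the two protobuf fields visible in the diff, not the generated type:

use serde_json::{json, Map, Value};

// Stand-in for the protobuf RowOp message (op_type: i32, line: String).
struct RowOp {
    op_type: i32,
    line: String,
}

fn main() -> Result<(), serde_json::Error> {
    // Pretend record_to_json already produced this field-name -> value map.
    let mut map = Map::new();
    map.insert("id".to_string(), json!(42));
    map.insert("name".to_string(), json!("bob"));

    // One RowOp per row: the op tag plus the row serialized as one JSON line.
    let row_op = RowOp {
        op_type: 1, // e.g. an insert op serialized to its protobuf tag
        line: serde_json::to_string(&Value::Object(map))?,
    };
    println!("op={} line={}", row_op.op_type, row_op.line);
    Ok(())
}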
