Skip to content

Commit

Permalink
feat(meta): introduce sink validation in meta (#8417)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx authored Mar 14, 2023
1 parent fbcd407 commit 8183b41
Show file tree
Hide file tree
Showing 12 changed files with 296 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) {
ConnectorServiceProto.ValidateSinkResponse.newBuilder()
.setError(
ConnectorServiceProto.ValidationError.newBuilder()
.setErrorMessage(e.toString())
.setErrorMessage(e.getMessage())
.build())
.build());
responseObserver.onCompleted();
}

responseObserver.onNext(ConnectorServiceProto.ValidateSinkResponse.newBuilder().build());
responseObserver.onCompleted();
}
}
23 changes: 11 additions & 12 deletions src/common/src/util/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::anyhow;
use risingwave_pb::common::HostAddress as ProstHostAddress;

use crate::error::{internal_error, Result};

/// General host address and port.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HostAddr {
Expand All @@ -41,33 +40,33 @@ impl From<SocketAddr> for HostAddr {
}

impl TryFrom<&str> for HostAddr {
type Error = crate::error::RwError;
type Error = anyhow::Error;

fn try_from(s: &str) -> Result<Self> {
let addr = url::Url::parse(&format!("http://{}", s))
.map_err(|e| internal_error(format!("{}: {}", e, s)))?;
fn try_from(s: &str) -> Result<Self, Self::Error> {
let addr =
url::Url::parse(&format!("http://{}", s)).map_err(|e| anyhow!("{}: {}", e, s))?;
Ok(HostAddr {
host: addr
.host()
.ok_or_else(|| internal_error("invalid host"))?
.ok_or_else(|| anyhow!("invalid host"))?
.to_string(),
port: addr.port().ok_or_else(|| internal_error("invalid port"))?,
port: addr.port().ok_or_else(|| anyhow!("invalid port"))?,
})
}
}

impl TryFrom<&String> for HostAddr {
type Error = crate::error::RwError;
type Error = anyhow::Error;

fn try_from(s: &String) -> Result<Self> {
fn try_from(s: &String) -> Result<Self, Self::Error> {
Self::try_from(s.as_str())
}
}

impl FromStr for HostAddr {
type Err = crate::error::RwError;
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::try_from(s)
}
}
Expand Down
17 changes: 16 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub mod desc;
use std::collections::HashMap;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
use risingwave_common::catalog::{
ColumnCatalog, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{Sink as ProstSink, SinkType as ProstSinkType};

Expand Down Expand Up @@ -165,6 +167,19 @@ impl SinkCatalog {
sink_type: self.sink_type.to_proto() as i32,
}
}

pub fn schema(&self) -> Schema {
let fields = self
.columns
.iter()
.map(|column| Field::from(column.column_desc.clone()))
.collect_vec();
Schema { fields }
}

pub fn pk_indices(&self) -> Vec<usize> {
self.pk.iter().map(|k| k.column_index).collect_vec()
}
}

impl From<ProstSink> for SinkCatalog {
Expand Down
35 changes: 33 additions & 2 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
pub use tracing;

use self::catalog::SinkType;
use self::catalog::{SinkCatalog, SinkType};
use crate::sink::console::{ConsoleConfig, ConsoleSink, CONSOLE_SINK};
use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
use crate::sink::redis::{RedisConfig, RedisSink};
Expand Down Expand Up @@ -160,6 +160,37 @@ impl SinkImpl {
SinkConfig::BlackHole => SinkImpl::Blackhole,
})
}

pub async fn validate(
cfg: SinkConfig,
sink_catalog: SinkCatalog,
connector_rpc_endpoint: Option<String>,
) -> Result<()> {
match cfg {
SinkConfig::Redis(cfg) => RedisSink::new(cfg, sink_catalog.schema()).map(|_| ()),
SinkConfig::Kafka(cfg) => {
// We simply call `KafkaSink::new` here to validate a Kafka sink.
if sink_catalog.sink_type.is_append_only() {
KafkaSink::<true>::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices())
.await
.map(|_| ())
} else {
KafkaSink::<false>::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices())
.await
.map(|_| ())
}
}
SinkConfig::Remote(cfg) => {
if sink_catalog.sink_type.is_append_only() {
RemoteSink::<true>::validate(cfg, sink_catalog, connector_rpc_endpoint).await
} else {
RemoteSink::<false>::validate(cfg, sink_catalog, connector_rpc_endpoint).await
}
}
SinkConfig::Console(_) => Ok(()),
SinkConfig::BlackHole => Ok(()),
}
}
}

macro_rules! impl_sink {
Expand Down Expand Up @@ -222,7 +253,7 @@ pub enum SinkError {

impl From<RpcError> for SinkError {
fn from(value: RpcError) -> Self {
SinkError::Remote(format!("{:?}", value))
SinkError::Remote(format!("{}", value))
}
}

Expand Down
Loading

0 comments on commit 8183b41

Please sign in to comment.