From de85d82ffa01b36799f4b9898fe8488446a6e682 Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Wed, 28 Aug 2024 18:14:47 +0800 Subject: [PATCH] feat(connector): support azblob file source (#18295) --- src/connector/src/macros.rs | 1 + .../src/parser/additional_columns.rs | 6 +- src/connector/src/source/base.rs | 4 +- .../opendal_source/azblob_source.rs | 78 +++++++++++++++++++ .../source/filesystem/opendal_source/mod.rs | 49 ++++++++++++ .../opendal_source/opendal_enumerator.rs | 2 +- src/connector/src/source/mod.rs | 2 +- src/connector/src/source/reader/reader.rs | 8 +- src/connector/src/with_options.rs | 4 +- src/connector/with_options_source.yaml | 24 ++++++ src/frontend/src/handler/create_source.rs | 7 +- src/frontend/src/scheduler/plan_fragmenter.rs | 17 +++- .../src/executor/source/fetch_executor.rs | 7 +- src/stream/src/from_proto/source/fs_fetch.rs | 11 ++- 14 files changed, 207 insertions(+), 13 deletions(-) create mode 100644 src/connector/src/source/filesystem/opendal_source/azblob_source.rs diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index b422a029fa76..1dc304c651a8 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -39,6 +39,7 @@ macro_rules! for_all_classified_sources { { Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> }, { OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> }, { PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> }, + { Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> }, { Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}, { Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit} } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 54b1120d0643..c30f5f74ba39 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -31,8 +31,8 @@ use risingwave_pb::plan_common::{ use crate::error::ConnectorResult; use crate::source::cdc::MONGODB_CDC_CONNECTOR; use crate::source::{ - GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, - S3_CONNECTOR, + AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, + POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; // Hidden additional columns connectors which do not support `include` syntax. @@ -57,6 +57,8 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock OpendalEnumerator { + /// create opendal azblob source. + pub fn new_azblob_source(azblob_properties: AzblobProperties) -> ConnectorResult { + // Create azblob builder. + let mut builder = Azblob::default(); + + builder.container(&azblob_properties.container_name); + + builder.endpoint(&azblob_properties.endpoint_url); + + if let Some(account_name) = azblob_properties.account_name { + builder.account_name(&account_name); + } else { + tracing::warn!( + "account_name azblob is not set, container {}", + azblob_properties.container_name + ); + } + + if let Some(account_key) = azblob_properties.account_key { + builder.account_key(&account_key); + } else { + tracing::warn!( + "account_key azblob is not set, container {}", + azblob_properties.container_name + ); + } + let op: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + let (prefix, matcher) = if let Some(pattern) = azblob_properties.match_pattern.as_ref() { + let prefix = get_prefix(pattern); + let matcher = glob::Pattern::new(pattern) + .with_context(|| format!("Invalid match_pattern: {}", pattern))?; + (Some(prefix), Some(matcher)) + } else { + (None, None) + }; + + let compression_format = azblob_properties.compression_format; + + Ok(Self { + op, + prefix, + matcher, + marker: PhantomData, + compression_format, + }) + } +} diff --git a/src/connector/src/source/filesystem/opendal_source/mod.rs b/src/connector/src/source/filesystem/opendal_source/mod.rs index 78c6ebf4cd8c..1efeee0908e7 100644 --- a/src/connector/src/source/filesystem/opendal_source/mod.rs +++ b/src/connector/src/source/filesystem/opendal_source/mod.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; +pub mod azblob_source; pub mod gcs_source; pub mod posix_fs_source; pub mod s3_source; @@ -31,6 +32,7 @@ use super::OpendalFsSplit; use crate::error::ConnectorResult; use crate::source::{SourceProperties, UnknownFields}; +pub const AZBLOB_CONNECTOR: &str = "azblob"; pub const GCS_CONNECTOR: &str = "gcs"; // The new s3_v2 will use opendal. pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2"; @@ -168,3 +170,50 @@ impl SourceProperties for PosixFsProperties { const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR; } + +#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)] +pub struct AzblobProperties { + #[serde(rename = "azblob.container_name")] + pub container_name: String, + + #[serde(rename = "azblob.credentials.account_name", default)] + pub account_name: Option, + #[serde(rename = "azblob.credentials.account_key", default)] + pub account_key: Option, + #[serde(rename = "azblob.endpoint_url")] + pub endpoint_url: String, + + #[serde(rename = "match_pattern", default)] + pub match_pattern: Option, + + #[serde(flatten)] + pub unknown_fields: HashMap, + + #[serde(rename = "compression_format", default = "Default::default")] + pub compression_format: CompressionFormat, +} + +impl UnknownFields for AzblobProperties { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +impl SourceProperties for AzblobProperties { + type Split = OpendalFsSplit; + type SplitEnumerator = OpendalEnumerator; + type SplitReader = OpendalReader; + + const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OpendalAzblob; + +impl OpendalSource for OpendalAzblob { + type Properties = AzblobProperties; + + fn new_enumerator(properties: Self::Properties) -> ConnectorResult> { + OpendalEnumerator::new_azblob_source(properties) + } +} diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index 864d1de56c7b..68828cc566c9 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -63,7 +63,7 @@ impl OpendalEnumerator { let object_lister = self .op .lister_with(prefix) - .recursive(true) + .recursive(false) .metakey(Metakey::ContentLength | Metakey::LastModified) .await?; let stream = stream::unfold(object_lister, |mut object_lister| async move { diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index ed8842e70825..dc965c9274ff 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -45,7 +45,7 @@ use risingwave_common::array::{Array, ArrayRef}; use thiserror_ext::AsReport; pub use crate::source::filesystem::opendal_source::{ - GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, + AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, }; pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 10d549df0c49..d27e14253dd4 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -29,7 +29,7 @@ use crate::error::ConnectorResult; use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use crate::source::filesystem::opendal_source::{ - OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, + OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ @@ -95,6 +95,11 @@ impl SourceReader { OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?; Ok(build_opendal_fs_list_stream(lister)) } + ConnectorProperties::Azblob(prop) => { + let lister: OpendalEnumerator = + OpendalEnumerator::new_azblob_source(*prop)?; + Ok(build_opendal_fs_list_stream(lister)) + } ConnectorProperties::PosixFs(prop) => { let lister: OpendalEnumerator = OpendalEnumerator::new_posix_fs_source(*prop)?; @@ -192,6 +197,7 @@ async fn build_opendal_fs_list_stream(lister: OpendalEnumera .map(|m| m.matches(&res.name)) .unwrap_or(true) { + println!("这里 res{:?}", res.name); yield res } else { // Currrntly due to the lack of prefix list, we just skip the unmatched files. diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index eef5ccbd9cbf..a3e65ca16f6d 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -20,7 +20,8 @@ use crate::sink::catalog::SinkFormatDesc; use crate::source::cdc::external::CdcTableType; use crate::source::iceberg::ICEBERG_CONNECTOR; use crate::source::{ - GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY, + AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, + UPSTREAM_SOURCE_KEY, }; /// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually. @@ -142,6 +143,7 @@ pub trait WithPropertiesExt: Get + Sized { s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR) || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR) || s.eq_ignore_ascii_case(GCS_CONNECTOR) + || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR) }) .unwrap_or(false) } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 4a208465265e..fc7e09741e2c 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -1,6 +1,30 @@ # THIS FILE IS AUTO_GENERATED. DO NOT EDIT # UPDATE WITH: ./risedev generate-with-options +AzblobProperties: + fields: + - name: azblob.container_name + field_type: String + required: true + - name: azblob.credentials.account_name + field_type: String + required: false + default: Default::default + - name: azblob.credentials.account_key + field_type: String + required: false + default: Default::default + - name: azblob.endpoint_url + field_type: String + required: true + - name: match_pattern + field_type: String + required: false + default: Default::default + - name: compression_format + field_type: CompressionFormat + required: false + default: Default::default DatagenProperties: fields: - name: datagen.split.num diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index c4048dd7cac0..1ba390149d3a 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -54,7 +54,7 @@ use risingwave_connector::source::nexmark::source::{get_event_data_types_with_na use risingwave_connector::source::test_source::TEST_CONNECTOR; pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY; use risingwave_connector::source::{ - ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, + ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, }; @@ -1064,7 +1064,10 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Csv, Encode::Json, Encode::Parquet], ), GCS_CONNECTOR => hashmap!( - Format::Plain => vec![Encode::Csv, Encode::Json], + Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], + ), + AZBLOB_CONNECTOR => hashmap!( + Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet], ), POSIX_FS_CONNECTOR => hashmap!( Format::Plain => vec![Encode::Csv], diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 65bfbe09c54b..09e4cbc0bfa0 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -33,7 +33,9 @@ use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; -use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; +use risingwave_connector::source::filesystem::opendal_source::{ + OpendalAzblob, OpendalGcs, OpendalS3, +}; use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo}; use risingwave_connector::source::kafka::KafkaSplitEnumerator; use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch; @@ -330,6 +332,15 @@ impl SourceScanInfo { Ok(SourceScanInfo::Complete(res)) } + ConnectorProperties::Azblob(prop) => { + let lister: OpendalEnumerator = + OpendalEnumerator::new_azblob_source(*prop)?; + let stream = build_opendal_fs_list_for_batch(lister); + let batch_res: Vec<_> = stream.try_collect().await?; + let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec(); + + Ok(SourceScanInfo::Complete(res)) + } ConnectorProperties::Iceberg(prop) => { let iceberg_enumerator = IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into()) @@ -740,7 +751,9 @@ impl StageGraph { let task_parallelism = match &stage.source_info { Some(SourceScanInfo::Incomplete(source_fetch_info)) => { match source_fetch_info.connector { - ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => (min( + ConnectorProperties::Gcs(_) + | ConnectorProperties::OpendalS3(_) + | ConnectorProperties::Azblob(_) => (min( complete_source_info.split_info().unwrap().len() as u32, (self.batch_parallelism / 2) as u32, )) diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 13580ca49e00..1576d65487d0 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -21,7 +21,7 @@ use risingwave_common::catalog::{ColumnId, TableId}; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::types::ScalarRef; use risingwave_connector::source::filesystem::opendal_source::{ - OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, + OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource, }; use risingwave_connector::source::filesystem::OpendalFsSplit; use risingwave_connector::source::reader::desc::SourceDesc; @@ -110,6 +110,11 @@ impl FsFetchExecutor { OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; SplitImpl::from(split) } + risingwave_connector::source::ConnectorProperties::Azblob(_) => { + let split: OpendalFsSplit = + OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; + SplitImpl::from(split) + } risingwave_connector::source::ConnectorProperties::PosixFs(_) => { let split: OpendalFsSplit = OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?; diff --git a/src/stream/src/from_proto/source/fs_fetch.rs b/src/stream/src/from_proto/source/fs_fetch.rs index 7a08c2d2f512..a8f6d588904b 100644 --- a/src/stream/src/from_proto/source/fs_fetch.rs +++ b/src/stream/src/from_proto/source/fs_fetch.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use risingwave_common::catalog::TableId; use risingwave_connector::source::filesystem::opendal_source::{ - OpendalGcs, OpendalPosixFs, OpendalS3, + OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, }; use risingwave_connector::source::reader::desc::SourceDescBuilder; use risingwave_connector::source::ConnectorProperties; @@ -104,6 +104,15 @@ impl ExecutorBuilder for FsFetchExecutorBuilder { ) .boxed() } + risingwave_connector::source::ConnectorProperties::Azblob(_) => { + FsFetchExecutor::<_, OpendalAzblob>::new( + params.actor_context.clone(), + stream_source_core, + upstream, + source.rate_limit, + ) + .boxed() + } risingwave_connector::source::ConnectorProperties::PosixFs(_) => { FsFetchExecutor::<_, OpendalPosixFs>::new( params.actor_context.clone(),