Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): support azblob file source (#18295) #18306

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ macro_rules! for_all_classified_sources {
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
{ OpendalS3, $crate::source::filesystem::opendal_source::OpendalS3Properties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalS3> },
{ PosixFs, $crate::source::filesystem::opendal_source::PosixFsProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalPosixFs> },
{ Azblob, $crate::source::filesystem::opendal_source::AzblobProperties, $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalAzblob> },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit},
{ Iceberg, $crate::source::iceberg::IcebergProperties, $crate::source::iceberg::IcebergSplit}
}
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use risingwave_pb::plan_common::{
use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
S3_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};

// Hidden additional columns connectors which do not support `include` syntax.
Expand All @@ -57,6 +57,8 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
(OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
(S3_CONNECTOR, HashSet::from(["file", "offset"])),
(GCS_CONNECTOR, HashSet::from(["file", "offset"])),
(AZBLOB_CONNECTOR, HashSet::from(["file", "offset"])),
(POSIX_FS_CONNECTOR, HashSet::from(["file", "offset"])),
// mongodb-cdc doesn't support cdc backfill table
(
MONGODB_CDC_CONNECTOR,
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use super::kafka::KafkaMeta;
use super::kinesis::KinesisMeta;
use super::monitor::SourceMetrics;
use super::nexmark::source::message::NexmarkMeta;
use super::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use super::{AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR};
use crate::error::ConnectorResult as Result;
use crate::parser::schema_change::SchemaChangeEnvelope;
use crate::parser::ParserConfig;
Expand Down Expand Up @@ -385,6 +385,7 @@ impl ConnectorProperties {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
|| s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
})
.unwrap_or(false)
}
Expand Down Expand Up @@ -435,6 +436,7 @@ impl ConnectorProperties {
matches!(self, ConnectorProperties::Kafka(_))
|| matches!(self, ConnectorProperties::OpendalS3(_))
|| matches!(self, ConnectorProperties::Gcs(_))
|| matches!(self, ConnectorProperties::Azblob(_))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::marker::PhantomData;

use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Azblob;
use opendal::Operator;

use super::opendal_enumerator::OpendalEnumerator;
use super::{AzblobProperties, OpendalSource};
use crate::error::ConnectorResult;
use crate::source::filesystem::s3::enumerator::get_prefix;

impl<Src: OpendalSource> OpendalEnumerator<Src> {
/// create opendal azblob source.
pub fn new_azblob_source(azblob_properties: AzblobProperties) -> ConnectorResult<Self> {
// Create azblob builder.
let mut builder = Azblob::default();

builder.container(&azblob_properties.container_name);

builder.endpoint(&azblob_properties.endpoint_url);

if let Some(account_name) = azblob_properties.account_name {
builder.account_name(&account_name);
} else {
tracing::warn!(
"account_name azblob is not set, container {}",
azblob_properties.container_name
);
}

if let Some(account_key) = azblob_properties.account_key {
builder.account_key(&account_key);
} else {
tracing::warn!(
"account_key azblob is not set, container {}",
azblob_properties.container_name
);
}
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();

let (prefix, matcher) = if let Some(pattern) = azblob_properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(prefix), Some(matcher))
} else {
(None, None)
};

let compression_format = azblob_properties.compression_format;

Ok(Self {
op,
prefix,
matcher,
marker: PhantomData,
compression_format,
})
}
}
49 changes: 49 additions & 0 deletions src/connector/src/source/filesystem/opendal_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;

pub mod azblob_source;
pub mod gcs_source;
pub mod posix_fs_source;
pub mod s3_source;
Expand All @@ -31,6 +32,7 @@ use super::OpendalFsSplit;
use crate::error::ConnectorResult;
use crate::source::{SourceProperties, UnknownFields};

pub const AZBLOB_CONNECTOR: &str = "azblob";
pub const GCS_CONNECTOR: &str = "gcs";
// The new s3_v2 will use opendal.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
Expand Down Expand Up @@ -168,3 +170,50 @@ impl SourceProperties for PosixFsProperties {

const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
}

#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
#[serde(rename = "azblob.container_name")]
pub container_name: String,

#[serde(rename = "azblob.credentials.account_name", default)]
pub account_name: Option<String>,
#[serde(rename = "azblob.credentials.account_key", default)]
pub account_key: Option<String>,
#[serde(rename = "azblob.endpoint_url")]
pub endpoint_url: String,

#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,

#[serde(rename = "compression_format", default = "Default::default")]
pub compression_format: CompressionFormat,
}

impl UnknownFields for AzblobProperties {
fn unknown_fields(&self) -> HashMap<String, String> {
self.unknown_fields.clone()
}
}

impl SourceProperties for AzblobProperties {
type Split = OpendalFsSplit<OpendalAzblob>;
type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
type SplitReader = OpendalReader<OpendalAzblob>;

const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpendalAzblob;

impl OpendalSource for OpendalAzblob {
type Properties = AzblobProperties;

fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
OpendalEnumerator::new_azblob_source(properties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
let object_lister = self
.op
.lister_with(prefix)
.recursive(true)
.recursive(false)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.await?;
let stream = stream::unfold(object_lister, |mut object_lister| async move {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use risingwave_common::array::{Array, ArrayRef};
use thiserror_ext::AsReport;

pub use crate::source::filesystem::opendal_source::{
GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
};
pub use crate::source::filesystem::S3_CONNECTOR;
pub use crate::source::nexmark::NEXMARK_CONNECTOR;
Expand Down
8 changes: 7 additions & 1 deletion src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::error::ConnectorResult;
use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use crate::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
Expand Down Expand Up @@ -95,6 +95,11 @@ impl SourceReader {
OpendalEnumerator::new_s3_source(prop.s3_properties, prop.assume_role)?;
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::Azblob(prop) => {
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
Ok(build_opendal_fs_list_stream(lister))
}
ConnectorProperties::PosixFs(prop) => {
let lister: OpendalEnumerator<OpendalPosixFs> =
OpendalEnumerator::new_posix_fs_source(*prop)?;
Expand Down Expand Up @@ -192,6 +197,7 @@ async fn build_opendal_fs_list_stream<Src: OpendalSource>(lister: OpendalEnumera
.map(|m| m.matches(&res.name))
.unwrap_or(true)
{
println!("这里 res{:?}", res.name);
yield res
} else {
// Currrntly due to the lack of prefix list, we just skip the unmatched files.
Expand Down
4 changes: 3 additions & 1 deletion src/connector/src/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::sink::catalog::SinkFormatDesc;
use crate::source::cdc::external::CdcTableType;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
UPSTREAM_SOURCE_KEY,
};

/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
Expand Down Expand Up @@ -142,6 +143,7 @@ pub trait WithPropertiesExt: Get + Sized {
s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
|| s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
|| s.eq_ignore_ascii_case(GCS_CONNECTOR)
|| s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
})
.unwrap_or(false)
}
Expand Down
24 changes: 24 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
# THIS FILE IS AUTO_GENERATED. DO NOT EDIT
# UPDATE WITH: ./risedev generate-with-options

AzblobProperties:
fields:
- name: azblob.container_name
field_type: String
required: true
- name: azblob.credentials.account_name
field_type: String
required: false
default: Default::default
- name: azblob.credentials.account_key
field_type: String
required: false
default: Default::default
- name: azblob.endpoint_url
field_type: String
required: true
- name: match_pattern
field_type: String
required: false
default: Default::default
- name: compression_format
field_type: CompressionFormat
required: false
default: Default::default
DatagenProperties:
fields:
- name: datagen.split.num
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use risingwave_connector::source::nexmark::source::{get_event_data_types_with_na
use risingwave_connector::source::test_source::TEST_CONNECTOR;
pub use risingwave_connector::source::UPSTREAM_SOURCE_KEY;
use risingwave_connector::source::{
ConnectorProperties, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
ConnectorProperties, AZBLOB_CONNECTOR, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, MQTT_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
};
Expand Down Expand Up @@ -1064,7 +1064,10 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
GCS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json],
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
AZBLOB_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv, Encode::Json, Encode::Parquet],
),
POSIX_FS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Csv],
Expand Down
17 changes: 15 additions & 2 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use risingwave_common::hash::table_distribution::TableDistribution;
use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3};
use risingwave_connector::source::filesystem::opendal_source::{
OpendalAzblob, OpendalGcs, OpendalS3,
};
use risingwave_connector::source::iceberg::{IcebergSplitEnumerator, IcebergTimeTravelInfo};
use risingwave_connector::source::kafka::KafkaSplitEnumerator;
use risingwave_connector::source::reader::reader::build_opendal_fs_list_for_batch;
Expand Down Expand Up @@ -330,6 +332,15 @@ impl SourceScanInfo {

Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Azblob(prop) => {
let lister: OpendalEnumerator<OpendalAzblob> =
OpendalEnumerator::new_azblob_source(*prop)?;
let stream = build_opendal_fs_list_for_batch(lister);
let batch_res: Vec<_> = stream.try_collect().await?;
let res = batch_res.into_iter().map(SplitImpl::Azblob).collect_vec();

Ok(SourceScanInfo::Complete(res))
}
ConnectorProperties::Iceberg(prop) => {
let iceberg_enumerator =
IcebergSplitEnumerator::new(*prop, SourceEnumeratorContext::dummy().into())
Expand Down Expand Up @@ -740,7 +751,9 @@ impl StageGraph {
let task_parallelism = match &stage.source_info {
Some(SourceScanInfo::Incomplete(source_fetch_info)) => {
match source_fetch_info.connector {
ConnectorProperties::Gcs(_) | ConnectorProperties::OpendalS3(_) => (min(
ConnectorProperties::Gcs(_)
| ConnectorProperties::OpendalS3(_)
| ConnectorProperties::Azblob(_) => (min(
complete_source_info.split_info().unwrap().len() as u32,
(self.batch_parallelism / 2) as u32,
))
Expand Down
7 changes: 6 additions & 1 deletion src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::catalog::{ColumnId, TableId};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::types::ScalarRef;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3, OpendalSource,
};
use risingwave_connector::source::filesystem::OpendalFsSplit;
use risingwave_connector::source::reader::desc::SourceDesc;
Expand Down Expand Up @@ -110,6 +110,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
risingwave_connector::source::ConnectorProperties::Azblob(_) => {
let split: OpendalFsSplit<OpendalAzblob> =
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
SplitImpl::from(split)
}
risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
let split: OpendalFsSplit<OpendalPosixFs> =
OpendalFsSplit::restore_from_json(jsonb_ref.to_owned_scalar())?;
Expand Down
11 changes: 10 additions & 1 deletion src/stream/src/from_proto/source/fs_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use risingwave_common::catalog::TableId;
use risingwave_connector::source::filesystem::opendal_source::{
OpendalGcs, OpendalPosixFs, OpendalS3,
OpendalAzblob, OpendalGcs, OpendalPosixFs, OpendalS3,
};
use risingwave_connector::source::reader::desc::SourceDescBuilder;
use risingwave_connector::source::ConnectorProperties;
Expand Down Expand Up @@ -104,6 +104,15 @@ impl ExecutorBuilder for FsFetchExecutorBuilder {
)
.boxed()
}
risingwave_connector::source::ConnectorProperties::Azblob(_) => {
FsFetchExecutor::<_, OpendalAzblob>::new(
params.actor_context.clone(),
stream_source_core,
upstream,
source.rate_limit,
)
.boxed()
}
risingwave_connector::source::ConnectorProperties::PosixFs(_) => {
FsFetchExecutor::<_, OpendalPosixFs>::new(
params.actor_context.clone(),
Expand Down
Loading