Skip to content

Commit

Permalink
support azdls
Browse files Browse the repository at this point in the history
  • Loading branch information
twuebi committed Aug 23, 2024
1 parent d7095b9 commit 6b7a584
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 5 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ log = "0.4"
mockito = "1"
murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.49"
opendal = { git="https://github.com/twuebi/opendal.git", rev = "a9e3d88e97" }
ordered-float = "4"
parquet = "52"
pilota = "0.11.2"
Expand All @@ -84,6 +84,7 @@ serde_derive = "1"
serde_json = "1"
serde_repr = "0.1.16"
serde_with = "3.4"
strum = "0.26.3"
tempfile = "3.8"
tokio = { version = "1", default-features = false }
typed-builder = "0.19"
Expand Down
4 changes: 3 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ keywords = ["iceberg"]

[features]
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"]
storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-azdls", "storage-gcs"]

storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-s3 = ["opendal/services-s3"]
storage-azdls = ["opendal/services-azdls"]
storage-gcs = ["opendal/services-gcs"]

async-std = ["dep:async-std"]
Expand Down Expand Up @@ -74,6 +75,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ impl FileIO {
Ok(op.delete(relative_path).await?)
}

/// Deletes all files in the directory.
pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Ok(op.remove_all(relative_path).await?)
}

/// Check file exists.
pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Expand Down
4 changes: 4 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ pub use storage_s3::*;
pub(crate) mod object_cache;
#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-azdls")]
mod storage_azdls;
#[cfg(feature = "storage-azdls")]
pub use storage_azdls::ConfigKeys as AzdlsConfigKeys;

#[cfg(feature = "storage-fs")]
use storage_fs::*;
Expand Down
25 changes: 22 additions & 3 deletions crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use opendal::services::GcsConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};

#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
#[cfg(feature = "storage-azdls")]
use super::storage_azdls;
use super::FileIOBuilder;
use crate::{Error, ErrorKind};

Expand All @@ -44,6 +47,10 @@ pub(crate) enum Storage {
client: reqwest::Client,
config: Arc<S3Config>,
},
#[cfg(feature = "storage-azdls")]
Azdls {
config: Arc<AzdlsConfig>
},
#[cfg(feature = "storage-gcs")]
Gcs { config: Arc<GcsConfig> },
}
Expand All @@ -69,6 +76,13 @@ impl Storage {
Scheme::Gcs => Ok(Self::Gcs {
config: super::gcs_config_parse(props)?.into(),
}),
#[cfg(feature = "storage-azdls")]
Scheme::Azdls => {

Ok(Self::Azdls {
config: storage_azdls::azdls_config_parse(props)?.into(),
})
}
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now",),
Expand Down Expand Up @@ -147,11 +161,15 @@ impl Storage {
))
}
}
#[cfg(feature = "storage-azdls")]
Storage::Azdls { config } => {
Ok((Operator::from_config(config.as_ref().clone())?.finish(), &path["azdls://".len()..]))
}
#[cfg(all(
not(feature = "storage-s3"),
not(feature = "storage-fs"),
not(feature = "storage-gcs")
))]
not(feature = "storage-gcs"),
not(feature = "storage-azdls")))]
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
"No storage service has been enabled",
Expand All @@ -165,6 +183,7 @@ impl Storage {
"memory" => Ok(Scheme::Memory),
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
"azdls" => Ok(Scheme::Azdls),
"gs" => Ok(Scheme::Gcs),
s => Ok(s.parse::<Scheme>()?),
}
Expand Down
50 changes: 50 additions & 0 deletions crates/iceberg/src/io/storage_azdls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::collections::HashMap;
use std::str::FromStr;
use opendal::services::AzdlsConfig;
use crate::{Error, ErrorKind, Result};

/// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display)]
#[strum(serialize_all = "snake_case")]
pub enum ConfigKeys {
/// Az endpoint to use
Endpoint,
/// Az client id, used for client credential flow, created in microsoft app registration
ClientId,
/// Az client secret, used for client credential flow, created in microsoft app registration
ClientSecret,
/// Az tenant id, required for client credential flow
TenantId,
/// Az account key, used for shared key authentication
AccountKey,
/// Az storage account name
AccountName,
/// Az filesystem to use, also known as container
Filesystem,
/// Az authority host, used for client credential flow
AuthorityHost
}

pub(crate) fn azdls_config_parse(m: HashMap<String, String>) -> Result<AzdlsConfig> {
let mut cfg = AzdlsConfig::default();
for (k, v) in m.into_iter() {
let config_key = ConfigKeys::from_str(k.as_str()).map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid azdls config key: {}", k),
)
})?;
match config_key {
ConfigKeys::Endpoint => cfg.endpoint = Some(v),
ConfigKeys::ClientId => cfg.client_id = Some(v),
ConfigKeys::ClientSecret => cfg.client_secret = Some(v),
ConfigKeys::TenantId => cfg.tenant_id = Some(v),
ConfigKeys::AccountKey => cfg.account_key = Some(v),
ConfigKeys::AccountName => cfg.account_name = Some(v),
ConfigKeys::Filesystem => cfg.filesystem = v,
ConfigKeys::AuthorityHost => cfg.authority_host = Some(v)
}
}

Ok(cfg)
}

0 comments on commit 6b7a584

Please sign in to comment.