Skip to content

Commit

Permalink
azdls fileio for iceberg-rust
Browse files Browse the repository at this point in the history
  • Loading branch information
twuebi committed Jul 31, 2024
1 parent 50db431 commit 16c290e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ log = "^0.4"
mockito = "^1"
murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.46"
opendal = { git="https://github.com/twuebi/opendal.git", branch="tp/azdls-client-secret" }
ordered-float = "4.0.0"
parquet = "51"
pilota = "0.11.0"
Expand All @@ -79,6 +79,7 @@ serde_derive = "^1.0"
serde_json = "^1.0"
serde_repr = "0.1.16"
serde_with = "3.4.0"
strum = "0.26.3"
tempfile = "3.8"
tokio = { version = "1", features = ["macros"] }
typed-builder = "^0.18"
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ lazy_static = { workspace = true }
log = { workspace = true }
murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true, features = ["services-s3", "services-fs"] }
opendal = { workspace = true, features = ["services-s3", "services-fs", "services-azdls"] }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
reqwest = { workspace = true }
Expand All @@ -63,6 +63,7 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tokio = { workspace = true }
typed-builder = { workspace = true }
url = { workspace = true }
Expand Down
73 changes: 71 additions & 2 deletions crates/iceberg/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@
use bytes::Bytes;
use std::ops::Range;
use std::{collections::HashMap, sync::Arc};

use std::str::FromStr;
use crate::{error::Result, Error, ErrorKind};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
use url::Url;

/// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
/// S3 endopint.
/// S3 endpoint.
pub const S3_ENDPOINT: &str = "s3.endpoint";
/// S3 access key id.
pub const S3_ACCESS_KEY_ID: &str = "s3.access-key-id";
Expand All @@ -78,6 +78,50 @@ static S3_CONFIG_MAPPING: Lazy<HashMap<&'static str, &'static str>> = Lazy::new(
m
});

/// Module containing azdls related structs.
pub mod azdls {
/// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display)]
#[strum(serialize_all = "snake_case")]
pub enum ConfigKeys {
/// Az endpoint to use
Endpoint,
/// Az client id, used for client credential flow, created in microsoft app registration
ClientId,
/// Az client secret, used for client credential flow, created in microsoft app registration
ClientSecret,
/// Az tenant id, required for client credential flow
TenantId,
/// Az account key, used for shared key authentication
AccountKey,
/// Az storage account name
AccountName,
/// Az filesystem to use, also known as container
Filesystem,
/// TODO: this is overwritten anyways, why keep?
Root,
/// Az authority host, used for client credential flow
AuthorityHost
}

impl ConfigKeys {
/// Convert to [`opendal::Operator`] configuration key.
pub fn into_opendal_config_key(self) -> &'static str {
match self {
Self::Endpoint => "endpoint",
Self::ClientId => "client_id",
Self::ClientSecret => "client_secret",
Self::TenantId => "tenant_id",
Self::AccountKey => "account_key",
Self::AccountName => "account_name",
Self::Filesystem => "filesystem",
Self::Root => "root",
Self::AuthorityHost => "authority_host"
}
}
}
}

const DEFAULT_ROOT_PATH: &str = "/";

/// FileIO implementation, used to manipulate files in underlying storage.
Expand Down Expand Up @@ -383,6 +427,9 @@ enum Storage {
scheme_str: String,
props: HashMap<String, String>,
},
Azdls {
props: HashMap<String, String>,
},
}

impl Storage {
Expand Down Expand Up @@ -431,6 +478,10 @@ impl Storage {
))
}
}
Storage::Azdls { props } => {

Ok((Operator::via_map(Scheme::Azdls, props.clone())?, &path["azdls://".len()..]))
}
}
}

Expand All @@ -439,6 +490,7 @@ impl Storage {
match scheme {
"file" | "" => Ok(Scheme::Fs),
"s3" | "s3a" => Ok(Scheme::S3),
"azdls" => Ok(Scheme::Azdls),
s => Ok(s.parse::<Scheme>()?),
}
}
Expand Down Expand Up @@ -466,6 +518,23 @@ impl Storage {
props: new_props,
})
}
Scheme::Azdls => {
for prop in file_io_builder.props {
let config_key = azdls::ConfigKeys::from_str(prop.0.as_str()).map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid azdls config key: {}", prop.0),
)
})?;
new_props.insert(config_key.into_opendal_config_key().to_string(), prop.1);
}
// TODO: validate config, i.e. check that all required fields for auth method are
// present
Ok(Self::Azdls {
props: new_props
})
}

_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now",),
Expand Down

0 comments on commit 16c290e

Please sign in to comment.