Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use opendal as the s3 sdk by default #18011

Merged
merged 5 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ pub struct S3ObjectStoreDeveloperConfig {
)]
pub retryable_service_error_codes: Vec<String>,

// TODO: the following field will be deprecated after opendal is stablized
// TODO: deprecate this config when we are completely deprecate aws sdk.
#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
}
Expand Down Expand Up @@ -2167,10 +2167,6 @@ pub mod default {
}

pub mod developer {
use crate::util::env_var::env_var_is_true_or;

const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

pub fn retry_unknown_service_error() -> bool {
false
}
Expand All @@ -2180,11 +2176,7 @@ pub mod default {
}

pub fn use_opendal() -> bool {
// TODO: deprecate this config when we are completely switch from aws sdk to opendal.
// The reason why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here is
// 1. Maintain compatibility so that there is no behavior change in cluster with RW_USE_OPENDAL_FOR_S3 set.
// 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set.
env_var_is_true_or(RW_USE_OPENDAL_FOR_S3, false)
true
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions src/common/src/util/env_var.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,3 @@ pub fn env_var_is_true_or(key: impl AsRef<OsStr>, default: bool) -> bool {
})
.unwrap_or(default)
}

/// Checks whether the environment variable `key` is set to `false` or `f` or `0`.
///
/// Returns `default` if the environment variable is not set, or contains invalid characters.
pub fn env_var_is_false_or(key: impl AsRef<OsStr>, default: bool) -> bool {
env::var(key)
.map(|value| {
["0", "f", "false"]
.iter()
.any(|&s| value.eq_ignore_ascii_case(s))
})
.unwrap_or(default)
}
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ identity_resolution_timeout_s = 5
[storage.object_store.s3.developer]
retry_unknown_service_error = false
retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
use_opendal = false
use_opendal = true

[system]
barrier_interval_ms = 1000
Expand Down
8 changes: 6 additions & 2 deletions src/object_store/src/object/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,18 @@ impl ObjectError {
false
}

pub fn should_retry(&self) -> bool {
pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool {
match self.inner() {
ObjectErrorInner::S3 {
inner: _,
should_retry,
} => *should_retry,

ObjectErrorInner::Opendal(e) => e.is_temporary(),
ObjectErrorInner::Opendal(e) => {
e.is_temporary()
|| (retry_opendal_s3_unknown_error
&& e.kind() == opendal::ErrorKind::Unexpected)
}

ObjectErrorInner::Timeout(_) => true,

Expand Down
45 changes: 37 additions & 8 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
let operation_type = OperationType::Upload;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits: How about using the same variable name? media_type and engine_type are too similar.


self.object_store_metrics
.write_bytes
Expand All @@ -578,7 +579,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -593,6 +594,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -630,10 +632,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
let operation_type = OperationType::Read;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -648,6 +652,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -701,6 +706,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -719,10 +725,11 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
let operation_type = OperationType::Metadata;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -737,6 +744,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -747,10 +755,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn delete(&self, path: &str) -> ObjectResult<()> {
let operation_type = OperationType::Delete;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -765,6 +775,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -775,6 +786,8 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
let operation_type = OperationType::DeleteObjects;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
Expand All @@ -793,6 +806,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -803,11 +817,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -822,6 +837,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -1101,20 +1117,26 @@ struct RetryCondition {
operation_type: OperationType,
retry_count: usize,
metrics: Arc<ObjectStoreMetrics>,
retry_opendal_s3_unknown_error: bool,
}

impl RetryCondition {
fn new(operation_type: OperationType, metrics: Arc<ObjectStoreMetrics>) -> Self {
fn new(
operation_type: OperationType,
metrics: Arc<ObjectStoreMetrics>,
retry_opendal_s3_unknown_error: bool,
) -> Self {
Self {
operation_type,
retry_count: 0,
metrics,
retry_opendal_s3_unknown_error,
}
}

#[inline(always)]
fn should_retry_inner(&mut self, err: &ObjectError) -> bool {
let should_retry = err.should_retry();
let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error);
if should_retry {
self.retry_count += 1;
}
Expand Down Expand Up @@ -1145,6 +1167,7 @@ async fn retry_request<F, T, B>(
config: &ObjectStoreConfig,
operation_type: OperationType,
object_store_metrics: Arc<ObjectStoreMetrics>,
media_type: &'static str,
) -> ObjectResult<T>
where
B: Fn() -> F,
Expand All @@ -1155,7 +1178,13 @@ where
Duration::from_millis(get_attempt_timeout_by_type(config, operation_type));
let operation_type_str = operation_type.as_str();

let retry_condition = RetryCondition::new(operation_type, object_store_metrics);
let retry_condition = RetryCondition::new(
operation_type,
object_store_metrics,
(config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error)
&& (media_type == opendal_engine::EngineType::S3.as_str()
|| media_type == opendal_engine::EngineType::Minio.as_str()),
);

let f = || async {
let future = builder();
Expand Down
30 changes: 18 additions & 12 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ pub enum EngineType {
Fs,
}

impl EngineType {
pub fn as_str(&self) -> &'static str {
match self {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Gcs => "Gcs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Obs => "Obs",
EngineType::Oss => "Oss",
EngineType::Webhdfs => "Webhdfs",
EngineType::Azblob => "Azblob",
EngineType::Fs => "Fs",
}
}
}

impl OpendalObjectStore {
/// create opendal memory engine, used for unit tests.
pub fn test_new_memory_engine() -> ObjectResult<Self> {
Expand Down Expand Up @@ -248,18 +265,7 @@ impl ObjectStore for OpendalObjectStore {
}

fn store_media_type(&self) -> &'static str {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
EngineType::Oss => "Oss",
EngineType::Webhdfs => "Webhdfs",
EngineType::Azblob => "Azblob",
EngineType::Fs => "Fs",
}
self.engine_type.as_str()
}

fn support_streaming_upload(&self) -> bool {
Expand Down
32 changes: 27 additions & 5 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct S3StreamingUploader {
}

impl S3StreamingUploader {
const MEDIA_TYPE: &'static str = "s3";

pub fn new(
client: Client,
bucket: String,
Expand Down Expand Up @@ -159,6 +161,7 @@ impl S3StreamingUploader {
&self.config,
OperationType::StreamingUploadInit,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;

Expand Down Expand Up @@ -221,7 +224,14 @@ impl S3StreamingUploader {
})
};

let res = retry_request(builder, &config, operation_type, metrics.clone()).await;
let res = retry_request(
builder,
&config,
operation_type,
metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&metrics, &res, operation_type_str);
Ok((part_id, res?))
}));
Expand Down Expand Up @@ -280,7 +290,14 @@ impl S3StreamingUploader {
})
};

let res = retry_request(builder, &self.config, operation_type, self.metrics.clone()).await;
let res = retry_request(
builder,
&self.config,
operation_type,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
let _res = res?;

Expand Down Expand Up @@ -353,9 +370,14 @@ impl StreamingUploader for S3StreamingUploader {
})
};

let res =
retry_request(builder, &self.config, operation_type, self.metrics.clone())
.await;
let res = retry_request(
builder,
&self.config,
operation_type,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
res?;
Ok(())
Expand Down
Loading