Skip to content

Commit

Permalink
feat: use opendal as the s3 sdk by default (#18011) (#18201)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhanxiang (Patrick) Huang <[email protected]>
  • Loading branch information
github-actions[bot] and hzxa21 authored Aug 26, 2024
1 parent 5aeb188 commit 0228107
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 83 deletions.
12 changes: 2 additions & 10 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ pub struct S3ObjectStoreDeveloperConfig {
)]
pub retryable_service_error_codes: Vec<String>,

// TODO: the following field will be deprecated after opendal is stablized
// TODO: deprecate this config when we are completely deprecate aws sdk.
#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
}
Expand Down Expand Up @@ -2167,10 +2167,6 @@ pub mod default {
}

pub mod developer {
use crate::util::env_var::env_var_is_true_or;

const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

pub fn retry_unknown_service_error() -> bool {
false
}
Expand All @@ -2180,11 +2176,7 @@ pub mod default {
}

pub fn use_opendal() -> bool {
// TODO: deprecate this config when we are completely switch from aws sdk to opendal.
// The reason why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here is
// 1. Maintain compatibility so that there is no behavior change in cluster with RW_USE_OPENDAL_FOR_S3 set.
// 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set.
env_var_is_true_or(RW_USE_OPENDAL_FOR_S3, false)
true
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions src/common/src/util/env_var.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,3 @@ pub fn env_var_is_true_or(key: impl AsRef<OsStr>, default: bool) -> bool {
})
.unwrap_or(default)
}

/// Checks whether the environment variable `key` is set to `false` or `f` or `0`.
///
/// Returns `default` if the environment variable is not set, or contains invalid characters.
pub fn env_var_is_false_or(key: impl AsRef<OsStr>, default: bool) -> bool {
env::var(key)
.map(|value| {
["0", "f", "false"]
.iter()
.any(|&s| value.eq_ignore_ascii_case(s))
})
.unwrap_or(default)
}
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ identity_resolution_timeout_s = 5
[storage.object_store.s3.developer]
retry_unknown_service_error = false
retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
use_opendal = false
use_opendal = true

[system]
barrier_interval_ms = 1000
Expand Down
8 changes: 6 additions & 2 deletions src/object_store/src/object/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,18 @@ impl ObjectError {
false
}

pub fn should_retry(&self) -> bool {
pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool {
match self.inner() {
ObjectErrorInner::S3 {
inner: _,
should_retry,
} => *should_retry,

ObjectErrorInner::Opendal(e) => e.is_temporary(),
ObjectErrorInner::Opendal(e) => {
e.is_temporary()
|| (retry_opendal_s3_unknown_error
&& e.kind() == opendal::ErrorKind::Unexpected)
}

ObjectErrorInner::Timeout(_) => true,

Expand Down
45 changes: 37 additions & 8 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
let operation_type = OperationType::Upload;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

self.object_store_metrics
.write_bytes
Expand All @@ -578,7 +579,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -593,6 +594,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -630,10 +632,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
let operation_type = OperationType::Read;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -648,6 +652,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -701,6 +706,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -719,10 +725,11 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
let operation_type = OperationType::Metadata;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -737,6 +744,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -747,10 +755,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn delete(&self, path: &str) -> ObjectResult<()> {
let operation_type = OperationType::Delete;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -765,6 +775,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -775,6 +786,8 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
let operation_type = OperationType::DeleteObjects;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
Expand All @@ -793,6 +806,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -803,11 +817,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -822,6 +837,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -1101,20 +1117,26 @@ struct RetryCondition {
operation_type: OperationType,
retry_count: usize,
metrics: Arc<ObjectStoreMetrics>,
retry_opendal_s3_unknown_error: bool,
}

impl RetryCondition {
fn new(operation_type: OperationType, metrics: Arc<ObjectStoreMetrics>) -> Self {
fn new(
operation_type: OperationType,
metrics: Arc<ObjectStoreMetrics>,
retry_opendal_s3_unknown_error: bool,
) -> Self {
Self {
operation_type,
retry_count: 0,
metrics,
retry_opendal_s3_unknown_error,
}
}

#[inline(always)]
fn should_retry_inner(&mut self, err: &ObjectError) -> bool {
let should_retry = err.should_retry();
let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error);
if should_retry {
self.retry_count += 1;
}
Expand Down Expand Up @@ -1145,6 +1167,7 @@ async fn retry_request<F, T, B>(
config: &ObjectStoreConfig,
operation_type: OperationType,
object_store_metrics: Arc<ObjectStoreMetrics>,
media_type: &'static str,
) -> ObjectResult<T>
where
B: Fn() -> F,
Expand All @@ -1155,7 +1178,13 @@ where
Duration::from_millis(get_attempt_timeout_by_type(config, operation_type));
let operation_type_str = operation_type.as_str();

let retry_condition = RetryCondition::new(operation_type, object_store_metrics);
let retry_condition = RetryCondition::new(
operation_type,
object_store_metrics,
(config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error)
&& (media_type == opendal_engine::MediaType::S3.as_str()
|| media_type == opendal_engine::MediaType::Minio.as_str()),
);

let f = || async {
let future = builder();
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/azblob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opendal::services::Azblob;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;

Expand Down Expand Up @@ -47,7 +47,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
engine_type: EngineType::Azblob,
media_type: MediaType::Azblob,
config,
metrics,
})
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opendal::services::Fs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;
Expand All @@ -44,7 +44,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
engine_type: EngineType::Fs,
media_type: MediaType::Fs,
config,
metrics,
})
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opendal::services::Gcs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;

Expand Down Expand Up @@ -49,7 +49,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
engine_type: EngineType::Gcs,
media_type: MediaType::Gcs,
config,
metrics,
})
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opendal::services::Hdfs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{MediaType, OpendalObjectStore};
// use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;

Expand Down Expand Up @@ -53,7 +53,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
engine_type: EngineType::Hdfs,
media_type: MediaType::Hdfs,
config,
metrics,
})
Expand Down
4 changes: 2 additions & 2 deletions src/object_store/src/object/opendal_engine/obs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use opendal::services::Obs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;

Expand Down Expand Up @@ -55,7 +55,7 @@ impl OpendalObjectStore {
.finish();
Ok(Self {
op,
engine_type: EngineType::Obs,
media_type: MediaType::Obs,
config,
metrics,
})
Expand Down
Loading

0 comments on commit 0228107

Please sign in to comment.