From 0228107916a1455f561a3f82e3f748aeb3341108 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 26 Aug 2024 11:48:33 +0800 Subject: [PATCH] feat: use opendal as the s3 sdk by default (#18011) (#18201) Co-authored-by: Zhanxiang (Patrick) Huang --- src/common/src/config.rs | 12 +--- src/common/src/util/env_var.rs | 13 ----- src/config/example.toml | 2 +- src/object_store/src/object/error.rs | 8 ++- src/object_store/src/object/mod.rs | 45 +++++++++++--- .../src/object/opendal_engine/azblob.rs | 4 +- .../src/object/opendal_engine/fs.rs | 4 +- .../src/object/opendal_engine/gcs.rs | 4 +- .../src/object/opendal_engine/hdfs.rs | 4 +- .../src/object/opendal_engine/obs.rs | 4 +- .../opendal_engine/opendal_object_store.rs | 58 ++++++++++--------- .../src/object/opendal_engine/opendal_s3.rs | 8 +-- .../src/object/opendal_engine/oss.rs | 4 +- .../src/object/opendal_engine/webhdfs.rs | 4 +- src/object_store/src/object/s3.rs | 32 ++++++++-- 15 files changed, 123 insertions(+), 83 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 50e1ce853ebe..ed67aefff9a1 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1165,7 +1165,7 @@ pub struct S3ObjectStoreDeveloperConfig { )] pub retryable_service_error_codes: Vec, - // TODO: the following field will be deprecated after opendal is stablized + // TODO: deprecate this config when we are completely deprecate aws sdk. #[serde(default = "default::object_store_config::s3::developer::use_opendal")] pub use_opendal: bool, } @@ -2167,10 +2167,6 @@ pub mod default { } pub mod developer { - use crate::util::env_var::env_var_is_true_or; - - const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; - pub fn retry_unknown_service_error() -> bool { false } @@ -2180,11 +2176,7 @@ pub mod default { } pub fn use_opendal() -> bool { - // TODO: deprecate this config when we are completely switch from aws sdk to opendal. - // The reason why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here is - // 1. Maintain compatibility so that there is no behavior change in cluster with RW_USE_OPENDAL_FOR_S3 set. - // 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set. - env_var_is_true_or(RW_USE_OPENDAL_FOR_S3, false) + true } } } diff --git a/src/common/src/util/env_var.rs b/src/common/src/util/env_var.rs index 1b7655dc73b1..ae5870644751 100644 --- a/src/common/src/util/env_var.rs +++ b/src/common/src/util/env_var.rs @@ -34,16 +34,3 @@ pub fn env_var_is_true_or(key: impl AsRef, default: bool) -> bool { }) .unwrap_or(default) } - -/// Checks whether the environment variable `key` is set to `false` or `f` or `0`. -/// -/// Returns `default` if the environment variable is not set, or contains invalid characters. -pub fn env_var_is_false_or(key: impl AsRef, default: bool) -> bool { - env::var(key) - .map(|value| { - ["0", "f", "false"] - .iter() - .any(|&s| value.eq_ignore_ascii_case(s)) - }) - .unwrap_or(default) -} diff --git a/src/config/example.toml b/src/config/example.toml index e9076bf9bf08..c81b35163eaf 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -232,7 +232,7 @@ identity_resolution_timeout_s = 5 [storage.object_store.s3.developer] retry_unknown_service_error = false retryable_service_error_codes = ["SlowDown", "TooManyRequests"] -use_opendal = false +use_opendal = true [system] barrier_interval_ms = 1000 diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index aa79f53c4c06..11cda88468c4 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -101,14 +101,18 @@ impl ObjectError { false } - pub fn should_retry(&self) -> bool { + pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool { match self.inner() { ObjectErrorInner::S3 { inner: _, should_retry, } => *should_retry, - ObjectErrorInner::Opendal(e) => e.is_temporary(), + ObjectErrorInner::Opendal(e) => { + e.is_temporary() + || (retry_opendal_s3_unknown_error + && e.kind() == opendal::ErrorKind::Unexpected) + } ObjectErrorInner::Timeout(_) => true, diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 8ee8dc078fe1..aff197263f8f 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -567,6 +567,7 @@ impl MonitoredObjectStore { pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> { let operation_type = OperationType::Upload; let operation_type_str = operation_type.as_str(); + let media_type = self.media_type(); self.object_store_metrics .write_bytes @@ -578,7 +579,7 @@ impl MonitoredObjectStore { let _timer = self .object_store_metrics .operation_latency - .with_label_values(&[self.media_type(), operation_type_str]) + .with_label_values(&[media_type, operation_type_str]) .start_timer(); let builder = || async { @@ -593,6 +594,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -630,10 +632,12 @@ impl MonitoredObjectStore { pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult { let operation_type = OperationType::Read; let operation_type_str = operation_type.as_str(); + let media_type = self.media_type(); + let _timer = self .object_store_metrics .operation_latency - .with_label_values(&[self.media_type(), operation_type_str]) + .with_label_values(&[media_type, operation_type_str]) .start_timer(); let builder = || async { @@ -648,6 +652,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -701,6 +706,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -719,10 +725,11 @@ impl MonitoredObjectStore { pub async fn metadata(&self, path: &str) -> ObjectResult { let operation_type = OperationType::Metadata; let operation_type_str = operation_type.as_str(); + let media_type = self.media_type(); let _timer = self .object_store_metrics .operation_latency - .with_label_values(&[self.media_type(), operation_type_str]) + .with_label_values(&[media_type, operation_type_str]) .start_timer(); let builder = || async { @@ -737,6 +744,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -747,10 +755,12 @@ impl MonitoredObjectStore { pub async fn delete(&self, path: &str) -> ObjectResult<()> { let operation_type = OperationType::Delete; let operation_type_str = operation_type.as_str(); + let media_type = self.media_type(); + let _timer = self .object_store_metrics .operation_latency - .with_label_values(&[self.media_type(), operation_type_str]) + .with_label_values(&[media_type, operation_type_str]) .start_timer(); let builder = || async { @@ -765,6 +775,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -775,6 +786,8 @@ impl MonitoredObjectStore { async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { let operation_type = OperationType::DeleteObjects; let operation_type_str = operation_type.as_str(); + let media_type = self.media_type(); + let _timer = self .object_store_metrics .operation_latency @@ -793,6 +806,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -803,11 +817,12 @@ impl MonitoredObjectStore { pub async fn list(&self, prefix: &str) -> ObjectResult { let operation_type = OperationType::List; let operation_type_str = operation_type.as_str(); + let media_type = self.media_type(); let _timer = self .object_store_metrics .operation_latency - .with_label_values(&[self.media_type(), operation_type_str]) + .with_label_values(&[media_type, operation_type_str]) .start_timer(); let builder = || async { @@ -822,6 +837,7 @@ impl MonitoredObjectStore { &self.config, operation_type, self.object_store_metrics.clone(), + media_type, ) .await; @@ -1101,20 +1117,26 @@ struct RetryCondition { operation_type: OperationType, retry_count: usize, metrics: Arc, + retry_opendal_s3_unknown_error: bool, } impl RetryCondition { - fn new(operation_type: OperationType, metrics: Arc) -> Self { + fn new( + operation_type: OperationType, + metrics: Arc, + retry_opendal_s3_unknown_error: bool, + ) -> Self { Self { operation_type, retry_count: 0, metrics, + retry_opendal_s3_unknown_error, } } #[inline(always)] fn should_retry_inner(&mut self, err: &ObjectError) -> bool { - let should_retry = err.should_retry(); + let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error); if should_retry { self.retry_count += 1; } @@ -1145,6 +1167,7 @@ async fn retry_request( config: &ObjectStoreConfig, operation_type: OperationType, object_store_metrics: Arc, + media_type: &'static str, ) -> ObjectResult where B: Fn() -> F, @@ -1155,7 +1178,13 @@ where Duration::from_millis(get_attempt_timeout_by_type(config, operation_type)); let operation_type_str = operation_type.as_str(); - let retry_condition = RetryCondition::new(operation_type, object_store_metrics); + let retry_condition = RetryCondition::new( + operation_type, + object_store_metrics, + (config.s3.developer.retry_unknown_service_error || config.s3.retry_unknown_service_error) + && (media_type == opendal_engine::MediaType::S3.as_str() + || media_type == opendal_engine::MediaType::Minio.as_str()), + ); let f = || async { let future = builder(); diff --git a/src/object_store/src/object/opendal_engine/azblob.rs b/src/object_store/src/object/opendal_engine/azblob.rs index e584e59aafe8..24ccacb3c649 100644 --- a/src/object_store/src/object/opendal_engine/azblob.rs +++ b/src/object_store/src/object/opendal_engine/azblob.rs @@ -19,7 +19,7 @@ use opendal::services::Azblob; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; @@ -47,7 +47,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Azblob, + media_type: MediaType::Azblob, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index 2edaaa44d6bb..3792151ff474 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -19,7 +19,7 @@ use opendal::services::Fs; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; @@ -44,7 +44,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Fs, + media_type: MediaType::Fs, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/gcs.rs b/src/object_store/src/object/opendal_engine/gcs.rs index a3876b30ef56..ee0d155058dd 100644 --- a/src/object_store/src/object/opendal_engine/gcs.rs +++ b/src/object_store/src/object/opendal_engine/gcs.rs @@ -19,7 +19,7 @@ use opendal::services::Gcs; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; @@ -49,7 +49,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Gcs, + media_type: MediaType::Gcs, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/hdfs.rs b/src/object_store/src/object/opendal_engine/hdfs.rs index b7b28ef08a05..d7b3655c0fc3 100644 --- a/src/object_store/src/object/opendal_engine/hdfs.rs +++ b/src/object_store/src/object/opendal_engine/hdfs.rs @@ -19,7 +19,7 @@ use opendal::services::Hdfs; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; // use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; @@ -53,7 +53,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Hdfs, + media_type: MediaType::Hdfs, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/obs.rs b/src/object_store/src/object/opendal_engine/obs.rs index 03919ec57d37..31c86109c820 100644 --- a/src/object_store/src/object/opendal_engine/obs.rs +++ b/src/object_store/src/object/opendal_engine/obs.rs @@ -19,7 +19,7 @@ use opendal::services::Obs; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; @@ -55,7 +55,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Obs, + media_type: MediaType::Obs, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 6ea0cbb6fe8f..6855ae951956 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -37,14 +37,14 @@ use crate::object::{ #[derive(Clone)] pub struct OpendalObjectStore { pub(crate) op: Operator, - pub(crate) engine_type: EngineType, + pub(crate) media_type: MediaType, pub(crate) config: Arc, pub(crate) metrics: Arc, } #[derive(Clone)] -pub enum EngineType { +pub enum MediaType { Memory, Hdfs, Gcs, @@ -57,6 +57,23 @@ pub enum EngineType { Fs, } +impl MediaType { + pub fn as_str(&self) -> &'static str { + match self { + MediaType::Memory => "Memory", + MediaType::Hdfs => "Hdfs", + MediaType::Gcs => "Gcs", + MediaType::Minio => "Minio", + MediaType::S3 => "S3", + MediaType::Obs => "Obs", + MediaType::Oss => "Oss", + MediaType::Webhdfs => "Webhdfs", + MediaType::Azblob => "Azblob", + MediaType::Fs => "Fs", + } + } +} + impl OpendalObjectStore { /// create opendal memory engine, used for unit tests. pub fn test_new_memory_engine() -> ObjectResult { @@ -65,7 +82,7 @@ impl OpendalObjectStore { let op: Operator = Operator::new(builder)?.finish(); Ok(Self { op, - engine_type: EngineType::Memory, + media_type: MediaType::Memory, config: Arc::new(ObjectStoreConfig::default()), metrics: Arc::new(ObjectStoreMetrics::unused()), }) @@ -77,17 +94,17 @@ impl ObjectStore for OpendalObjectStore { type StreamingUploader = OpendalStreamingUploader; fn get_object_prefix(&self, obj_id: u64, use_new_object_prefix_strategy: bool) -> String { - match self.engine_type { - EngineType::S3 => prefix::s3::get_object_prefix(obj_id), - EngineType::Minio => prefix::s3::get_object_prefix(obj_id), - EngineType::Memory => String::default(), - EngineType::Hdfs - | EngineType::Gcs - | EngineType::Obs - | EngineType::Oss - | EngineType::Webhdfs - | EngineType::Azblob - | EngineType::Fs => { + match self.media_type { + MediaType::S3 => prefix::s3::get_object_prefix(obj_id), + MediaType::Minio => prefix::s3::get_object_prefix(obj_id), + MediaType::Memory => String::default(), + MediaType::Hdfs + | MediaType::Gcs + | MediaType::Obs + | MediaType::Oss + | MediaType::Webhdfs + | MediaType::Azblob + | MediaType::Fs => { prefix::opendal_engine::get_object_prefix(obj_id, use_new_object_prefix_strategy) } } @@ -248,18 +265,7 @@ impl ObjectStore for OpendalObjectStore { } fn store_media_type(&self) -> &'static str { - match self.engine_type { - EngineType::Memory => "Memory", - EngineType::Hdfs => "Hdfs", - EngineType::Minio => "Minio", - EngineType::S3 => "S3", - EngineType::Gcs => "Gcs", - EngineType::Obs => "Obs", - EngineType::Oss => "Oss", - EngineType::Webhdfs => "Webhdfs", - EngineType::Azblob => "Azblob", - EngineType::Fs => "Fs", - } + self.media_type.as_str() } fn support_streaming_upload(&self) -> bool { diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 183496d08673..43629bbf5157 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -21,7 +21,7 @@ use opendal::services::S3; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; @@ -53,7 +53,7 @@ impl OpendalObjectStore { Ok(Self { op, - engine_type: EngineType::S3, + media_type: MediaType::S3, config, metrics, }) @@ -97,7 +97,7 @@ impl OpendalObjectStore { Ok(Self { op, - engine_type: EngineType::Minio, + media_type: MediaType::Minio, config, metrics, }) @@ -146,7 +146,7 @@ impl OpendalObjectStore { Ok(Self { op, - engine_type: EngineType::S3, + media_type: MediaType::S3, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/oss.rs b/src/object_store/src/object/opendal_engine/oss.rs index c4fc5d500b11..cc0ecdfb7a94 100644 --- a/src/object_store/src/object/opendal_engine/oss.rs +++ b/src/object_store/src/object/opendal_engine/oss.rs @@ -19,7 +19,7 @@ use opendal::services::Oss; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::ObjectResult; @@ -55,7 +55,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Oss, + media_type: MediaType::Oss, config, metrics, }) diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs index f083102a3ed2..b214bcfad2cc 100644 --- a/src/object_store/src/object/opendal_engine/webhdfs.rs +++ b/src/object_store/src/object/opendal_engine/webhdfs.rs @@ -19,7 +19,7 @@ use opendal::services::Webhdfs; use opendal::Operator; use risingwave_common::config::ObjectStoreConfig; -use super::{EngineType, OpendalObjectStore}; +use super::{MediaType, OpendalObjectStore}; use crate::object::object_metrics::ObjectStoreMetrics; use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; @@ -47,7 +47,7 @@ impl OpendalObjectStore { .finish(); Ok(Self { op, - engine_type: EngineType::Webhdfs, + media_type: MediaType::Webhdfs, config, metrics, }) diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs index 001eb8128a5b..7ea687559ab3 100644 --- a/src/object_store/src/object/s3.rs +++ b/src/object_store/src/object/s3.rs @@ -103,6 +103,8 @@ pub struct S3StreamingUploader { } impl S3StreamingUploader { + const MEDIA_TYPE: &'static str = "s3"; + pub fn new( client: Client, bucket: String, @@ -159,6 +161,7 @@ impl S3StreamingUploader { &self.config, OperationType::StreamingUploadInit, self.metrics.clone(), + Self::MEDIA_TYPE, ) .await; @@ -221,7 +224,14 @@ impl S3StreamingUploader { }) }; - let res = retry_request(builder, &config, operation_type, metrics.clone()).await; + let res = retry_request( + builder, + &config, + operation_type, + metrics.clone(), + Self::MEDIA_TYPE, + ) + .await; try_update_failure_metric(&metrics, &res, operation_type_str); Ok((part_id, res?)) })); @@ -280,7 +290,14 @@ impl S3StreamingUploader { }) }; - let res = retry_request(builder, &self.config, operation_type, self.metrics.clone()).await; + let res = retry_request( + builder, + &self.config, + operation_type, + self.metrics.clone(), + Self::MEDIA_TYPE, + ) + .await; try_update_failure_metric(&self.metrics, &res, operation_type.as_str()); let _res = res?; @@ -353,9 +370,14 @@ impl StreamingUploader for S3StreamingUploader { }) }; - let res = - retry_request(builder, &self.config, operation_type, self.metrics.clone()) - .await; + let res = retry_request( + builder, + &self.config, + operation_type, + self.metrics.clone(), + Self::MEDIA_TYPE, + ) + .await; try_update_failure_metric(&self.metrics, &res, operation_type.as_str()); res?; Ok(())