Skip to content

Commit

Permalink
support retry_unknown_service_error for opendal s3
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Aug 21, 2024
1 parent 3634497 commit 49be510
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 27 deletions.
8 changes: 6 additions & 2 deletions src/object_store/src/object/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,18 @@ impl ObjectError {
false
}

pub fn should_retry(&self) -> bool {
pub fn should_retry(&self, retry_opendal_s3_unknown_error: bool) -> bool {
match self.inner() {
ObjectErrorInner::S3 {
inner: _,
should_retry,
} => *should_retry,

ObjectErrorInner::Opendal(e) => e.is_temporary(),
ObjectErrorInner::Opendal(e) => {
e.is_temporary()
|| (retry_opendal_s3_unknown_error
&& e.kind() == opendal::ErrorKind::Unexpected)
}

ObjectErrorInner::Timeout(_) => true,

Expand Down
45 changes: 37 additions & 8 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()> {
let operation_type = OperationType::Upload;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

This comment has been minimized.

Copy link
@Li0k

Li0k Aug 22, 2024

Contributor

nits: How about using the same variable name? media_type and engine_type are too similar.

This comment has been minimized.

Copy link
@hzxa21

hzxa21 Aug 22, 2024

Author Collaborator

I think you are referring to the opendal_engine::EngineType. I have renamed it to MediaType.


self.object_store_metrics
.write_bytes
Expand All @@ -578,7 +579,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -593,6 +594,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -630,10 +632,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn read(&self, path: &str, range: impl ObjectRangeBounds) -> ObjectResult<Bytes> {
let operation_type = OperationType::Read;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -648,6 +652,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -701,6 +706,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -719,10 +725,11 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn metadata(&self, path: &str) -> ObjectResult<ObjectMetadata> {
let operation_type = OperationType::Metadata;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();
let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -737,6 +744,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -747,10 +755,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn delete(&self, path: &str) -> ObjectResult<()> {
let operation_type = OperationType::Delete;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -765,6 +775,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -775,6 +786,8 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> {
let operation_type = OperationType::DeleteObjects;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
Expand All @@ -793,6 +806,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand All @@ -803,11 +817,12 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
pub async fn list(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
let operation_type = OperationType::List;
let operation_type_str = operation_type.as_str();
let media_type = self.media_type();

let _timer = self
.object_store_metrics
.operation_latency
.with_label_values(&[self.media_type(), operation_type_str])
.with_label_values(&[media_type, operation_type_str])
.start_timer();

let builder = || async {
Expand All @@ -822,6 +837,7 @@ impl<OS: ObjectStore> MonitoredObjectStore<OS> {
&self.config,
operation_type,
self.object_store_metrics.clone(),
media_type,
)
.await;

Expand Down Expand Up @@ -1101,20 +1117,26 @@ struct RetryCondition {
operation_type: OperationType,
retry_count: usize,
metrics: Arc<ObjectStoreMetrics>,
retry_opendal_s3_unknown_error: bool,
}

impl RetryCondition {
fn new(operation_type: OperationType, metrics: Arc<ObjectStoreMetrics>) -> Self {
/// Builds the retry state for a single logical request.
///
/// The retry counter starts at zero. `retry_opendal_s3_unknown_error` is
/// stored unchanged and is later passed to `ObjectError::should_retry`
/// whenever a failed attempt is evaluated.
fn new(
    operation_type: OperationType,
    metrics: Arc<ObjectStoreMetrics>,
    retry_opendal_s3_unknown_error: bool,
) -> Self {
    // No attempt has failed yet, so nothing has been retried.
    let retry_count = 0;
    Self {
        retry_opendal_s3_unknown_error,
        metrics,
        retry_count,
        operation_type,
    }
}

#[inline(always)]
fn should_retry_inner(&mut self, err: &ObjectError) -> bool {
let should_retry = err.should_retry();
let should_retry = err.should_retry(self.retry_opendal_s3_unknown_error);
if should_retry {
self.retry_count += 1;
}
Expand Down Expand Up @@ -1145,6 +1167,7 @@ async fn retry_request<F, T, B>(
config: &ObjectStoreConfig,
operation_type: OperationType,
object_store_metrics: Arc<ObjectStoreMetrics>,
media_type: &'static str,
) -> ObjectResult<T>
where
B: Fn() -> F,
Expand All @@ -1155,7 +1178,13 @@ where
Duration::from_millis(get_attempt_timeout_by_type(config, operation_type));
let operation_type_str = operation_type.as_str();

let retry_condition = RetryCondition::new(operation_type, object_store_metrics);
let retry_condition = RetryCondition::new(
operation_type,
object_store_metrics,
config.s3.developer.retry_unknown_service_error
&& (media_type == opendal_engine::EngineType::S3.as_str()
|| media_type == opendal_engine::EngineType::Minio.as_str()),
);

let f = || async {
let future = builder();
Expand Down
30 changes: 18 additions & 12 deletions src/object_store/src/object/opendal_engine/opendal_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ pub enum EngineType {
Fs,
}

impl EngineType {
pub fn as_str(&self) -> &'static str {
match self {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Gcs => "Gcs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Obs => "Obs",
EngineType::Oss => "Oss",
EngineType::Webhdfs => "Webhdfs",
EngineType::Azblob => "Azblob",
EngineType::Fs => "Fs",
}
}
}

impl OpendalObjectStore {
/// create opendal memory engine, used for unit tests.
pub fn test_new_memory_engine() -> ObjectResult<Self> {
Expand Down Expand Up @@ -248,18 +265,7 @@ impl ObjectStore for OpendalObjectStore {
}

fn store_media_type(&self) -> &'static str {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
EngineType::Oss => "Oss",
EngineType::Webhdfs => "Webhdfs",
EngineType::Azblob => "Azblob",
EngineType::Fs => "Fs",
}
self.engine_type.as_str()
}

fn support_streaming_upload(&self) -> bool {
Expand Down
32 changes: 27 additions & 5 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct S3StreamingUploader {
}

impl S3StreamingUploader {
const MEDIA_TYPE: &'static str = "s3";

pub fn new(
client: Client,
bucket: String,
Expand Down Expand Up @@ -159,6 +161,7 @@ impl S3StreamingUploader {
&self.config,
OperationType::StreamingUploadInit,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;

Expand Down Expand Up @@ -221,7 +224,14 @@ impl S3StreamingUploader {
})
};

let res = retry_request(builder, &config, operation_type, metrics.clone()).await;
let res = retry_request(
builder,
&config,
operation_type,
metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&metrics, &res, operation_type_str);
Ok((part_id, res?))
}));
Expand Down Expand Up @@ -280,7 +290,14 @@ impl S3StreamingUploader {
})
};

let res = retry_request(builder, &self.config, operation_type, self.metrics.clone()).await;
let res = retry_request(
builder,
&self.config,
operation_type,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
let _res = res?;

Expand Down Expand Up @@ -353,9 +370,14 @@ impl StreamingUploader for S3StreamingUploader {
})
};

let res =
retry_request(builder, &self.config, operation_type, self.metrics.clone())
.await;
let res = retry_request(
builder,
&self.config,
operation_type,
self.metrics.clone(),
Self::MEDIA_TYPE,
)
.await;
try_update_failure_metric(&self.metrics, &res, operation_type.as_str());
res?;
Ok(())
Expand Down

0 comments on commit 49be510

Please sign in to comment.