Better retries for s3 #1283

Merged 4 commits on Oct 26, 2021
6 changes: 5 additions & 1 deletion hub/__init__.py
@@ -1,3 +1,4 @@
from botocore.config import Config
import numpy as np

__pdoc__ = {
@@ -6,6 +7,7 @@
"cli": False,
"client": False,
"constants": False,
"config": False,
"integrations": False,
"tests": False,
"Dataset.clear_cache": False,
@@ -16,7 +18,6 @@
"Dataset.token": False,
"Dataset.num_samples": False,
}

from .api.dataset import dataset
from .api.read import read
from .core.dataset import Dataset
@@ -54,10 +55,13 @@
"ingest_kaggle",
"compressions",
"htypes",
"config",
]

__version__ = "2.0.14"
__encoded_version__ = np.array(__version__)
config = {"s3": Config(max_pool_connections=50)}


hub_reporter.tags.append(f"version:{__version__}")
hub_reporter.system_report(publish=True)
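
The module now exposes a `config` dict so the botocore client settings live in one place: `S3Provider` reads `hub.config["s3"]` when it initializes its client. A minimal sketch of how a caller could override it before opening any S3-backed dataset (the retry settings shown are illustrative, not part of this PR):

from botocore.config import Config
import hub

# Replace the default Config(max_pool_connections=50) defined in hub/__init__.py.
# Any botocore client option can go here; the retries block is just an example.
hub.config["s3"] = Config(
    max_pool_connections=100,
    retries={"max_attempts": 5, "mode": "adaptive"},
)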
34 changes: 26 additions & 8 deletions hub/api/dataset.py
@@ -52,7 +52,7 @@ def __new__(
creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
This takes precedence over credentials present in the environment. Currently only works with s3 paths.
It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
token (str, optional): Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated.
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns:
Dataset object created using the arguments provided.
@@ -106,7 +106,7 @@ def empty(
creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
This takes precedence over credentials present in the environment. Currently only works with s3 paths.
It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
token (str, optional): Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated.
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns:
Dataset object created using the arguments provided.
@@ -171,7 +171,7 @@ def load(
creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
This takes precedence over credentials present in the environment. Currently only works with s3 paths.
It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
token (str, optional): Activeloop token, used for fetching credentials for Hub datasets. This is optional, tokens are normally autogenerated.
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns:
Dataset object created using the arguments provided.
@@ -206,7 +206,13 @@ def load(
)

@staticmethod
def delete(path: str, force: bool = False, large_ok: bool = False) -> None:
def delete(
path: str,
force: bool = False,
large_ok: bool = False,
creds: Optional[dict] = None,
token: Optional[str] = None,
) -> None:
"""Deletes a dataset at a given path.
This is an IRREVERSIBLE operation. Data once deleted can not be recovered.

@@ -215,7 +221,13 @@ def delete(path: str, force: bool = False, large_ok: bool = False) -> None:
force (bool): Delete data regardless of whether
it looks like a hub dataset. All data at the path will be removed.
large_ok (bool): Delete datasets larger than 1GB. Disabled by default.
creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
This takes precedence over credentials present in the environment. Currently only works with s3 paths.
It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.
"""
if creds is None:
creds = {}

feature_report_path(path, "delete", {"Force": force, "Large_OK": large_ok})

@@ -225,7 +237,7 @@ def delete(path: str, force: bool = False, large_ok: bool = False) -> None:
except:
if force:
base_storage = storage_provider_from_path(
path, creds={}, read_only=False, token=None
path, creds=creds, read_only=False, token=token
)
base_storage.clear()
else:
@@ -235,24 +247,30 @@ def delete(path: str, force: bool = False, large_ok: bool = False) -> None:
def like(
path: str,
source: Union[str, Dataset],
creds: dict = None,
overwrite: bool = False,
creds: Optional[dict] = None,
token: Optional[str] = None,
) -> Dataset:
"""Copies the `source` dataset's structure to a new location. No samples are copied, only the meta/info for the dataset and it's tensors.

Args:
path (str): Path where the new dataset will be created.
source (Union[str, Dataset]): Path or dataset object that will be used as the template for the new dataset.
creds (dict): Credentials that will be used to create the new dataset.
overwrite (bool): If True and a dataset exists at `destination`, it will be overwritten. Defaults to False.
creds (dict, optional): A dictionary containing credentials used to access the dataset at the path.
This takes precedence over credentials present in the environment. Currently only works with s3 paths.
It supports 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', 'endpoint_url' and 'region' as keys.
token (str, optional): Activeloop token, used for fetching credentials to the dataset at path if it is a Hub dataset. This is optional, tokens are normally autogenerated.

Returns:
Dataset: New dataset object.
"""

feature_report_path(path, "like", {"Overwrite": overwrite})

destination_ds = dataset.empty(path, creds=creds, overwrite=overwrite)
destination_ds = dataset.empty(
path, creds=creds, overwrite=overwrite, token=token
)
source_ds = source
if isinstance(source, str):
source_ds = dataset.load(source)
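
With these changes `delete` and `like` accept the same `creds`/`token` pair as `load` and `empty`. A short usage sketch (the bucket names, paths, and credential values below are placeholders):

import hub

creds = {
    "aws_access_key_id": "...",
    "aws_secret_access_key": "...",
    "region": "us-east-1",
}

# creds are now forwarded to the storage provider instead of being
# hard-coded to {} when force-deleting a non-hub path.
hub.dataset.delete("s3://my-bucket/stale-ds", force=True, large_ok=True, creds=creds)

# like() now passes the token through to dataset.empty for the destination.
new_ds = hub.dataset.like("s3://my-bucket/new-ds", "s3://my-bucket/template-ds", creds=creds)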
119 changes: 69 additions & 50 deletions hub/core/storage/s3.py
@@ -1,3 +1,4 @@
import hub
import time
import boto3
import botocore # type: ignore
@@ -32,6 +33,22 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
raise self.error_class(exc_value).with_traceback(exc_traceback)


class S3ResetClientManager:
"""Resets the client, if error still occurs, it raises it."""

def __init__(self, s3p, error_class: S3Error):
self.error_class = error_class
self.s3p = s3p

def __enter__(self):
self.s3p._initialize_s3_parameters()
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
if exc_type is not None:
raise self.error_class(exc_value).with_traceback(exc_traceback)
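
Together, the two managers give every S3 call in this file the same try/recover/replay-once shape: a `ClientError` is classified, recovery either reloads credentials (expired token) or rebuilds the client from `hub.config["s3"]`, and the call is replayed exactly once, with a second failure re-raised as the method-specific error class. A condensed sketch of that flow (`provider`, `path`, and `content` are illustrative stand-ins, not hub code):

import botocore.exceptions

def put_with_single_retry(provider, path, content):
    # Illustrative: the shape shared by __setitem__, __getitem__,
    # __delitem__, and _all_keys below.
    try:
        provider.client.put_object(Bucket=provider.bucket, Body=content, Key=path)
    except botocore.exceptions.ClientError as err:
        manager = (
            S3ReloadCredentialsManager  # expired token: fetch fresh credentials
            if provider.need_to_reload_creds(err)
            else S3ResetClientManager  # any other client error: rebuild the client
        )
        with manager(provider, S3SetError):
            # One replay; if it fails again, __exit__ wraps it in S3SetError.
            provider.client.put_object(Bucket=provider.bucket, Body=content, Key=path)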


class S3Provider(StorageProvider):
"""Provider class for using S3 storage."""

@@ -44,7 +61,6 @@ def __init__(
endpoint_url: Optional[str] = None,
aws_region: Optional[str] = None,
token: Optional[str] = None,
max_pool_connections: int = 50,
):
"""Initializes the S3Provider

@@ -64,21 +80,18 @@
aws_region (str, optional): Specifies the AWS Region to send requests to.
token (str, optional): Activeloop token, used for fetching credentials for Hub datasets (if this is underlying storage for Hub dataset).
This is optional, tokens are normally autogenerated.
max_pool_connections (int): The maximum number of connections to keep in a connection pool.
If this value is not set, the default value of 10 is used.
"""
self.root = root
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.aws_session_token = aws_session_token
self.aws_region: Optional[str] = aws_region
self.endpoint_url: Optional[str] = endpoint_url
self.max_pool_connections = max_pool_connections
self.expiration: Optional[str] = None
self.tag: Optional[str] = None
self.token: Optional[str] = token
self.loaded_creds_from_environment = False

self.client_config = hub.config["s3"]
self._initialize_s3_parameters()

def __setitem__(self, path, content):
@@ -105,17 +118,18 @@
)
# catch expired token error
except botocore.exceptions.ClientError as err:
if self.need_to_reload_creds(err):
with S3ReloadCredentialsManager(self, S3SetError):
self.client.put_object(
Bucket=self.bucket,
Body=content,
Key=path,
ContentType="application/octet-stream", # signifies binary data
)
else:
raise S3SetError(err)

manager = (
S3ReloadCredentialsManager
if self.need_to_reload_creds(err)
else S3ResetClientManager
)
with manager(self, S3SetError):
self.client.put_object(
Bucket=self.bucket,
Body=content,
Key=path,
ContentType="application/octet-stream", # signifies binary data
)
except Exception as err:
raise S3SetError(err)

@@ -144,15 +158,18 @@ def __getitem__(self, path):
except botocore.exceptions.ClientError as err:
if err.response["Error"]["Code"] == "NoSuchKey":
raise KeyError(err)
elif self.need_to_reload_creds(err):
with S3ReloadCredentialsManager(self, S3GetError):
resp = self.client.get_object(
Bucket=self.bucket,
Key=path,
)
return resp["Body"].read()
else:
raise S3GetError(err)

manager = (
S3ReloadCredentialsManager
if self.need_to_reload_creds(err)
else S3ResetClientManager
)
with manager(self, S3GetError):
resp = self.client.get_object(
Bucket=self.bucket,
Key=path,
)
return resp["Body"].read()
except Exception as err:
raise S3GetError(err)

@@ -174,11 +191,13 @@ def __delitem__(self, path):
self.client.delete_object(Bucket=self.bucket, Key=path)
# catch expired token error
except botocore.exceptions.ClientError as err:
if self.need_to_reload_creds(err):
with S3ReloadCredentialsManager(self, S3DeletionError):
self.client.delete_object(Bucket=self.bucket, Key=path)
else:
raise S3DeletionError(err)
manager = (
S3ReloadCredentialsManager
if self.need_to_reload_creds(err)
else S3ResetClientManager
)
with manager(self, S3DeletionError):
self.client.delete_object(Bucket=self.bucket, Key=path)
except Exception as err:
raise S3DeletionError(err)

@@ -197,13 +216,15 @@ def _all_keys(self):
items = self.client.list_objects_v2(Bucket=self.bucket, Prefix=self.path)
# catch expired token error
except botocore.exceptions.ClientError as err:
if self.need_to_reload_creds(err):
with S3ReloadCredentialsManager(self, S3ListError):
items = self.client.list_objects_v2(
Bucket=self.bucket, Prefix=self.path
)
else:
raise S3ListError(err)
manager = (
S3ReloadCredentialsManager
if self.need_to_reload_creds(err)
else S3ResetClientManager
)
with manager(self, S3ListError):
items = self.client.list_objects_v2(
Bucket=self.bucket, Prefix=self.path
)
except Exception as err:
raise S3ListError(err)

@@ -246,13 +267,15 @@ def clear(self):
bucket = self.resource.Bucket(self.bucket)
bucket.objects.filter(Prefix=self.path).delete()
# catch expired token error
except Exception as e:
if self.loaded_creds_from_environment:
with S3ReloadCredentialsManager(self, S3DeletionError):
bucket = self.resource.Bucket(self.bucket)
bucket.objects.filter(Prefix=self.path).delete()
else:
raise S3DeletionError(e)
except Exception as err:
manager = (
S3ReloadCredentialsManager
if self.need_to_reload_creds(err)
else S3ResetClientManager
)
with manager(self, S3DeletionError):
bucket = self.resource.Bucket(self.bucket)
bucket.objects.filter(Prefix=self.path).delete()

else:
super().clear()
@@ -265,7 +288,7 @@ def __getstate__(self):
self.aws_session_token,
self.aws_region,
self.endpoint_url,
self.max_pool_connections,
self.client_config,
self.expiration,
self.tag,
self.token,
@@ -279,7 +302,7 @@
self.aws_session_token = state[3]
self.aws_region = state[4]
self.endpoint_url = state[5]
self.max_pool_connections = state[6]
self.client_config = state[6]
self.expiration = state[7]
self.tag = state[8]
self.token = state[9]
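
The pickled state now carries the shared `client_config` in slot 6 instead of the old integer `max_pool_connections`, so an unpickled provider keeps the same pooled-connection settings. `botocore.config.Config` is a plain options container and pickles cleanly, unlike a live boto3 client, which stays out of the state entirely. A quick sanity sketch:

import pickle
from botocore.config import Config

# Config round-trips through pickle, so it is safe to keep in __getstate__.
cfg = Config(max_pool_connections=50)
restored = pickle.loads(pickle.dumps(cfg))
assert restored.max_pool_connections == 50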
@@ -309,10 +332,6 @@ def _set_hub_creds_info(self, hub_path: str, expiration: str):
def _initialize_s3_parameters(self):
self._set_bucket_and_path()

self.client_config = botocore.config.Config(
max_pool_connections=self.max_pool_connections,
)

if self.aws_access_key_id is None and self.aws_secret_access_key is None:
self._locate_and_load_creds()
self.loaded_creds_from_environment = True