From d8caedc4399f36738c7fc1de651b58a57b693438 Mon Sep 17 00:00:00 2001 From: Tony Tung Date: Thu, 19 Oct 2017 13:33:11 -0700 Subject: [PATCH] Nuke the blobstore code, replace with cloud_blobstore library. (#567) Need the blobstore code elsewhere, so it makes sense to library-ize it. --- dss/api/bundles.py | 2 +- dss/api/files.py | 8 +- dss/api/subscriptions.py | 2 +- dss/blobstore/__init__.py | 119 ------------ dss/blobstore/gs.py | 126 ------------- dss/blobstore/s3.py | 244 ------------------------- dss/config.py | 7 +- dss/events/chunkedtask/s3copyclient.py | 3 +- dss/events/handlers/index.py | 4 +- dss/events/handlers/sync.py | 2 +- dss/hcablobstore/gs.py | 3 +- dss/hcablobstore/s3.py | 3 +- requirements-dev.txt | 1 + requirements.txt | 1 + requirements.txt.in | 1 + tests/infra/storage_mixin.py | 2 +- tests/test_blobstore.py | 119 ------------ tests/test_gsblobstore.py | 43 ----- tests/test_gshcablobstore.py | 3 +- tests/test_hcablobstore.py | 2 +- tests/test_s3blobstore.py | 139 -------------- tests/test_s3hcablobstore.py | 3 +- 22 files changed, 27 insertions(+), 810 deletions(-) delete mode 100644 dss/blobstore/__init__.py delete mode 100644 dss/blobstore/gs.py delete mode 100644 dss/blobstore/s3.py delete mode 100644 tests/test_blobstore.py delete mode 100644 tests/test_gsblobstore.py delete mode 100755 tests/test_s3blobstore.py diff --git a/dss/api/bundles.py b/dss/api/bundles.py index 4c2f8db2aa..0ab5274241 100644 --- a/dss/api/bundles.py +++ b/dss/api/bundles.py @@ -5,10 +5,10 @@ import iso8601 import requests +from cloud_blobstore import BlobNotFoundError from flask import jsonify, make_response from .. import DSSException, dss_handler -from ..blobstore import BlobNotFoundError from ..config import Config from ..hcablobstore import BundleFileMetadata, BundleMetadata, FileMetadata from ..util import UrlBuilder diff --git a/dss/api/files.py b/dss/api/files.py index 1409bf53d7..888f1d2cab 100644 --- a/dss/api/files.py +++ b/dss/api/files.py @@ -3,19 +3,17 @@ import json import re import typing - -import chainedawslambda.aws from enum import Enum, auto import iso8601 import requests - +import chainedawslambda.aws +from cloud_blobstore import BlobAlreadyExistsError, BlobNotFoundError, BlobStore +from cloud_blobstore.s3 import S3BlobStore from flask import jsonify, make_response, redirect, request from werkzeug.exceptions import BadRequest from .. import DSSException, dss_handler -from ..blobstore import BlobAlreadyExistsError, BlobNotFoundError, BlobStore -from ..blobstore.s3 import S3BlobStore from ..config import Config from ..events.chunkedtask import s3copyclient from ..hcablobstore import FileMetadata, HCABlobStore diff --git a/dss/api/subscriptions.py b/dss/api/subscriptions.py index 18bfeaef46..e288c9d800 100644 --- a/dss/api/subscriptions.py +++ b/dss/api/subscriptions.py @@ -8,6 +8,7 @@ import iso8601 import requests +from cloud_blobstore import BlobNotFoundError from elasticsearch import Elasticsearch from elasticsearch.exceptions import ElasticsearchException, NotFoundError from elasticsearch_dsl import Search @@ -15,7 +16,6 @@ from werkzeug.exceptions import BadRequest from .. 
import Config, Replica, ESIndexType, ESDocType, get_logger -from ..blobstore import BlobNotFoundError from ..error import DSSException, dss_handler from ..hcablobstore import FileMetadata, HCABlobStore from ..util.es import ElasticsearchClient, get_elasticsearch_index diff --git a/dss/blobstore/__init__.py b/dss/blobstore/__init__.py deleted file mode 100644 index cb6340c52c..0000000000 --- a/dss/blobstore/__init__.py +++ /dev/null @@ -1,119 +0,0 @@ -import typing - - -class BlobStore: - """Abstract base class for all blob stores.""" - def __init__(self): - pass - - def list( - self, - bucket: str, - prefix: str=None, - delimiter: str=None, - ) -> typing.Iterator[str]: - """ - Returns an iterator of all blob entries in a bucket that match a given prefix. Do not return any keys that - contain the delimiter past the prefix. - """ - raise NotImplementedError() - - def generate_presigned_GET_url( - self, - bucket: str, - object_name: str, - **kwargs) -> str: - # TODO: things like http ranges need to be explicit parameters. - # users of this API should not need to know the argument names presented - # to the cloud API. - """ - Retrieves a presigned URL for the given HTTP method for blob `object_name`. Raises BlobNotFoundError if the blob - is not present. - """ - raise NotImplementedError() - - def upload_file_handle( - self, - bucket: str, - object_name: str, - src_file_handle: typing.BinaryIO): - """ - Saves the contents of a file handle as the contents of an object in a bucket. - """ - raise NotImplementedError() - - def delete(self, bucket: str, object_name: str): - """ - Deletes an object in a bucket. If the operation definitely did not delete anything, return False. Any other - return value is treated as something was possibly deleted. - """ - raise NotImplementedError() - - def get(self, bucket: str, object_name: str) -> bytes: - """ - Retrieves the data for a given object in a given bucket. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being - retrieved. - :return: the data - """ - raise NotImplementedError() - - def get_cloud_checksum( - self, - bucket: str, - object_name: str - ) -> str: - """ - Retrieves the cloud-provided checksum for a given object in a given bucket. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which checksum is being retrieved. - :return: the cloud-provided checksum - """ - raise NotImplementedError() - - def get_user_metadata( - self, - bucket: str, - object_name: str - ) -> typing.Dict[str, str]: - """ - Retrieves the user metadata for a given object in a given bucket. If the platform has any mandatory prefixes or - suffixes for the metadata keys, they should be stripped before being returned. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being - retrieved. - :return: a dictionary mapping metadata keys to metadata values. 
- """ - raise NotImplementedError() - - def copy( - self, - src_bucket: str, src_object_name: str, - dst_bucket: str, dst_object_name: str, - **kwargs): - raise NotImplementedError() - - -class BlobStoreError(Exception): - pass - - -class BlobStoreUnknownError(BlobStoreError): - pass - - -class BlobStoreCredentialError(BlobStoreError): - pass - - -class BlobBucketNotFoundError(BlobStoreError): - pass - - -class BlobNotFoundError(BlobStoreError): - pass - - -class BlobAlreadyExistsError(BlobStoreError): - pass diff --git a/dss/blobstore/gs.py b/dss/blobstore/gs.py deleted file mode 100644 index 1ef2b0eb1a..0000000000 --- a/dss/blobstore/gs.py +++ /dev/null @@ -1,126 +0,0 @@ -import base64 -import binascii -import datetime -import typing - -from google.cloud.storage import Client -from google.cloud.storage.bucket import Bucket - -from . import BlobNotFoundError, BlobStore - - -class GSBlobStore(BlobStore): - def __init__(self, json_keyfile: str) -> None: - super(GSBlobStore, self).__init__() - - self.gcp_client = Client.from_service_account_json(json_keyfile) - self.bucket_map = dict() # type: typing.MutableMapping[str, Bucket] - - def _ensure_bucket_loaded(self, bucket: str): - cached_bucket_obj = self.bucket_map.get(bucket, None) - if cached_bucket_obj is not None: - return cached_bucket_obj - bucket_obj = self.gcp_client.bucket(bucket) # type: Bucket - self.bucket_map[bucket] = bucket_obj - return bucket_obj - - def list( - self, - bucket: str, - prefix: str=None, - delimiter: str=None, - ) -> typing.Iterator[str]: - """ - Returns an iterator of all blob entries in a bucket that match a given prefix. Do not return any keys that - contain the delimiter past the - prefix. - """ - kwargs = dict() - if prefix is not None: - kwargs['prefix'] = prefix - if delimiter is not None: - kwargs['delimiter'] = delimiter - bucket_obj = self._ensure_bucket_loaded(bucket) - for blob_obj in bucket_obj.list_blobs(**kwargs): - yield blob_obj.name - - def generate_presigned_GET_url( - self, - bucket: str, - object_name: str, - **kwargs) -> str: - bucket_obj = self._ensure_bucket_loaded(bucket) - blob_obj = bucket_obj.get_blob(object_name) - return blob_obj.generate_signed_url(datetime.timedelta(days=1)) - - def upload_file_handle( - self, - bucket: str, - object_name: str, - src_file_handle: typing.BinaryIO): - bucket_obj = self._ensure_bucket_loaded(bucket) - blob_obj = bucket_obj.blob(object_name, chunk_size=1 * 1024 * 1024) - blob_obj.upload_from_file(src_file_handle) - - def get(self, bucket: str, object_name: str) -> bytes: - """ - Retrieves the data for a given object in a given bucket. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being - retrieved. - :return: the data - """ - bucket_obj = self._ensure_bucket_loaded(bucket) - blob_obj = bucket_obj.get_blob(object_name) - if blob_obj is None: - raise BlobNotFoundError() - - return blob_obj.download_as_string() - - def get_cloud_checksum( - self, - bucket: str, - object_name: str - ) -> str: - """ - Retrieves the cloud-provided checksum for a given object in a given bucket. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which checksum is being retrieved. 
- :return: the cloud-provided checksum - """ - bucket_obj = self._ensure_bucket_loaded(bucket) - blob_obj = bucket_obj.get_blob(object_name) - if blob_obj is None: - raise BlobNotFoundError() - - return binascii.hexlify(base64.b64decode(blob_obj.crc32c)).decode("utf-8").lower() - - def get_user_metadata( - self, - bucket: str, - object_name: str - ) -> typing.Dict[str, str]: - """ - Retrieves the user metadata for a given object in a given bucket. If the platform has any mandatory prefixes or - suffixes for the metadata keys, they should be stripped before being returned. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being - retrieved. - :return: a dictionary mapping metadata keys to metadata values. - """ - bucket_obj = self._ensure_bucket_loaded(bucket) - response = bucket_obj.get_blob(object_name) - if response is None: - raise BlobNotFoundError() - return response.metadata - - def copy( - self, - src_bucket: str, src_object_name: str, - dst_bucket: str, dst_object_name: str, - **kwargs - ): - src_bucket_obj = self._ensure_bucket_loaded(src_bucket) - src_blob_obj = src_bucket_obj.get_blob(src_object_name) - dst_bucket_obj = self._ensure_bucket_loaded(dst_bucket) - src_bucket_obj.copy_blob(src_blob_obj, dst_bucket_obj, new_name=dst_object_name) diff --git a/dss/blobstore/s3.py b/dss/blobstore/s3.py deleted file mode 100644 index 31cf3c8aeb..0000000000 --- a/dss/blobstore/s3.py +++ /dev/null @@ -1,244 +0,0 @@ -import boto3 -import botocore -import requests -import typing - -from boto3.s3.transfer import TransferConfig - -from . import ( - BlobNotFoundError, - BlobStore, - BlobStoreCredentialError, - BlobStoreUnknownError, -) - - -class S3BlobStore(BlobStore): - def __init__(self) -> None: - super(S3BlobStore, self).__init__() - - # verify that the credentials are valid. - try: - boto3.client('sts').get_caller_identity() - except botocore.exceptions.ClientError as e: - if e.response['Error']['Code'] == "InvalidClientTokenId": - raise BlobStoreCredentialError() - - self.s3_client = boto3.client("s3") - - def list( - self, - bucket: str, - prefix: str=None, - delimiter: str=None, - ) -> typing.Iterator[str]: - """ - Returns an iterator of all blob entries in a bucket that match a given prefix. Do not return any keys that - contain the delimiter past the prefix. - """ - kwargs = dict() - if prefix is not None: - kwargs['Prefix'] = prefix - if delimiter is not None: - kwargs['Delimiter'] = delimiter - for item in ( - boto3.resource("s3").Bucket(bucket). - objects. - filter(**kwargs)): - yield item.key - - def generate_presigned_GET_url( - self, - bucket: str, - object_name: str, - **kwargs) -> str: - return self._generate_presigned_url( - bucket, - object_name, - "get_object" - ) - - def _generate_presigned_url( - self, - bucket: str, - object_name: str, - method: str, - **kwargs) -> str: - args = kwargs.copy() - args['Bucket'] = bucket - args['Key'] = object_name - return self.s3_client.generate_presigned_url( - ClientMethod=method, - Params=args, - ) - - def upload_file_handle( - self, - bucket: str, - object_name: str, - src_file_handle: typing.BinaryIO): - self.s3_client.upload_fileobj( - src_file_handle, - Bucket=bucket, - Key=object_name, - ) - - def delete(self, bucket: str, object_name: str): - self.s3_client.delete_object( - Bucket=bucket, - key=object_name - ) - - def get(self, bucket: str, object_name: str) -> bytes: - """ - Retrieves the data for a given object in a given bucket. 
- :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being - retrieved. - :return: the data - """ - try: - response = self.s3_client.get_object( - Bucket=bucket, - Key=object_name - ) - return response['Body'].read() - except botocore.exceptions.ClientError as ex: - if ex.response['Error']['Code'] == "NoSuchKey": - raise BlobNotFoundError(ex) - raise BlobStoreUnknownError(ex) - - def get_all_metadata( - self, - bucket: str, - object_name: str - ) -> dict: - """ - Retrieves all the metadata for a given object in a given bucket. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being retrieved. - :return: the metadata - """ - try: - return self.s3_client.head_object( - Bucket=bucket, - Key=object_name - ) - except botocore.exceptions.ClientError as ex: - if str(ex.response['Error']['Code']) == \ - str(requests.codes.not_found): - raise BlobNotFoundError(ex) - raise BlobStoreUnknownError(ex) - - def get_cloud_checksum( - self, - bucket: str, - object_name: str - ) -> str: - """ - Retrieves the cloud-provided checksum for a given object in a given bucket. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which checksum is being retrieved. - :return: the cloud-provided checksum - """ - response = self.get_all_metadata(bucket, object_name) - # hilariously, the ETag is quoted. Unclear why. - return response['ETag'].strip("\"") - - def get_user_metadata( - self, - bucket: str, - object_name: str - ) -> typing.Dict[str, str]: - """ - Retrieves the user metadata for a given object in a given bucket. If the platform has any mandatory prefixes or - suffixes for the metadata keys, they should be stripped before being returned. - :param bucket: the bucket the object resides in. - :param object_name: the name of the object for which metadata is being - retrieved. - :return: a dictionary mapping metadata keys to metadata values. - """ - try: - response = self.get_all_metadata(bucket, object_name) - metadata = response['Metadata'].copy() - - response = self.s3_client.get_object_tagging( - Bucket=bucket, - Key=object_name, - ) - for tag in response['TagSet']: - key, value = tag['Key'], tag['Value'] - metadata[key] = value - - return metadata - except botocore.exceptions.ClientError as ex: - if str(ex.response['Error']['Code']) == \ - str(requests.codes.not_found): - raise BlobNotFoundError(ex) - raise BlobStoreUnknownError(ex) - - def copy( - self, - src_bucket: str, src_object_name: str, - dst_bucket: str, dst_object_name: str, - **kwargs - ): - self.s3_client.copy( - dict( - Bucket=src_bucket, - Key=src_object_name, - ), - Bucket=dst_bucket, - Key=dst_object_name, - ExtraArgs=kwargs, - Config=TransferConfig( - multipart_threshold=64 * 1024 * 1024, - multipart_chunksize=64 * 1024 * 1024, - ), - ) - - def find_next_missing_parts( - self, - bucket: str, - key: str, - upload_id: str, - part_count: int, - search_start: int=1, - return_count: int=1) -> typing.Sequence[int]: - """ - Given a `bucket`, `key`, and `upload_id`, find the next N missing parts of a multipart upload, where - N=`return_count`. If `search_start` is provided, start the search at part M, where M=`search_start`. - `part_count` is the number of parts expected for the upload. - - Note that the return value may contain fewer than N parts. 
- """ - if part_count < search_start: - raise ValueError("") - result = list() - while True: - kwargs = dict(Bucket=bucket, Key=key, UploadId=upload_id) # type: dict - if search_start > 1: - kwargs['PartNumberMarker'] = search_start - 1 - - # retrieve all the parts after the one we *think* we need to start from. - parts_resp = self.s3_client.list_parts(**kwargs) - - # build a set of all the parts known to be uploaded, detailed in this request. - parts_map = set() # type: typing.Set[int] - for part_detail in parts_resp.get('Parts', []): - parts_map.add(part_detail['PartNumber']) - - while True: - if search_start not in parts_map: - # not found, add it to the list of parts we still need. - result.append(search_start) - - # have we met our requirements? - if len(result) == return_count or search_start == part_count: - return result - - search_start += 1 - - if parts_resp['IsTruncated'] and search_start == parts_resp['NextPartNumberMarker']: - # finished examining the results of this batch, move onto the next one - break diff --git a/dss/config.py b/dss/config.py index 57c9c99399..27b31141fa 100644 --- a/dss/config.py +++ b/dss/config.py @@ -3,9 +3,10 @@ from contextlib import contextmanager from enum import Enum, auto -from .blobstore import BlobStore -from .blobstore.s3 import S3BlobStore -from .blobstore.gs import GSBlobStore +from cloud_blobstore import BlobStore +from cloud_blobstore.s3 import S3BlobStore +from cloud_blobstore.gs import GSBlobStore + from .hcablobstore import HCABlobStore from .hcablobstore.s3 import S3HCABlobStore from .hcablobstore.gs import GSHCABlobStore diff --git a/dss/events/chunkedtask/s3copyclient.py b/dss/events/chunkedtask/s3copyclient.py index 0535ccc6fb..f23faef0c3 100644 --- a/dss/events/chunkedtask/s3copyclient.py +++ b/dss/events/chunkedtask/s3copyclient.py @@ -1,10 +1,11 @@ import typing +from cloud_blobstore.s3 import S3BlobStore + from chainedawslambda import Task, Runtime from chainedawslambda.s3copyclient import S3CopyTask, S3ParallelCopySupervisorTask from ...api import files -from ...blobstore.s3 import S3BlobStore # this must match the lambda name in daemons/Makefile AWS_S3_COPY_AND_WRITE_METADATA_CLIENT_PREFIX = "dss-s3-copy-write-metadata-" diff --git a/dss/events/handlers/index.py b/dss/events/handlers/index.py index 107f265a45..7bd3d400bb 100644 --- a/dss/events/handlers/index.py +++ b/dss/events/handlers/index.py @@ -9,14 +9,14 @@ from urllib.parse import urlparse, unquote import requests -from requests_http_signature import HTTPSignatureAuth +from cloud_blobstore import BlobStore, BlobStoreError from elasticsearch.helpers import scan +from requests_http_signature import HTTPSignatureAuth from dss import Config, ESIndexType, ESDocType, Replica from ...util import create_blob_key from ...hcablobstore import BundleMetadata, BundleFileMetadata from ...util.es import ElasticsearchClient, get_elasticsearch_index -from ...blobstore import BlobStore, BlobStoreError DSS_BUNDLE_KEY_REGEX = r"^bundles/[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-4[0-9A-Fa-f]{3}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}\..+$" diff --git a/dss/events/handlers/sync.py b/dss/events/handlers/sync.py index 09477d2ea8..5ce1834ac5 100644 --- a/dss/events/handlers/sync.py +++ b/dss/events/handlers/sync.py @@ -12,6 +12,7 @@ import botocore.session import urllib3 import google.cloud.storage +from cloud_blobstore.gs import GSBlobStore from google.resumable_media._upload import get_content_range from google.cloud.client import ClientWithProject from google.cloud._http import JSONConnection @@ -19,7 +20,6 @@ 
from dss import Config from dss.util.aws import resources, clients, send_sns_msg, ARN from dss.util.streaming import get_pool_manager, S3SigningChunker -from dss.blobstore.gs import GSBlobStore presigned_url_lifetime_seconds = 3600 use_gsts = False diff --git a/dss/hcablobstore/gs.py b/dss/hcablobstore/gs.py index 43e53c6b1d..996f6dd5cb 100644 --- a/dss/hcablobstore/gs.py +++ b/dss/hcablobstore/gs.py @@ -1,7 +1,8 @@ import typing +from cloud_blobstore import BlobStore + from . import HCABlobStore -from ..blobstore import BlobStore class GSHCABlobStore(HCABlobStore): diff --git a/dss/hcablobstore/s3.py b/dss/hcablobstore/s3.py index 058937a681..5ff3fe921c 100644 --- a/dss/hcablobstore/s3.py +++ b/dss/hcablobstore/s3.py @@ -1,7 +1,8 @@ import typing +from cloud_blobstore import BlobStore + from . import HCABlobStore -from ..blobstore import BlobStore class S3HCABlobStore(HCABlobStore): diff --git a/requirements-dev.txt b/requirements-dev.txt index 6d6dc2b1da..badd4da27a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,6 +15,7 @@ chalice==1.0.3 chardet==3.0.4 click==6.7 clickclick==1.2.2 +cloud-blobstore==0.0.3 colorama==0.3.7 connexion==1.1.15 cookies==2.2.1 diff --git a/requirements.txt b/requirements.txt index 9d42dc0297..b51815645b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ chained-aws-lambda==0.0.7 chardet==3.0.4 click==6.7 clickclick==1.2.2 +cloud-blobstore==0.0.3 connexion==1.1.15 cryptography==2.0.3 docutils==0.14 diff --git a/requirements.txt.in b/requirements.txt.in index 4f99c3918b..edfd19b083 100644 --- a/requirements.txt.in +++ b/requirements.txt.in @@ -2,6 +2,7 @@ azure-storage boto3 chained-aws-lambda +cloud-blobstore connexion elasticsearch elasticsearch_dsl diff --git a/tests/infra/storage_mixin.py b/tests/infra/storage_mixin.py index f647e38374..1012185f67 100644 --- a/tests/infra/storage_mixin.py +++ b/tests/infra/storage_mixin.py @@ -4,9 +4,9 @@ import uuid import requests +from cloud_blobstore import BlobStore from dss import Config -from dss.blobstore import BlobStore from dss.util import UrlBuilder diff --git a/tests/test_blobstore.py b/tests/test_blobstore.py deleted file mode 100644 index 36e1e70e8e..0000000000 --- a/tests/test_blobstore.py +++ /dev/null @@ -1,119 +0,0 @@ -import io - -import requests - -from dss.blobstore import BlobNotFoundError, BlobStore -from tests import infra - - -class BlobStoreTests: - """ - Common blobstore tests. We want to avoid repeating ourselves, so if we - built the abstractions correctly, common operations can all be tested here. - """ - - def test_get_metadata(self): - """ - Ensure that the ``get_metadata`` methods return sane data. - """ - handle = self.handle # type: BlobStore - metadata = handle.get_user_metadata( - self.test_fixtures_bucket, - "test_good_source_data/0") - self.assertIn('hca-dss-content-type', metadata) - - with self.assertRaises(BlobNotFoundError): - handle.get_user_metadata( - self.test_fixtures_bucket, - "test_good_source_data_DOES_NOT_EXIST") - - def testList(self): - """ - Ensure that the ```list``` method returns sane data. - """ - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "test_good_source_data/0", - )] - self.assertTrue(len(items) > 0) - for item in items: - if item == "test_good_source_data/0": - break - else: - self.fail("did not find the requisite key") - - # fetch a bunch of items all at once. 
- items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "testList/prefix", - )] - self.assertEqual(len(items), 10) - - # this should fetch both testList/delimiter and testList/delimiter/test - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "testList/delimiter", - )] - self.assertEqual(len(items), 2) - - # this should fetch only testList/delimiter - items = [item for item in - self.handle.list( - self.test_fixtures_bucket, - "testList/delimiter", - delimiter="/" - )] - self.assertEqual(len(items), 1) - - def testGetPresignedUrl(self): - presigned_url = self.handle.generate_presigned_GET_url( - self.test_fixtures_bucket, - "test_good_source_data/0", - ) - - resp = requests.get(presigned_url) - self.assertEqual(resp.status_code, requests.codes.ok) - - def testUploadFileHandle(self): - fobj = io.BytesIO(b"abcabcabc") - dst_blob_name = infra.generate_test_key() - - self.handle.upload_file_handle( - self.test_bucket, - dst_blob_name, - fobj - ) - - # should be able to get metadata for the file. - self.handle.get_user_metadata( - self.test_bucket, dst_blob_name) - - def testGet(self): - data = self.handle.get( - self.test_fixtures_bucket, - "test_good_source_data/0", - ) - self.assertEqual(len(data), 11358) - - with self.assertRaises(BlobNotFoundError): - self.handle.get( - self.test_fixtures_bucket, - "test_good_source_data_DOES_NOT_EXIST", - ) - - def testCopy(self): - dst_blob_name = infra.generate_test_key() - - self.handle.copy( - self.test_fixtures_bucket, - "test_good_source_data/0", - self.test_bucket, - dst_blob_name, - ) - - # should be able to get metadata for the file. - self.handle.get_user_metadata( - self.test_bucket, dst_blob_name) diff --git a/tests/test_gsblobstore.py b/tests/test_gsblobstore.py deleted file mode 100644 index 2d4ffb01b1..0000000000 --- a/tests/test_gsblobstore.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python -# coding: utf-8 - -import os -import sys -import unittest - -pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa -sys.path.insert(0, pkg_root) # noqa - -from dss.blobstore.gs import GSBlobStore -from dss.blobstore import BlobNotFoundError -from tests import infra -from tests.test_blobstore import BlobStoreTests - - -class TestGSBlobStore(unittest.TestCase, BlobStoreTests): - def setUp(self): - self.credentials = infra.get_env("GOOGLE_APPLICATION_CREDENTIALS") - self.test_bucket = infra.get_env("DSS_GS_BUCKET_TEST") - self.test_fixtures_bucket = infra.get_env("DSS_GS_BUCKET_TEST_FIXTURES") - self.handle = GSBlobStore(self.credentials) - - def tearDown(self): - pass - - def test_get_checksum(self): - """ - Ensure that the ``get_metadata`` methods return sane data. 
- """ - handle = self.handle # type: BlobStore - checksum = handle.get_cloud_checksum( - self.test_fixtures_bucket, - "test_good_source_data/0") - self.assertEqual(checksum, "e16e07b9") - - with self.assertRaises(BlobNotFoundError): - handle.get_user_metadata( - self.test_fixtures_bucket, - "test_good_source_data_DOES_NOT_EXIST") - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_gshcablobstore.py b/tests/test_gshcablobstore.py index 82bc927496..ef2f83c903 100644 --- a/tests/test_gshcablobstore.py +++ b/tests/test_gshcablobstore.py @@ -5,10 +5,11 @@ import sys import unittest +from cloud_blobstore.gs import GSBlobStore + pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa sys.path.insert(0, pkg_root) # noqa -from dss.blobstore.gs import GSBlobStore from dss.hcablobstore.gs import GSHCABlobStore from tests import infra from tests.test_hcablobstore import HCABlobStoreTests diff --git a/tests/test_hcablobstore.py b/tests/test_hcablobstore.py index 8950f7c3fa..00a1cf014d 100644 --- a/tests/test_hcablobstore.py +++ b/tests/test_hcablobstore.py @@ -1,4 +1,4 @@ -from dss.blobstore import BlobNotFoundError +from cloud_blobstore import BlobNotFoundError class HCABlobStoreTests: diff --git a/tests/test_s3blobstore.py b/tests/test_s3blobstore.py deleted file mode 100755 index b0a0670a0c..0000000000 --- a/tests/test_s3blobstore.py +++ /dev/null @@ -1,139 +0,0 @@ -#!/usr/bin/env python -# coding: utf-8 - -import os -import sys -import unittest -import uuid - -pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa -sys.path.insert(0, pkg_root) # noqa - -from dss.blobstore import BlobNotFoundError -from dss.blobstore.s3 import S3BlobStore -from tests import infra -from tests.test_blobstore import BlobStoreTests - - -class TestS3BlobStore(unittest.TestCase, BlobStoreTests): - def setUp(self): - self.test_bucket = infra.get_env("DSS_S3_BUCKET_TEST") - self.test_fixtures_bucket = infra.get_env("DSS_S3_BUCKET_TEST_FIXTURES") - - self.handle = S3BlobStore() - - def tearDown(self): - pass - - def test_get_checksum(self): - """ - Ensure that the ``get_metadata`` methods return sane data. - """ - handle = self.handle # type: BlobStore - checksum = handle.get_cloud_checksum( - self.test_fixtures_bucket, - "test_good_source_data/0") - self.assertEqual(checksum, "3b83ef96387f14655fc854ddc3c6bd57") - - with self.assertRaises(BlobNotFoundError): - handle.get_user_metadata( - self.test_fixtures_bucket, - "test_good_source_data_DOES_NOT_EXIST") - - def find_next_missing_parts_test_case(self, handle, parts_to_upload, *args, **kwargs): - key = str(uuid.uuid4()) - mpu = handle.s3_client.create_multipart_upload(Bucket=self.test_bucket, Key=key) - - try: - for part_to_upload in parts_to_upload: - handle.s3_client.upload_part( - Bucket=self.test_bucket, - Key=key, - UploadId=mpu['UploadId'], - PartNumber=part_to_upload, - Body=f"part{part_to_upload:05}".encode("utf-8")) - - return handle.find_next_missing_parts(self.test_bucket, key, mpu['UploadId'], *args, **kwargs) - finally: - handle.s3_client.abort_multipart_upload(Bucket=self.test_bucket, Key=key, UploadId=mpu['UploadId']) - - def test_find_next_missing_parts_simple(self): - handle = self.handle # type: BlobStore - - # simple test case, 2 parts, 1 part uploaded. 
- res = self.find_next_missing_parts_test_case(handle, [1], part_count=2) - self.assertEqual(res, [2]) - - res = self.find_next_missing_parts_test_case(handle, [1], part_count=2, search_start=1) - self.assertEqual(res, [2]) - - res = self.find_next_missing_parts_test_case(handle, [1], part_count=2, search_start=1, return_count=2) - self.assertEqual(res, [2]) - - res = self.find_next_missing_parts_test_case(handle, [1], part_count=2, search_start=2) - self.assertEqual(res, [2]) - - res = self.find_next_missing_parts_test_case(handle, [1], part_count=2, search_start=2, return_count=2) - self.assertEqual(res, [2]) - - with self.assertRaises(ValueError): - self.find_next_missing_parts_test_case(handle, [1], part_count=2, search_start=3, return_count=2) - - def test_find_next_missing_parts_multiple_requests(self): - handle = self.handle # type: BlobStore - - # 10000 parts, one is uploaded. - res = self.find_next_missing_parts_test_case(handle, [1], part_count=10000) - self.assertEqual(res, [2]) - - # 10000 parts, one is uploaded, get all the missing parts. - res = self.find_next_missing_parts_test_case(handle, [1], part_count=10000, return_count=10000) - self.assertEqual(len(res), 9999) - self.assertNotIn(1, res) - - # 10000 parts, one is uploaded, get all the missing parts. - res = self.find_next_missing_parts_test_case(handle, [1], part_count=10000, return_count=1000) - self.assertEqual(len(res), 1000) - self.assertNotIn(1, res) - - # 10000 parts, one is uploaded, get all the missing parts. - res = self.find_next_missing_parts_test_case(handle, [1], part_count=10000, search_start=100, return_count=1000) - self.assertEqual(len(res), 1000) - self.assertNotIn(1, res) - for missing_part in res: - self.assertGreaterEqual(missing_part, 100) - - # 10000 parts, all the parts numbers divisible by 2000 is uploaded, get all the missing parts. - res = self.find_next_missing_parts_test_case( - handle, - [ix - for ix in range(1, 10000 + 1) - if ix % 2000 == 0], - part_count=10000, - return_count=10000) - self.assertEqual(len(res), 9995) - for ix in range(1, 10000 + 1): - if ix % 2000 == 0: - self.assertNotIn(ix, res) - else: - self.assertIn(ix, res) - - # 10000 parts, all the parts numbers divisible by 2000 is uploaded, get all the missing parts starting at part - # 1001. - res = self.find_next_missing_parts_test_case( - handle, - [ix - for ix in range(1, 10000 + 1) - if ix % 2000 == 0], - part_count=10000, - search_start=1001, - return_count=10000) - self.assertEqual(len(res), 8995) - for ix in range(1001, 10000 + 1): - if ix % 2000 == 0: - self.assertNotIn(ix, res) - else: - self.assertIn(ix, res) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_s3hcablobstore.py b/tests/test_s3hcablobstore.py index e289e8df13..ff76e0662b 100644 --- a/tests/test_s3hcablobstore.py +++ b/tests/test_s3hcablobstore.py @@ -5,10 +5,11 @@ import sys import unittest +from cloud_blobstore.s3 import S3BlobStore + pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # noqa sys.path.insert(0, pkg_root) # noqa -from dss.blobstore.s3 import S3BlobStore from dss.hcablobstore.s3 import S3HCABlobStore from tests import infra from tests.test_hcablobstore import HCABlobStoreTests
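
For downstream code that still imports `dss.blobstore`, the call sites touched by this patch imply a one-for-one import swap onto `cloud_blobstore` (pinned at 0.0.3 in requirements). The sketch below is a minimal illustration under that assumption — that the library preserves the interface of the removed in-tree module, as the otherwise-unchanged call sites suggest; the bucket and key names are placeholders, not values from this patch.

```python
# Hypothetical migration of a dss.blobstore call site to cloud_blobstore.
# Assumes cloud-blobstore 0.0.3 keeps the BlobStore interface of the removed
# in-tree module, as the otherwise-unchanged call sites in this patch imply.
from cloud_blobstore import BlobNotFoundError   # was: from dss.blobstore import BlobNotFoundError
from cloud_blobstore.s3 import S3BlobStore      # was: from dss.blobstore.s3 import S3BlobStore

handle = S3BlobStore()  # the removed class validated AWS credentials at construction time

try:
    # get(bucket, object_name) -> bytes; placeholder bucket/key, not values from this patch
    data = handle.get("example-bucket", "blobs/example-key")
except BlobNotFoundError:
    data = None

# The GS flavor migrates the same way but, per the removed gs.py, is built from
# a service-account JSON keyfile path (placeholder path shown):
# from cloud_blobstore.gs import GSBlobStore
# gs_handle = GSBlobStore("/path/to/service-account.json")
```

The exception hierarchy (`BlobStoreError`, `BlobNotFoundError`, `BlobAlreadyExistsError`, etc.) moves to the `cloud_blobstore` package root, matching the imports introduced in `dss/api/files.py` and `dss/api/bundles.py` above.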