Improve performance of CloudNode through gsutil
This rewrites CloudNode's functionality using two calls to gsutil
rsync.  Because gsutil can perform the deletion for us and parallelize
and compress the uploads, it vastly outperforms the Python module we
were using before.  This also simplifies the installation
prerequisites for cloud storage.

Closes #19

Change-Id: Iec9bc4162fbaa3c64220cd97e3c7b9d51cc90ccc
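For illustration, the two rsync passes described above correspond roughly to gsutil invocations like these (the local directories and bucket URL are placeholders, not values from this commit):

```sh
# First pass: push segments, skipping manifests, and delete remote files
# that no longer exist locally. Excluding manifests here keeps them from
# being uploaded before the segments they reference.
gsutil -q -h "Cache-Control: no-store, no-transform" -m rsync -C -r -d \
    -x '.*m3u8' -x '.*mpd' /path/to/output gs://example-bucket/folder

# Second pass: push temporary copies of the manifests, compressed in
# transit since they are text.
gsutil -q -h "Cache-Control: no-store, no-transform" -m rsync -C -r -J \
    /path/to/temp/cloud gs://example-bucket/folder
```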
joeyparrish committed Oct 7, 2019
1 parent 3eccb74 commit 8f7b5e4
Showing 3 changed files with 69 additions and 102 deletions.
17 changes: 4 additions & 13 deletions PREREQS.md
@@ -65,16 +65,7 @@ brew install ffmpeg
## Google Cloud Storage (optional)

Shaka Streamer can push content directly to a Google Cloud Storage bucket. To
use this feature, the Google Cloud SDK and the Cloud Storage python module are
required.

For Ubuntu, for example, you could install the necessary components like this:

```sh
sudo apt -y install google-cloud-sdk python3-pip python3-setuptools
pip3 install --user google-cloud
pip3 install --user google-cloud-storage
```
use this feature, the Google Cloud SDK is required.

See https://cloud.google.com/sdk/install for details on installing the Google
Cloud SDK on your platform.
@@ -84,11 +75,11 @@ log in through your browser.

```sh
gcloud init
gcloud auth login
gsutil config
```

Follow the instructions given to you by each of these tools.
Follow the instructions given to you by gcloud to initialize the environment and
log in.
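
To sanity-check that authentication worked, you can list a bucket you have access to; the bucket name below is only an example:

```sh
# Replace with the name of your own bucket.
gsutil ls gs://your-bucket-name
```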


## Test Dependencies (optional)

146 changes: 60 additions & 86 deletions streamer/cloud_node.py
@@ -14,121 +14,95 @@

"""Pushes output from packager to cloud."""

from google.cloud import storage

import glob
import os
import subprocess
import time

from . import node_base

# This is the value for the HTTP header "Cache-Control" which will be attached
# to the Cloud Storage blobs uploaded by this tool. When the browser requests
# a file from Cloud Storage, the server will use this as the value of the
# "Cache-Control" header it returns.
# This is the HTTP header "Cache-Control" which will be attached to the Cloud
# Storage blobs uploaded by this tool. When the browser requests a file from
# Cloud Storage, the server will use this as the "Cache-Control" header it
# returns.
#
# Here "no-store" means that the response must not be stored in a cache, and
# "no-transform" means that the response must not be manipulated in any way
# (including Chrome's data saver features which might want to re-encode
# content).
#
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
CACHE_CONTROL_HEADER = 'no-store, no-transform'
CACHE_CONTROL_HEADER = 'Cache-Control: no-store, no-transform'

COMMON_GSUTIL_ARGS = [
'gsutil',
'-q', # quiet mode: report errors, but not progress
'-h', CACHE_CONTROL_HEADER, # set the appropriate cache header on uploads
'-m', # parallelize the operation
'rsync', # operation to perform
'-C', # still try to push other files if one fails
'-r', # recurse into folders
]

class CloudNode(node_base.ThreadedNodeBase):
def __init__(self, input_dir, bucket_url, temp_dir):
super().__init__(thread_name='cloud', continue_on_exception=True)
self._input_dir = input_dir
self._temp_dir = temp_dir
self._storage_client = storage.Client()
self._bucket_url = bucket_url
bucket, path = self._bucket_url.replace('gs://', '').split('/', 1)
self._bucket = self._storage_client.get_bucket(bucket)
# Strip trailing slashes to make sure we don't construct paths later like
# foo//bar, which is _not_ the same as foo/bar in Google Cloud Storage.
self._subdir_path = path.rstrip('/')
self._temp_dir = temp_dir

def _thread_single_pass(self):
all_files = os.listdir(self._input_dir)
is_manifest_file = lambda x: x.endswith('.mpd') or x.endswith('.m3u8')
manifest_files = filter(is_manifest_file, all_files)
segment_files = filter(lambda x: not is_manifest_file(x), all_files)
# With recursive=True, glob's ** will also match the base dir.
manifest_files = (
glob.glob(self._input_dir + '/**/*.mpd', recursive=True) +
glob.glob(self._input_dir + '/**/*.m3u8', recursive=True))

# The manifest at any moment will reference existing segment files.
# We must be careful not to upload a manifest that references segments that
# haven't been uploaded yet. So first we will capture manifest contents,
# then upload current segments, then upload the manifest contents we
# captured.

manifest_contents = {}
for filename in manifest_files:
source_path = os.path.join(self._input_dir, filename)
contents = b''
for manifest_path in manifest_files:
# The path within the input dir.
subdir_path = os.path.relpath(manifest_path, self._input_dir)

# Capture manifest contents, and retry until the file is non-empty or
# until the thread is killed.
while not contents and self._is_running():
contents = b''
while (not contents and
self.check_status() == node_base.ProcessStatus.Running):
time.sleep(0.1)

with open(source_path, 'rb') as f:
with open(manifest_path, 'rb') as f:
contents = f.read()

manifest_contents[filename] = contents

for filename in segment_files:
# Check if the thread has been interrupted.
if not self._is_running():
return

source_path = os.path.join(self._input_dir, filename)
destination_path = self._subdir_path + '/' + filename
self._sync_file(source_path, destination_path)

for filename, contents in manifest_contents.items():
# Check if the thread has been interrupted.
if not self._is_running():
return

destination_path = self._subdir_path + '/' + filename
self._upload_string(contents, destination_path)

# Finally, list blobs and delete any that don't exist locally. This will
# help avoid excessive storage costs from content that is outside the
# availability window. We use the prefix parameter to limit ourselves to
# the folder this client is uploading to.
all_blobs = self._storage_client.list_blobs(self._bucket,
prefix=self._subdir_path + '/')
for blob in all_blobs:
# Check if the thread has been interrupted.
if not self._is_running():
return

assert blob.name.startswith(self._subdir_path + '/')
filename = blob.name.replace(self._subdir_path + '/', '')
local_path = os.path.join(self._input_dir, filename)
if not os.path.exists(local_path):
blob.delete()

def _sync_file(self, source_file, dest_blob_name):
blob = self._bucket.blob(dest_blob_name)
blob.cache_control = CACHE_CONTROL_HEADER

try:
if blob.exists(self._storage_client):
blob.reload(self._storage_client)
modified_datetime = os.path.getmtime(source_file)
if modified_datetime <= blob.updated.timestamp():
# We already have an up-to-date copy in cloud storage.
return

blob.upload_from_filename(source_file)

except FileNotFoundError:
# The file was deleted by the Packager between the time we saw it and now.
# Ignore this one.
return

def _upload_string(self, source_string, dest_blob_name):
blob = self._bucket.blob(dest_blob_name)
blob.cache_control = CACHE_CONTROL_HEADER
blob.upload_from_string(source_string)

def _is_running(self):
return self.check_status() == node_base.ProcessStatus.Running
# Now that we have manifest contents, put them into a temp file so that
# the manifests can be pushed en masse later.
temp_file_path = os.path.join(self._temp_dir, subdir_path)
# Create any necessary intermediate folders.
temp_file_dir_path = os.path.dirname(temp_file_path)
os.makedirs(temp_file_dir_path, exist_ok=True)
# Write the temp file.
with open(temp_file_path, 'wb') as f:
f.write(contents)

# Sync all files except manifest files.
args = COMMON_GSUTIL_ARGS + [
'-d', # delete remote files that are no longer needed
'-x', '.*m3u8', # skip m3u8 files, which we'll push separately later
'-x', '.*mpd', # skip mpd files, which we'll push separately later
self._input_dir, # local input folder to sync
self._bucket_url, # destination in cloud storage
]
# NOTE: The -d option above will not result in the files ignored by -x
# being deleted from the remote storage location.
subprocess.check_call(args)

# Sync the temporary copies of the manifest files.
args = COMMON_GSUTIL_ARGS + [
'-J', # compress all files in transit, since they are text
self._temp_dir, # local input folder to sync
self._bucket_url, # destination in cloud storage
]
subprocess.check_call(args)
8 changes: 5 additions & 3 deletions streamer/controller_node.py
@@ -25,6 +25,7 @@
import tempfile
import uuid

from . import cloud_node
from . import input_configuration
from . import loop_input_node
from . import metadata
@@ -171,10 +172,11 @@ def start(self, output_dir, input_config_dict, pipeline_config_dict, bucket_url=
self._nodes.append(package_node)

if bucket_url:
# Import the cloud node late, so that the cloud deps are optional.
from . import cloud_node
cloud_temp_dir = os.path.join(self._temp_dir, 'cloud')
os.mkdir(cloud_temp_dir)

push_to_cloud = cloud_node.CloudNode(output_dir, bucket_url,
self._temp_dir)
cloud_temp_dir)
self._nodes.append(push_to_cloud)

for node in self._nodes:
