Skip to content

Commit

Permalink
Factor out threading functionality to a base class
Browse files Browse the repository at this point in the history
Currently, only the CloudNode is using a thread.  But it is easier to
read and understand the functionality of CloudNode if the threading
code is moved out to a base class.

This refactor is groundwork for improving CloudNode and switching to
gsutil for rsync/upload.

Issue #19

Change-Id: Iaa712d32ca7854cda0799011b5ca72cf6ec59b02
  • Loading branch information
joeyparrish committed Oct 7, 2019
1 parent 67ac9e4 commit 3c87c88
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 39 deletions.
47 changes: 9 additions & 38 deletions streamer/cloud_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

from google.cloud import storage

import glob
import json
import os
import shutil
import sys
import threading
import time

from . import node_base
Expand All @@ -37,34 +32,20 @@
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
CACHE_CONTROL_HEADER = 'no-store, no-transform'

class CloudNode(node_base.NodeBase):

class CloudNode(node_base.ThreadedNodeBase):
def __init__(self, input_dir, bucket_url, temp_dir):
node_base.NodeBase.__init__(self)
super().__init__(thread_name='cloud', continue_on_exception=True)
self._input_dir = input_dir
self._temp_dir = temp_dir
self._storage_client = storage.Client()
self._running = True
self._bucket_url = bucket_url
bucket, path = self._bucket_url.replace('gs://', '').split('/', 1)
self._bucket = self._storage_client.get_bucket(bucket)
# Strip trailing slashes to make sure we don't construct paths later like
# foo//bar, which is _not_ the same as foo/bar in Google Cloud Storage.
self._subdir_path = path.rstrip('/')
self._thread = threading.Thread(target=self._thread_main, name='cloud')

def _thread_main(self):
while self._running:
try:
self._upload_all()
except:
print('Exception in cloud upload:', sys.exc_info())
print('Cloud upload continuing.')

# Yield time to other threads.
time.sleep(1)

def _upload_all(self):
def _thread_single_pass(self):
all_files = os.listdir(self._input_dir)
is_manifest_file = lambda x: x.endswith('.mpd') or x.endswith('.m3u8')
manifest_files = filter(is_manifest_file, all_files)
Expand All @@ -83,7 +64,7 @@ def _upload_all(self):

# Capture manifest contents, and retry until the file is non-empty or
# until the thread is killed.
while not contents and self._running:
while not contents and self._is_running():
time.sleep(0.1)

with open(source_path, 'rb') as f:
Expand All @@ -93,7 +74,7 @@ def _upload_all(self):

for filename in segment_files:
# Check if the thread has been interrupted.
if not self._running:
if not self._is_running():
return

source_path = os.path.join(self._input_dir, filename)
Expand All @@ -102,7 +83,7 @@ def _upload_all(self):

for filename, contents in manifest_contents.items():
# Check if the thread has been interrupted.
if not self._running:
if not self._is_running():
return

destination_path = self._subdir_path + '/' + filename
Expand All @@ -116,7 +97,7 @@ def _upload_all(self):
prefix=self._subdir_path + '/')
for blob in all_blobs:
# Check if the thread has been interrupted.
if not self._running:
if not self._is_running():
return

assert blob.name.startswith(self._subdir_path + '/')
Expand Down Expand Up @@ -149,15 +130,5 @@ def _upload_string(self, source_string, dest_blob_name):
blob.cache_control = CACHE_CONTROL_HEADER
blob.upload_from_string(source_string)

def start(self):
self._thread.start()

def stop(self):
self._running = False
self._thread.join()

def check_status(self):
if self._running:
return node_base.ProcessStatus.Running
else:
return node_base.ProcessStatus.Finished
def _is_running(self):
return self.check_status() == node_base.ProcessStatus.Running
58 changes: 57 additions & 1 deletion streamer/node_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""A base class for nodes that run a single subprocess."""
"""Base classes for nodes."""

import abc
import enum
import os
import shlex
import subprocess
import sys
import threading
import time


class ProcessStatus(enum.Enum):
# Use number values so we can sort based on value.
Running = 0
Expand All @@ -29,6 +32,7 @@ class ProcessStatus(enum.Enum):


class NodeBase(object):
"""A base class for nodes that run a single subprocess."""

@abc.abstractmethod
def __init__(self):
Expand Down Expand Up @@ -107,3 +111,55 @@ def stop(self):
# to ignore a kill signal, so this will happen quickly. If we don't do
# this, it can create a zombie process.
self._process.wait()


class ThreadedNodeBase(NodeBase):
"""A base class for nodes that run a thread which repeats some callback in a
background thread."""

def __init__(self, thread_name, continue_on_exception):
super().__init__()
self._status = ProcessStatus.Finished
self._thread_name = thread_name
self._continue_on_exception = continue_on_exception
self._thread = threading.Thread(target=self._thread_main, name=thread_name)

def _thread_main(self):
while self._status == ProcessStatus.Running:
try:
self._thread_single_pass()
except:
print('Exception in', self._thread_name, '-', sys.exc_info())

if self._continue_on_exception:
print('Continuing.')
else:
print('Quitting.')
self._status = ProcessStatus.Errored
return

# Yield time to other threads.
time.sleep(1)

@abc.abstractmethod
def _thread_single_pass(self):
"""Runs a single step of the thread loop.
This is implemented by subclasses to do whatever it is they do. It will be
called repeatedly by the base class from the node's background thread. If
this method raises an exception, the behavior depends on the
continue_on_exception argument in the constructor. If
continue_on_exception is true, the the thread will continue. Otherwise, an
exception will stop the thread and therefore the node."""
pass

def start(self):
self._status = ProcessStatus.Running
self._thread.start()

def stop(self):
self._status = ProcessStatus.Finished
self._thread.join()

def check_status(self):
return self._status

0 comments on commit 3c87c88

Please sign in to comment.