Skip to content

Commit

Permalink
Refactor handling of stop and is_running.
Browse files Browse the repository at this point in the history
This ensures that all subprocesses are stopped on exit.  First, the
controller is now a context manager, so it is stopped when we leave the
`with` block.  Second, this adds a __del__ method to each node as a
best-effort cleanup at shutdown, in case stop() was never called.

This also changes how we track process state.  Instead of a boolean
is_running(), nodes now report a ProcessStatus of Running, Finished, or
Errored.  This lets us detect when a subprocess exits with a non-zero
exit code; if any subprocess does, the streamer itself now exits with an
error (and the test server responds with HTTP 500).

Fixes #20

Change-Id: I74bbf54f802648c81d8eea5df999d4648bc424b0
  • Loading branch information
TheModMaker committed Oct 3, 2019
1 parent 09f388e commit 045bef3
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 34 deletions.
20 changes: 18 additions & 2 deletions run_end_to_end_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import threading
import urllib

from streamer import node_base
from streamer.controller_node import ControllerNode

OUTPUT_DIR = 'output_files/'
Expand Down Expand Up @@ -143,15 +144,30 @@ def start():

@app.route('/stop')
def stop():
global controller
resp = createCrossOriginResponse()
if controller is not None:
# Check status to see if one of the processes exited.
if controller.check_status() == node_base.ProcessStatus.Errored:
resp = createCrossOriginResponse(
status=500, body='Some processes exited with non-zero exit codes')

cleanup()
return createCrossOriginResponse()
return resp

@app.route('/output_files/<path:filename>', methods = ['GET','OPTIONS'])
def send_file(filename):
if controller.is_vod():
# If streaming mode is vod, needs to wait until packager is completely
# done packaging contents.
while controller.is_running():
while True:
status = controller.check_status()
if status == node_base.ProcessStatus.Finished:
break
elif status != node_base.ProcessStatus.Running:
return createCrossOriginResponse(
status=500, body='Some processes exited with non-zero exit codes')

time.sleep(1)
else:
# If streaming mode is live, needs to wait for specific content in
Expand Down
30 changes: 11 additions & 19 deletions shaka_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import argparse
import os
import shutil
import sys
import time
import yaml

from streamer import VERSION
from streamer import node_base
from streamer.controller_node import ControllerNode


Expand Down Expand Up @@ -80,25 +82,15 @@ def main():
if not args.cloud_url.startswith('gs://'):
parser.error('Invalid cloud URL, only gs:// URLs are supported currently')

try:
controller.start(args.output, input_config_dict, pipeline_config_dict,
args.cloud_url)
except:
# If the controller throws an exception during startup, we want to call
# stop() to shut down any external processes that have already been started.
# Then, re-raise the exception.
controller.stop()
raise

# Sleep so long as the pipeline is still running.
while controller.is_running():
try:
with controller.start(args.output, input_config_dict, pipeline_config_dict,
args.cloud_url):
# Sleep so long as the pipeline is still running.
while True:
status = controller.check_status()
if status != node_base.ProcessStatus.Running:
return 0 if status == node_base.ProcessStatus.Finished else 1

time.sleep(1)
except KeyboardInterrupt:
# Sometimes ffmpeg/packager take a while to be killed, so this signal
# handler will kill both running processes as there is SIGINT signal.
controller.stop()
break

if __name__ == '__main__':
main()
sys.exit(main())
7 changes: 5 additions & 2 deletions streamer/cloud_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,8 @@ def stop(self):
self._running = False
self._thread.join()

def is_running(self):
return self._running
def check_status(self):
  """Reports Running while the node is flagged as running, else Finished."""
  if not self._running:
    return node_base.ProcessStatus.Finished
  return node_base.ProcessStatus.Running
20 changes: 17 additions & 3 deletions streamer/controller_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from . import input_configuration
from . import loop_input_node
from . import metadata
from . import node_base
from . import packager_node
from . import pipeline_configuration
from . import transcoder_node
Expand All @@ -53,6 +54,12 @@ def __del__(self):
# Clean up named pipes by removing the temp directory we placed them in.
shutil.rmtree(self._temp_dir)

# Context-manager entry: hand back this object so it can be used as the
# target of a `with` statement.
def __enter__(self):
return self

# Context-manager exit: always call stop(), whether the block finished
# normally or raised.  Nothing is returned, so an in-flight exception keeps
# propagating to the caller.
def __exit__(self, *unused_args):
self.stop()

def _create_pipe(self):
"""Create a uniquely-named named pipe in the node's temp directory.
Expand Down Expand Up @@ -172,10 +179,17 @@ def start(self, output_dir, input_config_dict, pipeline_config_dict, bucket_url=

for node in self._nodes:
node.start()
return self

def is_running(self):
"""Return True if we have nodes and all of them are still running."""
return self._nodes and all(n.is_running() for n in self._nodes)
def check_status(self):
  """Checks the status of all the nodes.

  If one node is errored, this returns Errored; otherwise if one node is
  finished, this returns Finished; this only returns Running if all nodes are
  running.

  If there are no nodes at all (for example, nothing has been started yet),
  nothing is running, so this returns Finished instead of failing.

  Returns:
    The node_base.ProcessStatus with the highest value among all nodes.
  """
  if not self._nodes:
    # max() raises ValueError on an empty sequence, so handle the "no nodes"
    # case explicitly.  With no nodes, nothing is running.
    return node_base.ProcessStatus.Finished

  # ProcessStatus values are ordered Running < Finished < Errored, so the
  # maximum value is the most significant status across all nodes.
  value = max(node.check_status().value for node in self._nodes)
  return node_base.ProcessStatus(value)

def stop(self):
"""Stop all nodes."""
Expand Down
33 changes: 25 additions & 8 deletions streamer/node_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,29 @@
"""A base class for nodes that run a single subprocess."""

import abc
import enum
import shlex
import subprocess
import time

# The state a node reports from check_status().
class ProcessStatus(enum.Enum):
# Use number values so we can sort based on value.
# A larger value takes precedence when the statuses of several nodes are
# combined by taking the maximum.
# The process has not exited yet.
Running = 0
# The process exited with a zero exit code.
Finished = 1
# The process exited with a non-zero exit code.
Errored = 2


class NodeBase(object):

@abc.abstractmethod
def __init__(self):
self._process = None

# Last-resort cleanup hook, run when the object is being destroyed.
def __del__(self):
# If the process isn't stopped by now, stop it here. It is preferable to
# explicitly call stop().
# NOTE(review): Python does not guarantee that __del__ runs for objects that
# still exist at interpreter exit, so treat this as a safety net only.
self.stop()

@abc.abstractmethod
def start(self):
"""Start the subprocess.
Expand All @@ -48,28 +62,31 @@ def _create_process(self, args):
print('+ ' + ' '.join([shlex.quote(arg) for arg in args]))
return subprocess.Popen(args, stdin = subprocess.DEVNULL)

def is_running(self):
"""Returns True if the subprocess is still running, and False otherwise."""
def check_status(self):
"""Returns the current ProcessStatus of the node."""
if not self._process:
return False
raise ValueError('Must have a process to check')

self._process.poll()
if self._process.returncode is not None:
return False
if self._process.returncode is None:
return ProcessStatus.Running

return True
if self._process.returncode == 0:
return ProcessStatus.Finished
else:
return ProcessStatus.Errored

def stop(self):
"""Stop the subprocess if it's still running."""
if self._process:
# Slightly more polite than kill. Try this first.
self._process.terminate()

if self.is_running():
if self.check_status() == ProcessStatus.Running:
# If it's not dead yet, wait 1 second.
time.sleep(1)

if self.is_running():
if self.check_status() == ProcessStatus.Running:
# If it's still not dead, use kill.
self._process.kill()
# Wait for the process to die and read its exit code. There is no way
Expand Down

0 comments on commit 045bef3

Please sign in to comment.