Skip to content

Commit

Permalink
Merge pull request #3729 from StackStorm/python_runner_actions_real_t…
Browse files Browse the repository at this point in the history
…ime_output_single_db_model

Action runner output streaming single collection approach
  • Loading branch information
Kami authored Sep 12, 2017
2 parents 8af7baf + e04403a commit 9a24954
Show file tree
Hide file tree
Showing 35 changed files with 836 additions and 629 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ install:
script:
- make ${TASK}

# Don't store cache for target PR branch (typically `master`), because it will be re-used for opened PRs
# See: https://docs.travis-ci.com/user/caching/#Pull-request-builds-and-caches
# Alternative: use strict pip pinning, including git-based pip packages
before_cache:
- if [ ${TRAVIS_PULL_REQUEST} = 'false' ]; then rm -rf virtualenv/; fi

after_success:
- if [ ${TASK} = 'ci-unit' ] || [ ${TASK} = 'ci-integration' ]; then codecov; fi

Expand Down
12 changes: 7 additions & 5 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ Added
* Add new feature which allows runner action output (stdout and stderr) to be streamed
and consumed in real-time by using one of the following approaches:

- ``/v1/executions/<execution id>/stdout`` and ``/v1/executions/<execution id>/stderr`` API
endpoint.
- ``/v1/executions/<execution id>/output[?type=stdout/stderr]`` API endpoint.
- ``/v1/stream/`` stream endpoint and listening for ``st2.execution.stdout__create`` and
``st2.execution.stdout__create`` events.
- ``st2 execution tail <execution id>`` CLI command (underneath it uses stream API endpoint).
``st2.execution.output__create`` ``/v1/stream`` stream API endpoint events.
- ``st2 execution tail <execution id> [--type=stdout/stderr]`` CLI command (underneath it uses
stream API endpoint).

Right now this functionality is available for the following runners:

Expand All @@ -27,7 +27,7 @@ Added
Note: This feature is still experimental and it's disabled by default (opt-in). To enable it,
set ``actionrunner.stream_output`` config option to ``True``.

(new feature) #2175 #3657
(new feature) #2175 #3657 #3729

Fixed
~~~~~
Expand All @@ -37,6 +37,8 @@ Fixed
performed the API operation.(bug fix) #3693 #3696

Reported by theuiz.
* Fix mistral callback failure when result contains unicode. (bug fix)
* Fix cancellation of delayed action execution for tasks in workflow. (bug fix)

Changed
~~~~~~~
Expand Down
21 changes: 19 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ configgen: requirements .configgen
@echo
@echo "================== config gen ===================="
@echo
# Lint st2 components
echo "# Sample config which contains all the available options which the corresponding descriptions" > conf/st2.conf.sample;
echo "# Note: This file is automatically generated using tools/config_gen.py - DO NOT UPDATE MANUALLY" >> conf/st2.conf.sample
echo "" >> conf/st2.conf.sample
Expand Down Expand Up @@ -450,7 +449,7 @@ debs:
ci: ci-checks ci-unit ci-integration ci-mistral ci-packs-tests

.PHONY: ci-checks
ci-checks: compile .pylint .flake8 .bandit .st2client-dependencies-check .st2common-circular-dependencies-check circle-lint-api-spec .rst-check
ci-checks: compile .generated-files-check .pylint .flake8 .bandit .st2client-dependencies-check .st2common-circular-dependencies-check circle-lint-api-spec .rst-check

.PHONY: .rst-check
.rst-check:
Expand All @@ -459,6 +458,24 @@ ci-checks: compile .pylint .flake8 .bandit .st2client-dependencies-check .st2com
@echo
. $(VIRTUALENV_DIR)/bin/activate; rstcheck --report warning CHANGELOG.rst

.PHONY: .generated-files-check
.generated-files-check:
# Verify that all the files which are automatically generated have indeed been re-generated and
# committed
@echo "==================== generated-files-check ===================="

# 1. Sample config - conf/st2.conf.sample
cp conf/st2.conf.sample /tmp/st2.conf.sample.upstream
make .configgen
diff conf/st2.conf.sample /tmp/st2.conf.sample.upstream || (echo "conf/st2.conf.sample hasn't been re-generated and committed. Please run \"make configgen\" and include and commit the generated file." && exit 1)
# 2. OpenAPI definition file - st2common/st2common/openapi.yaml (generated from
# st2common/st2common/openapi.yaml.j2)
cp st2common/st2common/openapi.yaml /tmp/openapi.yaml.upstream
make .generate-api-spec
diff st2common/st2common/openapi.yaml /tmp/openapi.yaml.upstream || (echo "st2common/st2common/openapi.yaml hasn't been re-generated and committed. Please run \"make generate-api-spec\" and include and commit the generated file." && exit 1)

@echo "All automatically generated files are up to date."

.PHONY: ci-unit
ci-unit: .unit-tests-coverage-html

Expand Down
8 changes: 6 additions & 2 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ key = /etc/apache2/ssl/mycert.key
use_ssl = False
# Port on which the service should listen on.
port = 9100
# Authentication backend to use in a standalone mode. Available backends: flat_file, ldap.
# Authentication backend to use in a standalone mode. Available backends: flat_file.
backend = flat_file

[cloudslang]
Expand Down Expand Up @@ -135,12 +135,16 @@ logging = conf/logging.exporter.conf
dump_dir = /opt/stackstorm/exports/

[garbagecollector]
# Action executions older than this value (days) will be automatically deleted.
# Action executions and related objects (live actions, action output objects) older than this value (days) will be automatically deleted.
action_executions_ttl = None
# Trigger instances older than this value (days) will be automatically deleted.
trigger_instances_ttl = None
# Location of the logging configuration file.
logging = conf/logging.garbagecollector.conf
# How long to wait / sleep (in seconds) between collection of different object types.
sleep_delay = 2
# Action execution output objects (ones generated by action output streaming) older than this value (days) will be automatically deleted.
action_executions_output_ttl = 7
# How often to check database for old data and perform garbage collection.
collection_interval = 600

Expand Down
13 changes: 9 additions & 4 deletions contrib/runners/local_runner/local_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import pwd
import uuid
import functools
from StringIO import StringIO

from oslo_config import cfg
Expand All @@ -33,8 +34,7 @@
from st2common.util.green import shell
from st2common.util.shell import kill_process
from st2common.util import jsonify
from st2common.services.action import store_execution_stdout_line
from st2common.services.action import store_execution_stderr_line
from st2common.services.action import store_execution_output_data
from st2common.runners.utils import make_read_and_store_stream_func

__all__ = [
Expand Down Expand Up @@ -145,10 +145,15 @@ def run(self, action_parameters):
stdout = StringIO()
stderr = StringIO()

store_execution_stdout_line = functools.partial(store_execution_output_data,
output_type='stdout')
store_execution_stderr_line = functools.partial(store_execution_output_data,
output_type='stderr')

read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution,
action_db=self.action, store_line_func=store_execution_stdout_line)
action_db=self.action, store_data_func=store_execution_stdout_line)
read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution,
action_db=self.action, store_line_func=store_execution_stderr_line)
action_db=self.action, store_data_func=store_execution_stderr_line)

# Make sure os.setsid is called on each spawned process so that all processes
# are in the same group.
Expand Down
97 changes: 44 additions & 53 deletions contrib/runners/local_runner/tests/integration/test_localrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

from st2actions.container.service import RunnerContainerService
from st2common.constants import action as action_constants
from st2common.persistence.execution import ActionExecutionStdoutOutput
from st2common.persistence.execution import ActionExecutionStderrOutput
from st2common.persistence.execution import ActionExecutionOutput
from st2tests.fixturesloader import FixturesLoader
from st2tests.fixturesloader import get_fixtures_base_path
from st2common.util.api import get_full_public_api_url
Expand Down Expand Up @@ -72,11 +71,8 @@ def test_shell_command_action_basic(self):
cfg.CONF.set_override(name='stream_output', group='actionrunner', override=True)

# Verify initial state
stdout_dbs = ActionExecutionStdoutOutput.get_all()
self.assertEqual(len(stdout_dbs), 0)

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), 0)
output_dbs = ActionExecutionOutput.get_all()
self.assertEqual(len(output_dbs), 0)

runner = self._get_runner(action_db, cmd='echo 10')
runner.pre_run()
Expand All @@ -86,12 +82,10 @@ def test_shell_command_action_basic(self):
self.assertEquals(status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
self.assertEquals(result['stdout'], 10)

stdout_dbs = ActionExecutionStdoutOutput.get_all()
self.assertEqual(len(stdout_dbs), 1)
self.assertEqual(stdout_dbs[0].line, '10\n')

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), 0)
output_dbs = ActionExecutionOutput.get_all()
self.assertEqual(len(output_dbs), 1)
self.assertEqual(output_dbs[0].output_type, 'stdout')
self.assertEqual(output_dbs[0].data, '10\n')

def test_shell_script_action(self):
models = self.fixtures_loader.load_models(
Expand Down Expand Up @@ -229,16 +223,16 @@ def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_pop
self.assertEqual(result['return_code'], 0)

# Verify stdout and stderr lines have been correctly stored in the db
stdout_dbs = ActionExecutionStdoutOutput.get_all()
self.assertEqual(len(stdout_dbs), 2)
self.assertEqual(stdout_dbs[0].line, mock_stdout[0])
self.assertEqual(stdout_dbs[1].line, mock_stdout[1])
output_dbs = ActionExecutionOutput.query(output_type='stdout')
self.assertEqual(len(output_dbs), 2)
self.assertEqual(output_dbs[0].data, mock_stdout[0])
self.assertEqual(output_dbs[1].data, mock_stdout[1])

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), 3)
self.assertEqual(stderr_dbs[0].line, mock_stderr[0])
self.assertEqual(stderr_dbs[1].line, mock_stderr[1])
self.assertEqual(stderr_dbs[2].line, mock_stderr[2])
output_dbs = ActionExecutionOutput.query(output_type='stderr')
self.assertEqual(len(output_dbs), 3)
self.assertEqual(output_dbs[0].data, mock_stderr[0])
self.assertEqual(output_dbs[1].data, mock_stderr[1])
self.assertEqual(output_dbs[2].data, mock_stderr[2])

@mock.patch('st2common.util.green.shell.subprocess.Popen')
@mock.patch('st2common.util.green.shell.eventlet.spawn')
Expand Down Expand Up @@ -297,7 +291,7 @@ def test_action_stdout_and_stderr_is_stored_in_the_db_short_running_action(self,
self.assertEqual(result['return_code'], 0)

# Verify stdout and stderr lines have been correctly stored in the db
stdout_dbs = ActionExecutionStdoutOutput.get_all()
output_dbs = ActionExecutionOutput.query(output_type='stdout')

if index == 1:
db_index_1 = 0
Expand All @@ -312,14 +306,14 @@ def test_action_stdout_and_stderr_is_stored_in_the_db_short_running_action(self,
db_index_1 = 6
db_index_2 = 7

self.assertEqual(len(stdout_dbs), (index * 2))
self.assertEqual(stdout_dbs[db_index_1].line, mock_stdout[0])
self.assertEqual(stdout_dbs[db_index_2].line, mock_stdout[1])
self.assertEqual(len(output_dbs), (index * 2))
self.assertEqual(output_dbs[db_index_1].data, mock_stdout[0])
self.assertEqual(output_dbs[db_index_2].data, mock_stdout[1])

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), (index * 2))
self.assertEqual(stderr_dbs[db_index_1].line, mock_stderr[0])
self.assertEqual(stderr_dbs[db_index_2].line, mock_stderr[1])
output_dbs = ActionExecutionOutput.query(output_type='stderr')
self.assertEqual(len(output_dbs), (index * 2))
self.assertEqual(output_dbs[db_index_1].data, mock_stderr[0])
self.assertEqual(output_dbs[db_index_2].data, mock_stderr[1])

@staticmethod
def _get_runner(action_db,
Expand Down Expand Up @@ -427,11 +421,8 @@ def test_script_with_paramters_parameter_serialization(self):
cfg.CONF.set_override(name='stream_output', group='actionrunner', override=True)

# Verify initial state
stdout_dbs = ActionExecutionStdoutOutput.get_all()
self.assertEqual(len(stdout_dbs), 0)

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), 0)
output_dbs = ActionExecutionOutput.get_all()
self.assertEqual(len(output_dbs), 0)

action_parameters = {
'param_string': 'test string',
Expand All @@ -455,13 +446,13 @@ def test_script_with_paramters_parameter_serialization(self):
self.assertTrue('PARAM_LIST=a,b,c' in result['stdout'])
self.assertTrue('PARAM_OBJECT={"foo": "bar"}' in result['stdout'])

stdout_dbs = ActionExecutionStdoutOutput.get_all()
self.assertEqual(len(stdout_dbs), 6)
self.assertEqual(stdout_dbs[0].line, 'PARAM_STRING=test string\n')
self.assertEqual(stdout_dbs[5].line, 'PARAM_OBJECT={"foo": "bar"}\n')
output_dbs = ActionExecutionOutput.query(output_type='stdout')
self.assertEqual(len(output_dbs), 6)
self.assertEqual(output_dbs[0].data, 'PARAM_STRING=test string\n')
self.assertEqual(output_dbs[5].data, 'PARAM_OBJECT={"foo": "bar"}\n')

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), 0)
output_dbs = ActionExecutionOutput.query(output_type='stderr')
self.assertEqual(len(output_dbs), 0)

@mock.patch('st2common.util.green.shell.subprocess.Popen')
@mock.patch('st2common.util.green.shell.eventlet.spawn')
Expand Down Expand Up @@ -522,18 +513,18 @@ def test_action_stdout_and_stderr_is_stored_in_the_db(self, mock_spawn, mock_pop
self.assertEqual(result['return_code'], 0)

# Verify stdout and stderr lines have been correctly stored in the db
stdout_dbs = ActionExecutionStdoutOutput.get_all()
self.assertEqual(len(stdout_dbs), 4)
self.assertEqual(stdout_dbs[0].line, mock_stdout[0])
self.assertEqual(stdout_dbs[1].line, mock_stdout[1])
self.assertEqual(stdout_dbs[2].line, mock_stdout[2])
self.assertEqual(stdout_dbs[3].line, mock_stdout[3])

stderr_dbs = ActionExecutionStderrOutput.get_all()
self.assertEqual(len(stderr_dbs), 3)
self.assertEqual(stderr_dbs[0].line, mock_stderr[0])
self.assertEqual(stderr_dbs[1].line, mock_stderr[1])
self.assertEqual(stderr_dbs[2].line, mock_stderr[2])
output_dbs = ActionExecutionOutput.query(output_type='stdout')
self.assertEqual(len(output_dbs), 4)
self.assertEqual(output_dbs[0].data, mock_stdout[0])
self.assertEqual(output_dbs[1].data, mock_stdout[1])
self.assertEqual(output_dbs[2].data, mock_stdout[2])
self.assertEqual(output_dbs[3].data, mock_stdout[3])

output_dbs = ActionExecutionOutput.query(output_type='stderr')
self.assertEqual(len(output_dbs), 3)
self.assertEqual(output_dbs[0].data, mock_stderr[0])
self.assertEqual(output_dbs[1].data, mock_stderr[1])
self.assertEqual(output_dbs[2].data, mock_stderr[2])

def _get_runner(self, action_db, entry_point):
runner = local_runner.LocalShellRunner(uuid.uuid4().hex)
Expand Down
24 changes: 24 additions & 0 deletions contrib/runners/mistral_v2/callback/mistral_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import re
import retrying
import six

from oslo_config import cfg
from mistralclient.api import client as mistral
Expand Down Expand Up @@ -88,6 +89,27 @@ def _update_action_execution(cls, url, data):

client.action_executions.update(action_execution_id, **data)

@classmethod
def _encode(cls, value):
if isinstance(value, dict):
return {k: cls._encode(v) for k, v in six.iteritems(value)}
elif isinstance(value, list):
return [cls._encode(item) for item in value]
elif isinstance(value, six.string_types):
try:
value = value.decode('utf-8')
except Exception:
LOG.exception('Unable to decode value to utf-8.')

try:
value = value.encode('unicode_escape')
except Exception:
LOG.exception('Unable to unicode escape value.')

return value
else:
return value

@classmethod
def callback(cls, url, context, status, result):
if status not in MISTRAL_ACCEPTED_STATES:
Expand All @@ -100,7 +122,9 @@ def callback(cls, url, context, status, result):
if type(value) in [dict, list]:
result = value

result = cls._encode(result)
output = json.dumps(result) if type(result) in [dict, list] else str(result)
output = output.replace('\\\\\\\\u', '\\\\u')
data = {'state': STATUS_MAP[status], 'output': output}

cls._update_action_execution(url, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def test_launch_workflow_with_mistral_auth(self):
'cacert': None
}

keystone.KeystoneAuthHandler.authenticate.assert_called_with(auth_req)
keystone.KeystoneAuthHandler.authenticate.assert_called_with(auth_req, session=None)

executions.ExecutionManager.create.assert_called_with(
WF1_NAME, workflow_input=workflow_input, env=env)
Loading

0 comments on commit 9a24954

Please sign in to comment.