Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Action runner output streaming single collection approach #3729

Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3226389
Turn garbage collection on by default for action execution output
Kami Sep 1, 2017
0d366a5
Re-generate sample config file.
Kami Sep 5, 2017
f71a4a8
Add CI check which verifies that all the automatically generated files
Kami Sep 5, 2017
acf173f
Run new target on Travis CI.
Kami Sep 5, 2017
f3295df
Remove out of order comment.
Kami Sep 5, 2017
fe0623b
Update the comment.
Kami Sep 5, 2017
0daa95a
Add missing exit statement;
Kami Sep 5, 2017
93c02fb
Update config gen script to use static values for config options which
Kami Sep 5, 2017
55e3b02
Merge pull request #3715 from StackStorm/generated_files_ci_check
Kami Sep 6, 2017
78af14d
Fix mistral callback failure when result contains unicode
m4dcoder Sep 7, 2017
eb3c5c1
Add more unit tests to cover various unicode use cases
m4dcoder Sep 7, 2017
238207c
Fix call to keystone auth in mistral runner
m4dcoder Sep 8, 2017
5287893
Debug Travis
Sep 8, 2017
0a51b75
Merge pull request #3721 from StackStorm/mistral-callback-unicode
m4dcoder Sep 8, 2017
3a2bed7
Debug env for Travis
Sep 8, 2017
ebd2d44
Prevent TravisCI caching for master branch
Sep 8, 2017
9177b5c
Merge pull request #3728 from StackStorm/fix/travis-caching
armab Sep 8, 2017
aa00588
Merge branch 'master' into fix-mistralclient-version
armab Sep 8, 2017
8933deb
Use $TRAVIS_PULL_REQUEST env var for Travis to remove the cache
Sep 8, 2017
2920f00
Remove debug 'env' for Travis
Sep 8, 2017
2038476
Fix missing then
Sep 8, 2017
eef8e3e
Clean up unit tests for execution cancellation
m4dcoder Sep 8, 2017
f01a109
Set status to canceling instead of canceled for execution with parent
m4dcoder Sep 8, 2017
29cf5ad
Separate the imports in test_execution_cancellation
m4dcoder Sep 8, 2017
fe81842
Add change log entry for cancellation of delayed action execution
m4dcoder Sep 8, 2017
447bb72
Merge pull request #3727 from StackStorm/fix-mistralclient-version
armab Sep 8, 2017
4ccf6e2
Merge pull request #3726 from StackStorm/fix-delayed-cancel
m4dcoder Sep 8, 2017
bc73c99
Merge branch 'master' into python_runner_actions_real_time_output
Kami Sep 11, 2017
319eb9c
Merge branch 'python_runner_actions_real_time_output' of github.com:S…
Kami Sep 11, 2017
03c37d4
Update execution output stream to utilize a single "generic" collection
Kami Sep 11, 2017
61f0128
Use complex date time field for larger precision.
Kami Sep 12, 2017
09eff50
Change field name to output_type from type so it doesn't clash with
Kami Sep 12, 2017
bfdd1b1
Update affected streaming code to utilize one model approach.
Kami Sep 12, 2017
ec46475
Update affected runner tests.
Kami Sep 12, 2017
c8b2b9f
Update affected stream tests.
Kami Sep 12, 2017
52a6552
Add more tests / asserts for event stream API endpoint.
Kami Sep 12, 2017
3b05bfb
Update changelog.
Kami Sep 12, 2017
284e23e
Re-generate openapi.yaml file.
Kami Sep 12, 2017
15c4d6b
Update affected garbage collection code.
Kami Sep 12, 2017
7d2b219
Implement output type filter on the executions output API endpoint.
Kami Sep 12, 2017
2f6be57
Update more affected code.
Kami Sep 12, 2017
3389d6e
Re-generate sample config.
Kami Sep 12, 2017
e04403a
Add tests for action execution output API endpoint.
Kami Sep 12, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ install:
script:
- make ${TASK}

# Don't store cache for target PR branch (typically `master`), because it will be re-used for opened PRs
# See: https://docs.travis-ci.com/user/caching/#Pull-request-builds-and-caches
# Alternative: use strict pip pinning, including git-based pip packages
before_cache:
- if [ ${TRAVIS_PULL_REQUEST} = 'false' ]; then rm -rf virtualenv/; fi

after_success:
- if [ ${TASK} = 'ci-unit' ] || [ ${TASK} = 'ci-integration' ]; then codecov; fi

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Fixed
performed the API operation.(bug fix) #3693 #3696

Reported by theuiz.
* Fix mistral callback failure when result contains unicode. (bug fix)
* Fix cancellation of delayed action execution for tasks in workflow. (bug fix)

Changed
~~~~~~~
Expand Down
21 changes: 19 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ configgen: requirements .configgen
@echo
@echo "================== config gen ===================="
@echo
# Lint st2 components
echo "# Sample config which contains all the available options which the corresponding descriptions" > conf/st2.conf.sample;
echo "# Note: This file is automatically generated using tools/config_gen.py - DO NOT UPDATE MANUALLY" >> conf/st2.conf.sample
echo "" >> conf/st2.conf.sample
Expand Down Expand Up @@ -450,7 +449,7 @@ debs:
ci: ci-checks ci-unit ci-integration ci-mistral ci-packs-tests

.PHONY: ci-checks
ci-checks: compile .pylint .flake8 .bandit .st2client-dependencies-check .st2common-circular-dependencies-check circle-lint-api-spec .rst-check
ci-checks: compile .generated-files-check .pylint .flake8 .bandit .st2client-dependencies-check .st2common-circular-dependencies-check circle-lint-api-spec .rst-check

.PHONY: .rst-check
.rst-check:
Expand All @@ -459,6 +458,24 @@ ci-checks: compile .pylint .flake8 .bandit .st2client-dependencies-check .st2com
@echo
. $(VIRTUALENV_DIR)/bin/activate; rstcheck --report warning CHANGELOG.rst

.PHONY: .generated-files-check
.generated-files-check:
# Verify that all the files which are automatically generated have indeed been re-generated and
# committed
@echo "==================== generated-files-check ===================="

# 1. Sample config - conf/st2.conf.sample
cp conf/st2.conf.sample /tmp/st2.conf.sample.upstream
make .configgen
diff conf/st2.conf.sample /tmp/st2.conf.sample.upstream || (echo "conf/st2.conf.sample hasn't been re-generated and committed. Please run \"make configgen\" and include and commit the generated file." && exit 1)
# 2. OpenAPI definition file - st2common/st2common/openapi.yaml (generated from
# st2common/st2common/openapi.yaml.j2)
cp st2common/st2common/openapi.yaml /tmp/openapi.yaml.upstream
make .generate-api-spec
diff st2common/st2common/openapi.yaml /tmp/openapi.yaml.upstream || (echo "st2common/st2common/openapi.yaml hasn't been re-generated and committed. Please run \"make generate-api-spec\" and include and commit the generated file." && exit 1)

@echo "All automatically generated files are up to date."

.PHONY: ci-unit
ci-unit: .unit-tests-coverage-html

Expand Down
2 changes: 1 addition & 1 deletion conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ key = /etc/apache2/ssl/mycert.key
use_ssl = False
# Port on which the service should listen on.
port = 9100
# Authentication backend to use in a standalone mode. Available backends: flat_file, ldap.
# Authentication backend to use in a standalone mode. Available backends: flat_file.
backend = flat_file

[cloudslang]
Expand Down
7 changes: 5 additions & 2 deletions contrib/runners/local_runner/local_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import pwd
import uuid
import functools
from StringIO import StringIO

from oslo_config import cfg
Expand All @@ -33,8 +34,7 @@
from st2common.util.green import shell
from st2common.util.shell import kill_process
from st2common.util import jsonify
from st2common.services.action import store_execution_stdout_line
from st2common.services.action import store_execution_stderr_line
from st2common.services.action import store_execution_output_data
from st2common.runners.utils import make_read_and_store_stream_func

__all__ = [
Expand Down Expand Up @@ -145,6 +145,9 @@ def run(self, action_parameters):
stdout = StringIO()
stderr = StringIO()

store_execution_stdout_line = functools.partial(store_execution_output_data, type='stdout')
store_execution_stderr_line = functools.partial(store_execution_output_data, type='stderr')

read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution,
action_db=self.action, store_line_func=store_execution_stdout_line)
read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution,
Expand Down
24 changes: 24 additions & 0 deletions contrib/runners/mistral_v2/callback/mistral_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import re
import retrying
import six

from oslo_config import cfg
from mistralclient.api import client as mistral
Expand Down Expand Up @@ -88,6 +89,27 @@ def _update_action_execution(cls, url, data):

client.action_executions.update(action_execution_id, **data)

@classmethod
def _encode(cls, value):
if isinstance(value, dict):
return {k: cls._encode(v) for k, v in six.iteritems(value)}
elif isinstance(value, list):
return [cls._encode(item) for item in value]
elif isinstance(value, six.string_types):
try:
value = value.decode('utf-8')
except Exception:
LOG.exception('Unable to decode value to utf-8.')

try:
value = value.encode('unicode_escape')
except Exception:
LOG.exception('Unable to unicode escape value.')

return value
else:
return value

@classmethod
def callback(cls, url, context, status, result):
if status not in MISTRAL_ACCEPTED_STATES:
Expand All @@ -100,7 +122,9 @@ def callback(cls, url, context, status, result):
if type(value) in [dict, list]:
result = value

result = cls._encode(result)
output = json.dumps(result) if type(result) in [dict, list] else str(result)
output = output.replace('\\\\\\\\u', '\\\\u')
data = {'state': STATUS_MAP[status], 'output': output}

cls._update_action_execution(url, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def test_launch_workflow_with_mistral_auth(self):
'cacert': None
}

keystone.KeystoneAuthHandler.authenticate.assert_called_with(auth_req)
keystone.KeystoneAuthHandler.authenticate.assert_called_with(auth_req, session=None)

executions.ExecutionManager.create.assert_called_with(
WF1_NAME, workflow_input=workflow_input, env=env)
171 changes: 171 additions & 0 deletions contrib/runners/mistral_v2/tests/unit/test_mistral_v2_callback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: UTF-8 -*-
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
Expand Down Expand Up @@ -110,22 +111,65 @@ def test_callback_handler_with_result_as_text(self):
action_constants.LIVEACTION_STATUS_SUCCEEDED,
'<html></html>')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='<html></html>'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_as_dict(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, {'a': 1})

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": 1}'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_as_json_str(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, '{"a": 1}')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": 1}'
)

self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, "{'a': 1}")

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": 1}'
)

self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, u"{'a': 1}")

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": 1}'
)

self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, "{u'a': u'xyz'}")

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": "xyz"}'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
Expand All @@ -134,6 +178,12 @@ def test_callback_handler_with_result_as_list(self):
action_constants.LIVEACTION_STATUS_SUCCEEDED,
["a", "b", "c"])

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='["a", "b", "c"]'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
Expand All @@ -142,6 +192,127 @@ def test_callback_handler_with_result_as_list_str(self):
action_constants.LIVEACTION_STATUS_SUCCEEDED,
'["a", "b", "c"]')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='["a", "b", "c"]'
)

self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED,
u'["a", "b", "c"]')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='["a", "b", "c"]'
)

self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED,
'[u"a", "b", "c"]')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='["a", "b", "c"]'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_unicode_str(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, '什麼')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='\\u4ec0\\u9ebc'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_unicode_encoded_as_ascii_str(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, '\u4ec0\u9ebc')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='\\\\u4ec0\\\\u9ebc'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_unicode_encoded_as_type(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED, u'\u4ec0\u9ebc')

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='\\u4ec0\\u9ebc'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_as_list_with_unicode_str(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED,
['\u4ec0\u9ebc'])

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='["\\\\u4ec0\\\\u9ebc"]'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_as_list_with_unicode_type(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED,
[u'\u4ec0\u9ebc'])

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='["\\\\u4ec0\\\\u9ebc"]'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_as_dict_with_unicode_str(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED,
{'a': '\u4ec0\u9ebc'})

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": "\\\\u4ec0\\\\u9ebc"}'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
def test_callback_handler_with_result_as_dict_with_unicode_type(self):
self.callback_class.callback('http://127.0.0.1:8989/v2/action_executions/12345', {},
action_constants.LIVEACTION_STATUS_SUCCEEDED,
{'a': u'\u4ec0\u9ebc'})

action_executions.ActionExecutionManager.update.assert_called_with(
'12345',
state='SUCCESS',
output='{"a": "\\\\u4ec0\\\\u9ebc"}'
)

@mock.patch.object(
action_executions.ActionExecutionManager, 'update',
mock.MagicMock(return_value=None))
Expand Down
7 changes: 5 additions & 2 deletions contrib/runners/python_runner/python_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import sys
import json
import uuid
import functools
from StringIO import StringIO
from subprocess import list2cmdline

Expand All @@ -41,8 +42,7 @@
from st2common.util.sandboxing import get_sandbox_python_binary_path
from st2common.util.sandboxing import get_sandbox_virtualenv_path
from st2common.runners import python_action_wrapper
from st2common.services.action import store_execution_stdout_line
from st2common.services.action import store_execution_stderr_line
from st2common.services.action import store_execution_output_data
from st2common.runners.utils import make_read_and_store_stream_func

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,6 +149,9 @@ def run(self, action_parameters):
stdout = StringIO()
stderr = StringIO()

store_execution_stdout_line = functools.partial(store_execution_output_data, type='stdout')
store_execution_stderr_line = functools.partial(store_execution_output_data, type='stderr')

read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution,
action_db=self.action, store_line_func=store_execution_stdout_line)
read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution,
Expand Down
Loading