diff --git a/contrib/runners/local_runner/local_runner.py b/contrib/runners/local_runner/local_runner.py index bb5afb6799..49c2c9d7ad 100644 --- a/contrib/runners/local_runner/local_runner.py +++ b/contrib/runners/local_runner/local_runner.py @@ -16,6 +16,7 @@ import os import pwd import uuid +import functools from StringIO import StringIO from oslo_config import cfg @@ -33,8 +34,7 @@ from st2common.util.green import shell from st2common.util.shell import kill_process from st2common.util import jsonify -from st2common.services.action import store_execution_stdout_line -from st2common.services.action import store_execution_stderr_line +from st2common.services.action import store_execution_output_data from st2common.runners.utils import make_read_and_store_stream_func __all__ = [ @@ -145,6 +145,9 @@ def run(self, action_parameters): stdout = StringIO() stderr = StringIO() + store_execution_stdout_line = functools.partial(store_execution_output_data, type='stdout') + store_execution_stderr_line = functools.partial(store_execution_output_data, type='stderr') + read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution, action_db=self.action, store_line_func=store_execution_stdout_line) read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution, diff --git a/contrib/runners/python_runner/python_runner.py b/contrib/runners/python_runner/python_runner.py index a926c65f60..db1f1cf6a7 100644 --- a/contrib/runners/python_runner/python_runner.py +++ b/contrib/runners/python_runner/python_runner.py @@ -18,6 +18,7 @@ import sys import json import uuid +import functools from StringIO import StringIO from subprocess import list2cmdline @@ -41,8 +42,7 @@ from st2common.util.sandboxing import get_sandbox_python_binary_path from st2common.util.sandboxing import get_sandbox_virtualenv_path from st2common.runners import python_action_wrapper -from st2common.services.action import store_execution_stdout_line -from st2common.services.action import store_execution_stderr_line +from st2common.services.action import store_execution_output_data from st2common.runners.utils import make_read_and_store_stream_func LOG = logging.getLogger(__name__) @@ -149,6 +149,9 @@ def run(self, action_parameters): stdout = StringIO() stderr = StringIO() + store_execution_stdout_line = functools.partial(store_execution_output_data, type='stdout') + store_execution_stderr_line = functools.partial(store_execution_output_data, type='stderr') + read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution, action_db=self.action, store_line_func=store_execution_stdout_line) read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution, diff --git a/st2common/st2common/models/db/execution.py b/st2common/st2common/models/db/execution.py index 782b454ff4..b68be5125c 100644 --- a/st2common/st2common/models/db/execution.py +++ b/st2common/st2common/models/db/execution.py @@ -27,8 +27,7 @@ __all__ = [ 'ActionExecutionDB', - 'ActionExecutionStdoutOutputDB', - 'ActionExecutionStderrOutputDB', + 'ActionExecutionOutputDB' ] @@ -123,49 +122,39 @@ def get_masked_parameters(self): return serializable_dict['parameters'] -class ActionExecutionStdoutOutputDB(stormbase.StormFoundationDB): +class ActionExecutionOutputDB(stormbase.StormFoundationDB): """ - Stores stdout output of a particular action. - - New document is inserted dynamically when a new line is received which means you can simulate - tail behavior by periodically reading from this collection. - """ - execution_id = me.StringField(required=True) - action_ref = me.StringField(required=True) - timestamp = me.DateTimeField(required=True, default=date_utils.get_datetime_utc_now) - - line = me.StringField() - - meta = { - 'indexes': [ - {'fields': ['execution_id']}, - {'fields': ['timestamp']}, - {'fields': ['action_ref']} - ] - } - - -class ActionExecutionStderrOutputDB(stormbase.StormFoundationDB): - """ - Stores stderr output of a particular action. - - New document is inserted dynamically when a new line is received which means you can simulate - tail behavior by periodically reading from this collection. + Stores output of a particular execution. + + New document is inserted dynamically when a new chunk / line is received which means you can + simulate tail behavior by periodically reading from this collection. + + Attribute: + execution_id: ID of the execution to which this output belongs. + action_ref: Parent action reference. + runner_ref: Parent action runner reference. + timestamp: Timestamp when this output has been produced / received. + type: Type of the output (e.g. stdout, stderr, output) + data: Actual output data. This could either be line, chunk or similar, depending on the + runner. """ execution_id = me.StringField(required=True) action_ref = me.StringField(required=True) + runner_ref = me.StringField(required=True) timestamp = me.DateTimeField(required=True, default=date_utils.get_datetime_utc_now) + type = me.StringField(required=True, default='output') - line = me.StringField() + data = me.StringField() meta = { 'indexes': [ {'fields': ['execution_id']}, + {'fields': ['action_ref']}, + {'fields': ['runner_ref']}, {'fields': ['timestamp']}, - {'fields': ['action_ref']} + {'fields': ['type']} ] } -MODELS = [ActionExecutionDB, ActionExecutionStdoutOutputDB, - ActionExecutionStderrOutputDB] +MODELS = [ActionExecutionDB, ActionExecutionOutputDB] diff --git a/st2common/st2common/runners/paramiko_ssh_runner.py b/st2common/st2common/runners/paramiko_ssh_runner.py index 74569d7fec..aacb77ceb8 100644 --- a/st2common/st2common/runners/paramiko_ssh_runner.py +++ b/st2common/st2common/runners/paramiko_ssh_runner.py @@ -26,8 +26,7 @@ from st2common.constants.action import LIVEACTION_STATUS_FAILED from st2common.constants.runners import REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT from st2common.exceptions.actionrunner import ActionRunnerPreRunError -from st2common.services.action import store_execution_stdout_line -from st2common.services.action import store_execution_stderr_line +from st2common.services.action import store_execution_output_data __all__ = [ 'BaseParallelSSHRunner' @@ -128,16 +127,16 @@ def pre_run(self): def make_store_stdout_line_func(execution_db, action_db): def store_stdout_line(line): if cfg.CONF.actionrunner.stream_output: - store_execution_stdout_line(execution_db=execution_db, action_db=action_db, - line=line) + store_execution_output_data(execution_db=execution_db, action_db=action_db, + data=line, type='stdout') return store_stdout_line def make_store_stderr_line_func(execution_db, action_db): def store_stderr_line(line): if cfg.CONF.actionrunner.stream_output: - store_execution_stderr_line(execution_db=execution_db, action_db=action_db, - line=line) + store_execution_output_data(execution_db=execution_db, action_db=action_db, + data=line, type='stderr') return store_stderr_line diff --git a/st2common/st2common/services/action.py b/st2common/st2common/services/action.py index 96a8c5ccb5..5c6b324271 100644 --- a/st2common/st2common/services/action.py +++ b/st2common/st2common/services/action.py @@ -22,10 +22,8 @@ from st2common.exceptions import trace as trace_exc from st2common.persistence.liveaction import LiveAction from st2common.persistence.execution import ActionExecution -from st2common.persistence.execution import ActionExecutionStdoutOutput -from st2common.persistence.execution import ActionExecutionStderrOutput -from st2common.models.db.execution import ActionExecutionStdoutOutputDB -from st2common.models.db.execution import ActionExecutionStderrOutputDB +from st2common.persistence.execution import ActionExecutionOutput +from st2common.models.db.execution import ActionExecutionOutputDB from st2common.services import executions from st2common.services import trace as trace_service from st2common.util import date as date_utils @@ -42,8 +40,7 @@ 'request_pause', 'request_resume', - 'store_execution_stdout_line', - 'store_execution_stderr_line' + 'store_execution_output_data', ] LOG = logging.getLogger(__name__) @@ -318,40 +315,25 @@ def request_resume(liveaction, requester): return (liveaction, execution) -def store_execution_stdout_line(execution_db, action_db, line, timestamp=None): +def store_execution_output_data(execution_db, action_db, data, type='output', timestamp=None): """ - Store a line from stdout from a particular execution in the database. + Store output from an execution as a new document in the collection. """ execution_id = str(execution_db.id) action_ref = action_db.ref + runner_ref = action_db.runner_type['name'] timestamp = timestamp or date_utils.get_datetime_utc_now() - model_db = ActionExecutionStdoutOutputDB(execution_id=execution_id, - action_ref=action_ref, - timestamp=timestamp, - line=line) - model_db = ActionExecutionStdoutOutput.add_or_update(model_db, publish=True, - dispatch_trigger=False) + output_db = ActionExecutionOutputDB(execution_id=execution_id, + action_ref=action_ref, + runner_ref=runner_ref, + timestamp=timestamp, + type=type, + data=data) + output_db = ActionExecutionOutput.add_or_update(output_db, publish=True, + dispatch_trigger=False) - return model_db - - -def store_execution_stderr_line(execution_db, action_db, line, timestamp=None): - """ - Store a line from stderr from a particular execution in the database. - """ - execution_id = str(execution_db.id) - action_ref = action_db.ref - timestamp = timestamp or date_utils.get_datetime_utc_now() - - model_db = ActionExecutionStderrOutputDB(execution_id=execution_id, - action_ref=action_ref, - timestamp=timestamp, - line=line) - model_db = ActionExecutionStderrOutput.add_or_update(model_db, publish=True, - dispatch_trigger=False) - - return model_db + return output_db def _cleanup_liveaction(liveaction):