Skip to content

Commit

Permalink
Update execution output stream to utilize a single "generic" collection
Browse files Browse the repository at this point in the history
for storing the output instead of utilizing two collections.

This approach makes whole thing more generic and makes it more
future-proof and work with other runner which doesn't necessary have two
streams (stdout, stderr).

Now "type" argument on the document stores type of the output and it
means each runner execution can produce as little or as many output
types as it needs.

Most of the existing runners will produce two type of output (stdout,
stderr), but it's likely that in the future we will have other runners
which will just produce one type of the output.
  • Loading branch information
Kami committed Sep 11, 2017
1 parent 319eb9c commit 03c37d4
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 76 deletions.
7 changes: 5 additions & 2 deletions contrib/runners/local_runner/local_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import pwd
import uuid
import functools
from StringIO import StringIO

from oslo_config import cfg
Expand All @@ -33,8 +34,7 @@
from st2common.util.green import shell
from st2common.util.shell import kill_process
from st2common.util import jsonify
from st2common.services.action import store_execution_stdout_line
from st2common.services.action import store_execution_stderr_line
from st2common.services.action import store_execution_output_data
from st2common.runners.utils import make_read_and_store_stream_func

__all__ = [
Expand Down Expand Up @@ -145,6 +145,9 @@ def run(self, action_parameters):
stdout = StringIO()
stderr = StringIO()

store_execution_stdout_line = functools.partial(store_execution_output_data, type='stdout')
store_execution_stderr_line = functools.partial(store_execution_output_data, type='stderr')

read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution,
action_db=self.action, store_line_func=store_execution_stdout_line)
read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution,
Expand Down
7 changes: 5 additions & 2 deletions contrib/runners/python_runner/python_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import sys
import json
import uuid
import functools
from StringIO import StringIO
from subprocess import list2cmdline

Expand All @@ -41,8 +42,7 @@
from st2common.util.sandboxing import get_sandbox_python_binary_path
from st2common.util.sandboxing import get_sandbox_virtualenv_path
from st2common.runners import python_action_wrapper
from st2common.services.action import store_execution_stdout_line
from st2common.services.action import store_execution_stderr_line
from st2common.services.action import store_execution_output_data
from st2common.runners.utils import make_read_and_store_stream_func

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,6 +149,9 @@ def run(self, action_parameters):
stdout = StringIO()
stderr = StringIO()

store_execution_stdout_line = functools.partial(store_execution_output_data, type='stdout')
store_execution_stderr_line = functools.partial(store_execution_output_data, type='stderr')

read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution,
action_db=self.action, store_line_func=store_execution_stdout_line)
read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution,
Expand Down
55 changes: 22 additions & 33 deletions st2common/st2common/models/db/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

__all__ = [
'ActionExecutionDB',
'ActionExecutionStdoutOutputDB',
'ActionExecutionStderrOutputDB',
'ActionExecutionOutputDB'
]


Expand Down Expand Up @@ -123,49 +122,39 @@ def get_masked_parameters(self):
return serializable_dict['parameters']


class ActionExecutionStdoutOutputDB(stormbase.StormFoundationDB):
class ActionExecutionOutputDB(stormbase.StormFoundationDB):
"""
Stores stdout output of a particular action.
New document is inserted dynamically when a new line is received which means you can simulate
tail behavior by periodically reading from this collection.
"""
execution_id = me.StringField(required=True)
action_ref = me.StringField(required=True)
timestamp = me.DateTimeField(required=True, default=date_utils.get_datetime_utc_now)

line = me.StringField()

meta = {
'indexes': [
{'fields': ['execution_id']},
{'fields': ['timestamp']},
{'fields': ['action_ref']}
]
}


class ActionExecutionStderrOutputDB(stormbase.StormFoundationDB):
"""
Stores stderr output of a particular action.
New document is inserted dynamically when a new line is received which means you can simulate
tail behavior by periodically reading from this collection.
Stores output of a particular execution.
New document is inserted dynamically when a new chunk / line is received which means you can
simulate tail behavior by periodically reading from this collection.
Attribute:
execution_id: ID of the execution to which this output belongs.
action_ref: Parent action reference.
runner_ref: Parent action runner reference.
timestamp: Timestamp when this output has been produced / received.
type: Type of the output (e.g. stdout, stderr, output)
data: Actual output data. This could either be line, chunk or similar, depending on the
runner.
"""
execution_id = me.StringField(required=True)
action_ref = me.StringField(required=True)
runner_ref = me.StringField(required=True)
timestamp = me.DateTimeField(required=True, default=date_utils.get_datetime_utc_now)
type = me.StringField(required=True, default='output')

line = me.StringField()
data = me.StringField()

meta = {
'indexes': [
{'fields': ['execution_id']},
{'fields': ['action_ref']},
{'fields': ['runner_ref']},
{'fields': ['timestamp']},
{'fields': ['action_ref']}
{'fields': ['type']}
]
}


MODELS = [ActionExecutionDB, ActionExecutionStdoutOutputDB,
ActionExecutionStderrOutputDB]
MODELS = [ActionExecutionDB, ActionExecutionOutputDB]
11 changes: 5 additions & 6 deletions st2common/st2common/runners/paramiko_ssh_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
from st2common.constants.action import LIVEACTION_STATUS_FAILED
from st2common.constants.runners import REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT
from st2common.exceptions.actionrunner import ActionRunnerPreRunError
from st2common.services.action import store_execution_stdout_line
from st2common.services.action import store_execution_stderr_line
from st2common.services.action import store_execution_output_data

__all__ = [
'BaseParallelSSHRunner'
Expand Down Expand Up @@ -128,16 +127,16 @@ def pre_run(self):
def make_store_stdout_line_func(execution_db, action_db):
def store_stdout_line(line):
if cfg.CONF.actionrunner.stream_output:
store_execution_stdout_line(execution_db=execution_db, action_db=action_db,
line=line)
store_execution_output_data(execution_db=execution_db, action_db=action_db,
data=line, type='stdout')

return store_stdout_line

def make_store_stderr_line_func(execution_db, action_db):
def store_stderr_line(line):
if cfg.CONF.actionrunner.stream_output:
store_execution_stderr_line(execution_db=execution_db, action_db=action_db,
line=line)
store_execution_output_data(execution_db=execution_db, action_db=action_db,
data=line, type='stderr')

return store_stderr_line

Expand Down
48 changes: 15 additions & 33 deletions st2common/st2common/services/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
from st2common.exceptions import trace as trace_exc
from st2common.persistence.liveaction import LiveAction
from st2common.persistence.execution import ActionExecution
from st2common.persistence.execution import ActionExecutionStdoutOutput
from st2common.persistence.execution import ActionExecutionStderrOutput
from st2common.models.db.execution import ActionExecutionStdoutOutputDB
from st2common.models.db.execution import ActionExecutionStderrOutputDB
from st2common.persistence.execution import ActionExecutionOutput
from st2common.models.db.execution import ActionExecutionOutputDB
from st2common.services import executions
from st2common.services import trace as trace_service
from st2common.util import date as date_utils
Expand All @@ -42,8 +40,7 @@
'request_pause',
'request_resume',

'store_execution_stdout_line',
'store_execution_stderr_line'
'store_execution_output_data',
]

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -318,40 +315,25 @@ def request_resume(liveaction, requester):
return (liveaction, execution)


def store_execution_stdout_line(execution_db, action_db, line, timestamp=None):
def store_execution_output_data(execution_db, action_db, data, type='output', timestamp=None):
"""
Store a line from stdout from a particular execution in the database.
Store output from an execution as a new document in the collection.
"""
execution_id = str(execution_db.id)
action_ref = action_db.ref
runner_ref = action_db.runner_type['name']
timestamp = timestamp or date_utils.get_datetime_utc_now()

model_db = ActionExecutionStdoutOutputDB(execution_id=execution_id,
action_ref=action_ref,
timestamp=timestamp,
line=line)
model_db = ActionExecutionStdoutOutput.add_or_update(model_db, publish=True,
dispatch_trigger=False)
output_db = ActionExecutionOutputDB(execution_id=execution_id,
action_ref=action_ref,
runner_ref=runner_ref,
timestamp=timestamp,
type=type,
data=data)
output_db = ActionExecutionOutput.add_or_update(output_db, publish=True,
dispatch_trigger=False)

return model_db


def store_execution_stderr_line(execution_db, action_db, line, timestamp=None):
"""
Store a line from stderr from a particular execution in the database.
"""
execution_id = str(execution_db.id)
action_ref = action_db.ref
timestamp = timestamp or date_utils.get_datetime_utc_now()

model_db = ActionExecutionStderrOutputDB(execution_id=execution_id,
action_ref=action_ref,
timestamp=timestamp,
line=line)
model_db = ActionExecutionStderrOutput.add_or_update(model_db, publish=True,
dispatch_trigger=False)

return model_db
return output_db


def _cleanup_liveaction(liveaction):
Expand Down

0 comments on commit 03c37d4

Please sign in to comment.