Skip to content

Commit

Permalink
Fix Validation using old tvars fails for Cylc 7 workflows (#5313)
Browse files Browse the repository at this point in the history
config: fix issues loading template variables from cylc run dirs

* Fail more gracefully for Cylc 7 DBs.
* Use the public DB for compatibility checks.
* Avoid trying to create tables where not appropriate.
  • Loading branch information
wxtim authored Jan 31, 2023
1 parent 0473362 commit 6202e9e
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 29 deletions.
7 changes: 6 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ ones in. -->

### Fixes

[#5312](https://github.com/cylc/cylc-flow/pull/5312) - task names must be comma-separated in queue member lists. Any implicit tasks (i.e. with no task definition under runtime) assigned to a queue will generate a warning.
[#5313](https://github.com/cylc/cylc-flow/pull/5313) - Fix a bug
causing Cylc to be unable to parse previously played Cylc 7 workflows.

[#5312](https://github.com/cylc/cylc-flow/pull/5312) - task names must be
comma-separated in queue member lists. Any implicit tasks
(i.e. with no task definition under runtime) assigned to a queue will generate a warning.

[#5314](https://github.com/cylc/cylc-flow/pull/5314) - Fix broken
command option: `cylc vip --run-name`.
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,13 @@ def _resume(workflow_id, options):
}
pclient('graphql', mutation_kwargs)
sys.exit(0)
except Exception as exc:
LOG.error(exc)
LOG.critical(
'Cannot tell if the workflow is running'
'\nNote, Cylc 8 cannot restart Cylc 7 workflows.'
)
sys.exit(1)


def _version_check(
Expand Down
9 changes: 2 additions & 7 deletions cylc/flow/scripts/report_timings.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
# No output specified - choose summary by default
options.show_summary = True

with _get_dao(workflow_id) as dao:
db_file = get_workflow_run_pub_db_path(workflow_id)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
row_buf = format_rows(*dao.select_task_times())
with smart_open(options.output_filename) as output:
if options.show_raw:
Expand Down Expand Up @@ -169,12 +170,6 @@ def format_rows(header, rows):
return sio


def _get_dao(workflow):
"""Return the DAO (public) for workflow."""
return CylcWorkflowDAO(
get_workflow_run_pub_db_path(workflow), is_public=True)


class TimingSummary:
"""Base class for summarizing timing output from cylc.flow run database."""

Expand Down
27 changes: 17 additions & 10 deletions cylc/flow/scripts/validate_reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@

from cylc.flow import LOG
from cylc.flow.exceptions import ServiceFileError
from cylc.flow.id_cli import parse_id
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
combine_options,
log_subcommand,
cleanup_sysargv
)
from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
Expand All @@ -58,14 +66,6 @@
from cylc.flow.scripts.reload import (
reload_cli as cylc_reload
)
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
combine_options,
log_subcommand,
cleanup_sysargv
)
from cylc.flow.id_cli import parse_id
from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import detect_old_contact_file

Expand Down Expand Up @@ -135,10 +135,17 @@ def vro_cli(parser: COP, options: 'Values', workflow_id: str):
try:
detect_old_contact_file(workflow_id)
except ServiceFileError:
# Workflow is definately running:
# Workflow is definitely running:
workflow_running = True
except Exception as exc:
LOG.error(exc)
LOG.critical(
'Cannot tell if the workflow is running'
'\nNote, Cylc 8 cannot restart Cylc 7 workflows.'
)
raise
else:
# Workflow is definately stopped:
# Workflow is definitely stopped:
workflow_running = False

# options.tvars and tvars_file are _only_ valid when playing a stopped
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/templatevars.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from cylc.flow.exceptions import InputError
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager
from cylc.flow.workflow_files import WorkflowFiles

if TYPE_CHECKING:
Expand All @@ -36,6 +37,7 @@ def get_template_vars_from_db(run_dir: 'Path') -> dict:
template_vars: dict = {}
if not pub_db_file.exists():
return template_vars
WorkflowDatabaseManager.check_db_compatibility(pub_db_file)
with CylcWorkflowDAO(pub_db_file, is_public=True) as dao:
dao.select_workflow_template_vars(
lambda _, row: template_vars.__setitem__(row[0], eval_var(row[1]))
Expand Down
6 changes: 5 additions & 1 deletion tests/functional/database/07-incompatible.t
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
# and that suitable error message is given.

. "$(dirname "$0")/test_header"
set_test_number 5
set_test_number 6

install_workflow
# install the cylc7 restart database
SRV_DIR="${WORKFLOW_RUN_DIR}/.service"
mkdir "$SRV_DIR"
sqlite3 "${SRV_DIR}/db" < 'db.sqlite3'
sqlite3 "${SRV_DIR}/db" '.tables' > orig_tables.txt

run_ok "${TEST_NAME_BASE}-validate" cylc validate "$WORKFLOW_NAME"

Expand All @@ -35,6 +36,9 @@ grep_ok \
'Workflow database is incompatible with Cylc .*, or is corrupted' \
"${TEST_NAME}.stderr"

# Check no new tables have been created
cmp_ok orig_tables.txt <<< "$(sqlite3 "${SRV_DIR}/db" '.tables')"

TEST_NAME="${TEST_NAME_BASE}-clean-fail"
run_fail "$TEST_NAME" cylc clean "$WORKFLOW_NAME"
grep_ok \
Expand Down
37 changes: 36 additions & 1 deletion tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from pathlib import Path
import sqlite3
import pytest

from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.exceptions import ServiceFileError, WorkflowConfigError
from cylc.flow.parsec.exceptions import ListValueError
from cylc.flow.pathutil import get_workflow_run_pub_db_path


@pytest.mark.parametrize(
Expand Down Expand Up @@ -322,3 +325,35 @@ def test_queue_treated_as_comma_separated(flow, validate):
)
with pytest.raises(ListValueError, match="cannot contain a space"):
validate(reg)


def test_validate_incompatible_db(one_conf, flow, validate):
"""Validation should fail for an incompatible DB due to not being able
to load template vars."""
wid = flow(one_conf)
# Create fake outdated DB
db_file = Path(get_workflow_run_pub_db_path(wid))
db_file.parent.mkdir(parents=True, exist_ok=True)
db_file.touch()
conn = sqlite3.connect(db_file)
try:
conn.execute(
'CREATE TABLE suite_params(key TEXT, value TEXT, PRIMARY KEY(key))'
)
conn.commit()
finally:
conn.close()

with pytest.raises(
ServiceFileError, match="Workflow database is incompatible"
):
validate(wid)

# No tables should have been created
stmt = "SELECT name FROM sqlite_master WHERE type='table'"
conn = sqlite3.connect(db_file)
try:
tables = [i[0] for i in conn.execute(stmt)]
finally:
conn.close()
assert tables == ['suite_params']
11 changes: 10 additions & 1 deletion tests/unit/parsec/test_fileparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sqlite3
from types import SimpleNamespace

from cylc.flow import __version__ as cylc_version
from cylc.flow.parsec.exceptions import (
FileParseError,
IncludeFileNotFoundError,
Expand Down Expand Up @@ -627,6 +628,14 @@ def _inner(create_srclink=True):
"INSERT INTO workflow_template_vars VALUES"
" ('Marius', '\"Consul\"')"
)
conn.execute(
"CREATE TABLE workflow_params"
"(key TEXT, value TEXT, PRIMARY KEY(key)) ;"
)
conn.execute(
"INSERT INTO workflow_params VALUES"
f" ('cylc_version', '{cylc_version}')"
)
conn.commit()
conn.close()

Expand Down Expand Up @@ -658,7 +667,7 @@ def _inner(create_srclink=True):
),
]
)
def test__prepend_old_templatevars2(_mock_old_template_vars_db, expect, tvars):
def test__prepend_old_templatevars(_mock_old_template_vars_db, expect, tvars):
# Create a target for a source symlink
result = _prepend_old_templatevars(
_mock_old_template_vars_db(), tvars)
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/test_templatevars.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import tempfile
import unittest

from cylc.flow import __version__ as cylc_version
from cylc.flow.exceptions import ServiceFileError
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.templatevars import (
get_template_vars_from_db,
Expand Down Expand Up @@ -114,6 +116,13 @@ def _setup_db(tmp_path_factory):
logfolder.mkdir()
db_path = logfolder / WorkflowFiles.LogDir.DB
with CylcWorkflowDAO(db_path, create_tables=True) as dao:
dao.connect().execute(
rf'''
INSERT INTO workflow_params
VALUES
("cylc_version", "{cylc_version}")
'''
)
dao.connect().execute(
r'''
INSERT INTO workflow_template_vars
Expand Down Expand Up @@ -141,3 +150,14 @@ def test_get_old_tvars(key, expect, _setup_db):
"""It can extract a variety of items from a workflow database.
"""
assert _setup_db[key] == expect


def test_get_old_tvars_fails_if_cylc_7_db(tmp_path):
"""get_template_vars_from_db fails with error if db file is not a valid
Cylc 8 DB.
"""
dbfile = tmp_path / WorkflowFiles.LogDir.DIRNAME / WorkflowFiles.LogDir.DB
dbfile.parent.mkdir()
dbfile.touch()
with pytest.raises(ServiceFileError, match='database is incompatible'):
get_template_vars_from_db(tmp_path)
39 changes: 31 additions & 8 deletions tests/unit/test_workflow_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pytest
import sqlite3

from cylc.flow.exceptions import CylcError
from cylc.flow.exceptions import CylcError, ServiceFileError
from cylc.flow.workflow_db_mgr import (
CylcWorkflowDAO,
WorkflowDatabaseManager,
Expand All @@ -31,8 +31,9 @@
@pytest.fixture
def _setup_db(tmp_path):
"""Fixture to create old DB."""
def _inner(values):
db_file = tmp_path / 'sql.db'
def _inner(values, db_file_name='sql.db'):
db_file = tmp_path / db_file_name
db_file.parent.mkdir(parents=True, exist_ok=True)
# Note: cannot use CylcWorkflowDAO here as creating outdated DB
conn = sqlite3.connect(str(db_file))
conn.execute((
Expand All @@ -50,7 +51,8 @@ def _inner(values):
r' job_runner_name TEXT, job_id TEXT,'
r' PRIMARY KEY(cycle, name, submit_num));'
))
conn.execute(values)
for value in values:
conn.execute(value)
conn.execute((
r"INSERT INTO task_jobs VALUES"
r" ('10090101T0000Z', 'foo', 1, 0, 1, '2022-12-05T14:46:06Z',"
Expand All @@ -65,12 +67,12 @@ def _inner(values):


def test_upgrade_pre_810_fails_on_multiple_flows(_setup_db):
values = (
values = [(
r'INSERT INTO task_states VALUES'
r" ('foo', '10050101T0000Z', '[1, 3]',"
r" '2022-12-05T14:46:33Z',"
r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
)
)]
db_file_name = _setup_db(values)
with CylcWorkflowDAO(db_file_name) as dao, pytest.raises(
CylcError,
Expand All @@ -80,16 +82,37 @@ def test_upgrade_pre_810_fails_on_multiple_flows(_setup_db):


def test_upgrade_pre_810_pass_on_single_flow(_setup_db):
values = (
values = [(
r'INSERT INTO task_states VALUES'
r" ('foo', '10050101T0000Z', '[1]',"
r" '2022-12-05T14:46:33Z',"
r" '2022-12-05T14:46:40Z', 1, 'succeeded', 0, 0)"
)
)]
db_file_name = _setup_db(values)
with CylcWorkflowDAO(db_file_name) as dao:
WorkflowDatabaseManager.upgrade_pre_810(dao)
result = dao.connect().execute(
'SELECT DISTINCT flow_nums FROM task_jobs;'
).fetchall()[0][0]
assert result == '[1]'


def test_check_workflow_db_compat(_setup_db, capsys):
"""method can pick private or public db to check.
"""
# Create public and private databases with different cylc versions:
create = r'CREATE TABLE workflow_params(key TEXT, value TEXT)'
insert = (
r'INSERT INTO workflow_params VALUES'
r'("cylc_version", "{}")'
)
pri_path = _setup_db(
[create, insert.format('7.99.99')], db_file_name='private/db')
pub_path = _setup_db(
[create, insert.format('7.99.98')], db_file_name='public/db')

with pytest.raises(ServiceFileError, match='99.98'):
WorkflowDatabaseManager.check_db_compatibility(pub_path)

with pytest.raises(ServiceFileError, match='99.99'):
WorkflowDatabaseManager.check_db_compatibility(pri_path)

0 comments on commit 6202e9e

Please sign in to comment.