-
Notifications
You must be signed in to change notification settings - Fork 27
/
pbs.py
282 lines (213 loc) · 8.74 KB
/
pbs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
""" Functions to support PBS based schedulers
:copyright: Copyright 2011 Marshall Ward, see AUTHORS for details.
:license: Apache License, Version 2.0, see LICENSE for details.
"""
# Standard library
import os
import re
import sys
import shlex
import subprocess
import payu.envmod as envmod
from payu.fsops import check_exe_path
from payu.manifest import Manifest
from payu.schedulers.scheduler import Scheduler
from tenacity import retry, stop_after_delay
# TODO: This is a stub acting as a minimal port to a Scheduler class.
class PBS(Scheduler):
    """Scheduler that builds ``qsub`` command lines for PBS."""
    # TODO: __init__

    def submit(self, pbs_script, pbs_config, pbs_vars=None, python_exe=None):
        """Prepare a correct PBS command string.

        The command is only constructed and returned here; it is not
        executed by this method.

        :param pbs_script: Name of the script to run; it is resolved
            against the payu path with ``check_exe_path``.
        :param pbs_config: Mapping of configuration settings. The keys read
            are ``queue``, ``project``, ``walltime``, ``ncpus``, ``mem``,
            ``jobfs``, ``jobname``, ``priority``, ``join``, ``storage``,
            ``qsub_flags``, ``control_path``, ``laboratory``, ``shortpath``,
            ``modules`` (sub-key ``use``) and ``sync`` (sub-key ``path``).
        :param pbs_vars: Optional dict of environment variables passed to
            the job with ``qsub -v``. ``PAYU_PATH`` is also read from it.
        :param python_exe: Python interpreter used to run the script.
            Defaults to ``sys.executable``.
        :return: The ``qsub`` command as a single string.
        :raises KeyError: If no project is configured and the ``PROJECT``
            environment variable is not set.
        :raises SystemExit: If the ``join`` setting is not one of ``oe``,
            ``eo`` or ``n``.
        """
        pbs_env_init()

        # Initialisation
        if pbs_vars is None:
            pbs_vars = {}

        # Necessary for testing
        if python_exe is None:
            python_exe = sys.executable

        pbs_flags = []

        pbs_queue = pbs_config.get('queue', 'normal')
        pbs_flags.append('-q {queue}'.format(queue=pbs_queue))

        # Only consult $PROJECT when the config does not name a project, so
        # that a configured project works even if the variable is unset.
        pbs_project = pbs_config.get('project')
        if pbs_project is None:
            pbs_project = os.environ['PROJECT']
        pbs_flags.append('-P {project}'.format(project=pbs_project))

        # Each configured resource gets its own `-l key=value` flag
        for res_key in ('walltime', 'ncpus', 'mem', 'jobfs'):
            res_val = pbs_config.get(res_key)
            if res_val:
                pbs_flags.append(
                    '-l {key}={val}'.format(key=res_key, val=res_val)
                )

        # TODO: Need to pass lab.config_path somehow...
        pbs_jobname = pbs_config.get('jobname', os.path.basename(os.getcwd()))
        if pbs_jobname:
            # PBSPro has a 15-character jobname limit
            pbs_flags.append('-N {name}'.format(name=pbs_jobname[:15]))

        pbs_priority = pbs_config.get('priority')
        if pbs_priority:
            pbs_flags.append('-p {priority}'.format(priority=pbs_priority))

        pbs_flags.append('-l wd')

        pbs_join = pbs_config.get('join', 'n')
        if pbs_join not in ('oe', 'eo', 'n'):
            print('payu: error: unknown qsub IO stream join setting.')
            sys.exit(-1)
        pbs_flags.append('-j {join}'.format(join=pbs_join))

        # Append environment variables to qsub command. A bare `-v` with no
        # variable list would consume the next flag as its argument, so the
        # flag is only emitted when there is something to export.
        # TODO: Support full export of environment variables: `qsub -V`
        if pbs_vars:
            pbs_vstring = ','.join('{0}={1}'.format(k, v)
                                   for k, v in pbs_vars.items())
            pbs_flags.append('-v ' + pbs_vstring)

        # Explicitly configured storages; their mount points are also added
        # to the set of mounts searched for in the paths below
        storages = set()
        storage_config = pbs_config.get('storage', {})
        mounts = {'/scratch', '/g/data'}
        for mount in storage_config:
            mounts.add(mount)
            for project in storage_config[mount]:
                storages.add(make_mount_string(encode_mount(mount), project))

        # Append any additional qsub flags here
        pbs_flags_extend = pbs_config.get('qsub_flags')
        if pbs_flags_extend:
            pbs_flags.append(pbs_flags_extend)

        payu_path = pbs_vars.get('PAYU_PATH', os.path.dirname(sys.argv[0]))
        pbs_script = check_exe_path(payu_path, pbs_script)

        # Check for storage paths that might need to be mounted in the
        # python and script paths
        extra_search_paths = [python_exe, payu_path, pbs_script]

        # Optional single-path settings: skip the ones that are not set so
        # that no None entry reaches the mount search
        for path_key in ('control_path', 'laboratory', 'shortpath'):
            configured_path = pbs_config.get(path_key)
            if configured_path is not None:
                extra_search_paths.append(configured_path)

        module_use_paths = pbs_config.get('modules', {}).get('use', [])
        extra_search_paths.extend(module_use_paths)

        remote_sync_directory = pbs_config.get('sync', {}).get('path', None)
        if remote_sync_directory is not None:
            extra_search_paths.append(remote_sync_directory)

        storages.update(find_mounts(extra_search_paths, mounts))
        storages.update(find_mounts(get_manifest_paths(), mounts))

        # Add storage flags. Note that these are sorted to get predictable
        # behaviour for testing
        pbs_flags_extend = '+'.join(sorted(storages))
        if pbs_flags_extend:
            pbs_flags.append("-l storage={}".format(pbs_flags_extend))

        # Set up environment modules here for PBS.
        envmod.setup()
        envmod.module('load', 'pbs')

        # Construct job submission command
        cmd = 'qsub {flags} -- {python} {script}'.format(
            flags=' '.join(pbs_flags),
            python=python_exe,
            script=pbs_script
        )

        return cmd
# TODO: These support functions can probably be integrated into the class
def get_job_id(short=True):
    """
    Return the PBS job id taken from the ``PBS_JOBID`` environment variable.

    An empty string is returned when the variable is not set. With
    ``short=True`` (the default) everything from the first ``.`` onwards
    is dropped, e.g. the trailing '.rman2'.
    """
    full_id = os.environ.get('PBS_JOBID', '')

    if not short:
        return full_id

    # Keep only the part before the first '.'
    return full_id.partition('.')[0]
def get_job_info():
    """
    Query the PBS server for information about the current job.

    Returns the attribute dict that ``get_qstat_info`` produced for this
    job, with the short job id added under the key ``Job_ID``. Returns
    None when there is no job id or when ``get_qstat_info`` returns None.
    """
    jobid = get_job_id()

    # Without a job id there is nothing to look up
    if jobid == '':
        return None

    qstat_result = get_qstat_info('-ft {0}'.format(jobid), 'Job Id:')
    if qstat_result is None:
        return None

    # Pick out the entry for this job (there should only be one entry
    # in any case) and record the job id in it before returning
    job_entry = qstat_result['Job Id: {}'.format(jobid)]
    job_entry['Job_ID'] = jobid

    return job_entry
def pbs_env_init():
    """
    Load the PBS configuration file into the process environment.

    ``PBS_CONF_FILE`` is set to the platform's configuration file path,
    then every ``KEY=VALUE`` line in that file is exported to
    ``os.environ``. Lines without an ``=`` are ignored. Only the first
    ``=`` separates key from value, so values may themselves contain
    ``=``.

    Prints a message and exits with status 1 if the file cannot be opened.
    """
    # Initialise against PBS_CONF_FILE
    if sys.platform == 'win32':
        pbs_conf_fpath = r'C:\Program Files\PBS Pro\pbs.conf'
    else:
        pbs_conf_fpath = '/etc/pbs.conf'
    os.environ['PBS_CONF_FILE'] = pbs_conf_fpath

    try:
        with open(pbs_conf_fpath) as pbs_conf:
            for line in pbs_conf:
                key, separator, value = line.partition('=')
                if not separator:
                    # Not a KEY=VALUE line
                    continue
                try:
                    os.environ[key] = value.rstrip()
                except ValueError:
                    # Names or values the environment rejects are skipped
                    pass
    except IOError:
        print('Unable to find PBS_CONF_FILE ... ' + pbs_conf_fpath)
        sys.exit(1)
# Wrap this in retry from tenacity. Keep trying for 10 seconds and,
# if it still fails, return None
@retry(stop=stop_after_delay(10), retry_error_callback=lambda a: None)
def get_qstat_info(qflag, header, projects=None, users=None):
    """
    Run ``qstat`` and parse its output into a dict of attribute dicts.

    The ``qstat`` executable is taken from ``$PBS_EXEC/bin``. Its output is
    split into entries on ``'<header>: '``. The first line of each entry,
    cut at its first ``.``, becomes the key; the remaining ``name = value``
    lines become that entry's attribute dict.

    :param qflag: Command-line flags for qstat, as a single string.
    :param header: Text that introduces each entry in the qstat output.
    :param projects: Optional iterable of project codes; when given,
        entries containing ``project = <code>`` are kept.
    :param users: Optional iterable of user names; when given, entries
        containing ``Job_Owner = <user>`` are kept.
    :return: Dict mapping entry key to a dict of attributes. Because of
        the retry decorator, None is returned if the call keeps failing
        for 10 seconds.
    """
    qstat = os.path.join(os.environ['PBS_EXEC'], 'bin', 'qstat')

    # Only the flags are split; the executable path stays a single
    # argument even if PBS_EXEC contains spaces
    cmd = [qstat] + shlex.split(qflag)
    output = subprocess.check_output(cmd)
    if sys.version_info.major >= 3:
        output = output.decode()

    entries = (e for e in output.split('{}: '.format(header)) if e)

    # Immediately remove any non-project entries
    if projects or users:
        # Either filter may be omitted; treat a missing one as empty
        # rather than iterating over None
        wanted_projects = ['project = {}'.format(p) for p in projects or ()]
        wanted_owners = ['Job_Owner = {}'.format(u) for u in users or ()]
        entries = (e for e in entries
                   if any(tag in e for tag in wanted_projects)
                   or any(tag in e for tag in wanted_owners))

    # Continuation lines start with a tab; join them back onto the
    # attribute they belong to before splitting into lines
    attribs = ((k.split('.')[0], v.replace('\n\t', '').split('\n'))
               for k, v in (e.split('\n', 1) for e in entries))

    status = {k: dict((kk.strip(), vv.strip())
              for kk, vv in (att.split('=', 1) for att in v if att))
              for k, v in attribs}

    return status
def encode_mount(mount):
    """
    Turn a mount point into the keyword used to specify storages,
    i.e. remove path separators

    :param mount: Mount point path, e.g. ``/g/data``.
    :return: The mount string with every ``os.path.sep`` removed.
    """
    # Plain string replacement: the separator is literal text, not a
    # regular expression (a lone backslash is not a valid pattern)
    return mount.replace(os.path.sep, '')
def make_mount_string(mount, project):
    """
    Build the storage specifier for a mount keyword and a project code,
    in the form ``<mount>/<project>``
    """
    template = "{mount}/{project}"
    return template.format(project=project, mount=mount)
def find_mounts(paths, mounts):
    """
    Search paths for matching mount points and return a set of unique
    NCI compatible strings to add to the qsub command

    A path matches a mount only when it lies below that mount, i.e. it
    starts with the mount followed by a path separator. The project code
    is the path element directly after the mount point. Empty or None
    paths, and paths with no element after the mount, contribute nothing.

    :param paths: A list of paths; any other value is treated as a
        single path.
    :param mounts: Iterable of mount point paths.
    :return: Set of storage strings built with ``make_mount_string``.
    """
    if not isinstance(paths, list):
        paths = [paths, ]
    if not isinstance(mounts, set):
        mounts = set(mounts)

    sep = os.path.sep
    storages = set()

    for path in paths:
        if not path:
            # Nothing to search in an unset or empty path
            continue
        for mount in mounts:
            # Require a separator after the mount so that e.g. /scratchy
            # is not mistaken for something under /scratch
            prefix = mount.rstrip(sep) + sep
            if not path.startswith(prefix):
                continue
            # Relevant project code is the next element of the path
            # after the mount point. DO NOT USE os.path.split as it
            # is not consistent with trailing slash
            project = path[len(prefix):].split(sep, 1)[0]
            if project:
                storages.add(make_mount_string(encode_mount(mount), project))
            break

    return storages
def get_manifest_paths():
    """
    Load the manifest files and return the paths they record, for use
    when checking for storage paths
    """
    # A throwaway manifest built with an empty config, used only to read
    # the paths back
    manifest = Manifest(config={}, reproduce=False)
    manifest.load_manifests()

    previous_paths = manifest.get_all_previous_fullpaths()
    return previous_paths