Skip to content

Commit

Permalink
Merge pull request #55 from zenotech/develop
Browse files Browse the repository at this point in the history
update to intel mpi launch options on LSF clusters
  • Loading branch information
mike-jt79 authored Nov 4, 2019
2 parents 217c75e + bcc4ea4 commit 1aefe12
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: python
dist: "xenial"
python:
- "2.7"
- "3.7"

notifications:
webhooks:
Expand Down
13 changes: 8 additions & 5 deletions mycluster/lsf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import print_function

from builtins import str
import os
import re
import math
Expand Down Expand Up @@ -117,8 +119,9 @@ def create_submit(queue_id, **kwargs):

tpn = tasks_per_node(queue_id)
queue_tpn = tpn

if 'tasks_per_node' in kwargs:
tpn = min(tpn, kwargs['tasks_per_node'])
tpn = kwargs['tasks_per_node']

nc = node_config(queue_id)
qc = available_tasks(queue_id)
Expand Down Expand Up @@ -190,7 +193,7 @@ def submit(script_name, immediate, depends_on=None,
job_id = int(output.split(' ')[1].replace(
'<', '').replace('>', ''))
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
elif depends_on is not None:
cmd = 'bsub -w "done(%s)" < %s ' % (depends_on, script_name)
with os.popen(cmd) as f:
Expand All @@ -199,15 +202,15 @@ def submit(script_name, immediate, depends_on=None,
job_id = int(output.split(' ')[1].replace(
'<', '').replace('>', ''))
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
else:
with os.popen('bsub <' + script_name) as f:
output = f.readline()
try:
job_id = int(output.split(' ')[1].replace(
'<', '').replace('>', ''))
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
return job_id


Expand All @@ -230,7 +233,7 @@ def status():
else:
status_dict[job_id] = state
except e:
print e
print(e)

return status_dict

Expand Down
16 changes: 10 additions & 6 deletions mycluster/mycluster.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from __future__ import print_function
from __future__ import division
from builtins import str
from past.utils import old_div
import sys
import os
from subprocess import Popen, PIPE, check_output
Expand Down Expand Up @@ -208,9 +212,9 @@ def printjobs(num_lines):
if time_ratio:
try:
efficiency = (
time_ratio /
(int(jobs[j].num_tasks) * int(jobs[j].threads_per_task)) *
100.0)
old_div(time_ratio,
(int(jobs[j].num_tasks) * int(jobs[j].threads_per_task)) *
100.0))
efficiency = '{:.1f}'.format(efficiency)
except:
pass
Expand Down Expand Up @@ -240,9 +244,9 @@ def printjobs(num_lines):
if time_ratio:
try:
efficiency = (
time_ratio /
(int(jobs[j].num_tasks) *
int(jobs[j].threads_per_task)) * 100.0)
old_div(time_ratio,
(int(jobs[j].num_tasks) *
int(jobs[j].threads_per_task)) * 100.0))
efficiency = '{:.1f}'.format(efficiency)
except:
pass
Expand Down
93 changes: 83 additions & 10 deletions mycluster/pbs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import print_function
from builtins import str
import math
import os
from string import Template
Expand All @@ -24,8 +26,8 @@ def queues():
for queue in lines:
queue_list.append(queue.split(' ')[0])
except Exception as e:
print "ERROR"
print e
print("ERROR")
print(e)
pass
return queue_list

Expand All @@ -34,22 +36,91 @@ def accounts():
return []


def _get_vnode_name(queue_id):
try:
output = subprocess.check_output(['qstat', '-Qf', queue_id])
for line in output.splitlines():
if line.strip().startswith("default_chunk.vntype"):
return line.split("=")[-1].strip()
except Exception as e:
print("ERROR")
print(e)
return None


def available_tasks(queue_id):
    """Count free and total CPU slots on nodes matching the queue's vnode type.

    Parses ``pbsnodes -a -F dsv`` output, summing
    ``resources_available.ncpus`` (capacity) and
    ``resources_assigned.ncpus`` (in use) over the matching nodes.

    Args:
        queue_id: Name of the PBS queue.

    Returns:
        dict with keys ``'available'`` (free tasks) and ``'max tasks'``
        (total CPUs); both 0 when node information cannot be obtained.
    """
    free_tasks = 0
    max_tasks = 0
    assigned_tasks = 0
    try:
        vnode_type = _get_vnode_name(queue_id)
        if vnode_type is not None:
            # universal_newlines=True -> text (not bytes) on Python 3,
            # so the str splits below work on both Python versions.
            output = subprocess.check_output(
                'pbsnodes -a -F dsv | grep {}'.format(vnode_type),
                shell=True, universal_newlines=True)
            for line in output.splitlines():
                for item in line.split("|"):
                    # Split on the first '=' only, in case the value
                    # itself contains '=' characters.
                    key, value = item.strip().split('=', 1)
                    if key.strip() == 'resources_available.ncpus':
                        max_tasks += int(value)
                    elif key.strip() == 'resources_assigned.ncpus':
                        assigned_tasks += int(value)
        free_tasks = max_tasks - assigned_tasks
    except Exception as e:
        # Best-effort: report and return zeros rather than crash.
        print("ERROR")
        print(e)
    return {'available': free_tasks, 'max tasks': max_tasks}


def tasks_per_node(queue_id):
    """Return the largest per-node CPU count among the queue's vnodes.

    Scans ``pbsnodes -a -F dsv`` output for the maximum
    ``resources_available.ncpus`` across nodes matching the queue's
    vnode type.

    Args:
        queue_id: Name of the PBS queue.

    Returns:
        Maximum ncpus found, or 1 when node information cannot be
        obtained.
    """
    tpn = 1
    try:
        # Fixed: was a duplicated assignment
        # (``vnode_type = vnode_type = ...``).
        vnode_type = _get_vnode_name(queue_id)
        if vnode_type is not None:
            # universal_newlines=True -> text (not bytes) on Python 3.
            output = subprocess.check_output(
                'pbsnodes -a -F dsv | grep {}'.format(vnode_type),
                shell=True, universal_newlines=True)
            for line in output.splitlines():
                for item in line.split("|"):
                    # Split on the first '=' only; values may contain '='.
                    key, value = item.strip().split('=', 1)
                    if key.strip() == 'resources_available.ncpus':
                        if int(value) > tpn:
                            tpn = int(value)
    except Exception as e:
        # Best-effort: report and fall back to the default of 1.
        print("ERROR")
        print(e)
    return tpn


def min_tasks_per_node(queue_id):
    """Return the minimum number of tasks that may occupy a single node.

    Args:
        queue_id: Name of the PBS queue (unused; the minimum is fixed).

    Returns:
        Always 1.
    """
    minimum_tasks = 1
    return minimum_tasks


def node_config(queue_id):
    """Report per-node thread and memory capacity for the queue's vnode type.

    Scans ``pbsnodes -a -F dsv`` output for
    ``resources_available.vps_per_ppu`` (scaled by tasks-per-node) and
    ``resources_available.mem``.

    Args:
        queue_id: Name of the PBS queue.

    Returns:
        dict with keys ``'max thread'`` and ``'max memory'`` (MB);
        both default to 1 when node information cannot be obtained.
    """
    max_threads = 1
    max_memory = 1
    try:
        tpn = tasks_per_node(queue_id)
        # Fixed: was a triply-duplicated assignment
        # (``vnode_type = vnode_type = vnode_type = ...``).
        vnode_type = _get_vnode_name(queue_id)
        if vnode_type is not None:
            # universal_newlines=True -> text (not bytes) on Python 3.
            output = subprocess.check_output(
                'pbsnodes -a -F dsv | grep {}'.format(vnode_type),
                shell=True, universal_newlines=True)
            for line in output.splitlines():
                for item in line.split("|"):
                    # Split on the first '=' only; values may contain '='.
                    key, value = item.strip().split('=', 1)
                    if key.strip() == 'resources_available.vps_per_ppu':
                        if int(value) > max_threads:
                            max_threads = int(value) * tpn
                    if key.strip() == 'resources_available.mem':
                        # Value looks like '193060kb': strip the 2-char
                        # unit suffix and convert kb -> mb.
                        mem = float(value[:-2]) / 1024
                        if mem > max_memory:
                            max_memory = mem
    except Exception as e:
        # Best-effort: report and fall back to the defaults of 1.
        print("ERROR")
        print(e)
    return {'max thread': max_threads, 'max memory': max_memory}


def create_submit(queue_id, **kwargs):
Expand All @@ -61,8 +132,9 @@ def create_submit(queue_id, **kwargs):

tpn = tasks_per_node(queue_id)
queue_tpn = tpn

if 'tasks_per_node' in kwargs:
tpn = min(tpn, kwargs['tasks_per_node'])
tpn = kwargs['tasks_per_node']

nc = node_config(queue_id)
qc = available_tasks(queue_id)
Expand Down Expand Up @@ -140,23 +212,23 @@ def submit(script_name, immediate, depends_on=None,
try:
job_id = output.strip().split('.')[0]
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
elif depends_on is not None:
with os.popen('qsub -W depend=afterok:%s %s' % (depends_on, script_name)) as f:
output = f.readline()
try:
job_id = output.strip().split('.')[0]
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
else:
with os.popen('qsub ' + script_name) as f:
output = f.readline()
try:
job_id = output.strip().split('.')[0]
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
else:
print "immediate not yet implemented for PBS"
print("immediate not yet implemented for PBS")
return job_id


Expand Down Expand Up @@ -188,7 +260,8 @@ def job_stats_enhanced(job_id):
line = f.readline().strip()
while line:
if line.startswith('Job Id:'):
stats_dict['job_id'] = line.split(':')[1].split('.')[0].strip()
stats_dict['job_id'] = line.split(
':')[1].split('.')[0].strip()
elif line.startswith('resources_used.walltime'):
stats_dict['wallclock'] = get_timedelta(line.split('=')[1])
elif line.startswith('resources_used.cput'):
Expand Down
2 changes: 2 additions & 0 deletions mycluster/persist.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

from builtins import str
from builtins import object
from persistent import Persistent
from ZODB import FileStorage, DB
from .mycluster import get_directory, scheduler
Expand Down
11 changes: 7 additions & 4 deletions mycluster/sge.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import print_function
from builtins import str
import os
import re
import math
Expand Down Expand Up @@ -255,8 +257,9 @@ def create_submit(queue_id, **kwargs):

tpn = tasks_per_node(queue_id)
queue_tpn = tpn

if 'tasks_per_node' in kwargs:
tpn = min(tpn, kwargs['tasks_per_node'])
tpn = kwargs['tasks_per_node']

nc = node_config(queue_id)
qc = available_tasks(queue_id)
Expand Down Expand Up @@ -330,8 +333,8 @@ def submit(script_name, immediate, depends=None):
try:
job_id = int(f.readline().strip())
except:
print 'job id not returned'
print f.readline()
print('job id not returned')
print(f.readline())
pass
# Get job id and record in database
return job_id
Expand All @@ -355,7 +358,7 @@ def status():

status_dict[job_id] = state
except e:
print e
print(e)

return status_dict

Expand Down
17 changes: 10 additions & 7 deletions mycluster/slurm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import print_function

from builtins import str
import os
import re
import math
Expand Down Expand Up @@ -111,7 +113,7 @@ def create_submit(queue_id, **kwargs):
tpn = tasks_per_node(queue_id)
queue_tpn = tpn
if 'tasks_per_node' in kwargs:
tpn = min(tpn, kwargs['tasks_per_node'])
tpn = kwargs['tasks_per_node']

if 'num_threads_per_task' in kwargs:
num_threads_per_task = kwargs['num_threads_per_task']
Expand Down Expand Up @@ -180,21 +182,21 @@ def submit(script_name, immediate, depends_on=None,
try:
job_id = int(output.split(' ')[-1].strip())
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
elif depends_on is not None:
with os.popen('sbatch %s --kill-on-invalid-dep=yes --dependency=afterok:%s %s' % (additional_cmd, depends_on, script_name)) as f:
output = f.readline()
try:
job_id = int(output.split(' ')[-1].strip())
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
else:
with os.popen('sbatch %s %s' % (additional_cmd, script_name)) as f:
output = f.readline()
try:
job_id = int(output.split(' ')[-1].strip())
except:
print 'Job submission failed: ' + output
print('Job submission failed: ' + output)
# Get job id and record in database
else:
with os.popen('grep -- "SBATCH -p" ' + script_name + ' | sed \'s/#SBATCH//\'') as f:
Expand All @@ -210,7 +212,7 @@ def submit(script_name, immediate, depends_on=None,

cmd_line = 'salloc --exclusive ' + nnodes + ' ' + partition + ' ' + \
ntasks + ' ' + project + ' ' + job + ' bash ./' + script_name
print cmd_line
print(cmd_line)

import subprocess
try:
Expand Down Expand Up @@ -244,7 +246,7 @@ def status():
else:
status_dict[job_id] = state
except Exception as e:
print e
print(e)

return status_dict

Expand Down Expand Up @@ -392,7 +394,8 @@ def running_stats(job_id):
line = f.readline()
new_line = re.sub(' +', ' ', line.strip())
ntasks = int(new_line.split(' ')[2])
stats_dict['mem'] = (float(new_line.split(' ')[1].replace('K', '')) * ntasks)
stats_dict['mem'] = (
float(new_line.split(' ')[1].replace('K', '')) * ntasks)
stats_dict['cpu'] = '-' # float(new_line.split(' ')[0])*ntasks
except:
pass
Expand Down
2 changes: 2 additions & 0 deletions mycluster/templates/lsf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export I_MPI_PIN_DOMAIN=omp:compact # Domains are $$OMP_NUM_THREADS cores in siz
export I_MPI_PIN_ORDER=scatter # Adjacent domains have minimal sharing of caches/sockets
#export I_MPI_FABRICS=shm:ofa
export IMPI_CMD="mpiexec -n $NUM_TASKS -ppn $TASKS_PER_NODE"
export I_MPI_HYDRA_BOOTSTRAP=lsf
export I_MPI_LSF_USE_COLLECTIVE_LAUNCH=1

# Summarise environment
echo -e "JobID: $LSB_JOBID\n======"
Expand Down
1 change: 1 addition & 0 deletions mycluster/test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import print_function


from . import mycluster
Expand Down
3 changes: 2 additions & 1 deletion mycluster/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# include RELEASE-VERSION


from __future__ import print_function
from subprocess import Popen, PIPE
import os
import sys
Expand Down Expand Up @@ -132,4 +133,4 @@ def get_git_version(abbrev=4):


if __name__ == "__main__":
print get_git_version()
print(get_git_version())
Loading

0 comments on commit 1aefe12

Please sign in to comment.