This repository has been archived by the owner on Aug 15, 2018. It is now read-only.

Commit 4fd7bf2
Encode hierarchy of task collections in db table
HACKERMD committed Apr 6, 2017 · 1 parent f713804
Showing 14 changed files with 655 additions and 330 deletions.
1 change: 1 addition & 0 deletions tmlib/config.py
@@ -252,6 +252,7 @@ def storage_home(self, value):
def cpu_memory(self):
'''int: amount of memory in megabytes per CPU core that should be
allocated for a single job (default: ``2000``)
'''
return self._config.getint(self._section, 'cpu_memory')

6 changes: 5 additions & 1 deletion tmlib/jobs.py
@@ -16,7 +16,8 @@ class Job(gc3libs.Application):

__meta__ = ABCMeta

def __init__(self, arguments, output_dir, submission_id, user_name):
def __init__(self, arguments, output_dir, submission_id, user_name,
parent_id=None):
'''
Parameters
----------
@@ -29,10 +30,13 @@ def __init__(self, arguments, output_dir, submission_id, user_name):
ID of the corresponding submission
user_name: str
name of the submitting user
parent_id: int, optional
ID of the parent job collection
'''
t = create_datetimestamp()
self.user_name = user_name
self.submission_id = submission_id
self.parent_id = parent_id
super(Job, self).__init__(
jobname=self.name,
arguments=arguments,
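Every job now records the database ID of the task collection it belongs to. Below is a minimal, purely illustrative sketch of how a concrete subclass might be constructed under the new signature; the class ExampleRunJob and all literal argument values are assumptions, only the constructor parameters themselves come from the diff above.

from tmlib.jobs import Job


class ExampleRunJob(Job):
    '''Hypothetical concrete job, defined only for illustration.'''

    @property
    def name(self):
        # Job.__init__ forwards self.name to gc3libs as the jobname,
        # so a concrete subclass has to provide one.
        return 'example_run_000001'


job = ExampleRunJob(
    arguments=['example', 'run', '--job', '1'],
    output_dir='/tmp/example_logs',
    submission_id=1,
    user_name='devuser',
    parent_id=42,  # id of the parent collection's row in the "tasks" table
)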
43 changes: 19 additions & 24 deletions tmlib/models/submission.py
@@ -156,9 +156,14 @@ def __repr__(self):

class Task(MainModel, DateMixIn):

'''A *task* represents a computational job that can be submitted to a
cluster for processing. Its state will be monitored while being processed.
'''A *task* represents a computational job that can be submitted to
computational resources for processing.
Its state will be monitored while being processed and statistics will be
collected.
Warning
-------
This table is managed by GC3Pie. Don't modify it manually!
'''

__tablename__ = 'tasks'
@@ -172,39 +177,29 @@ class Task(MainModel, DateMixIn):
#: int: exitcode
exitcode = Column(Integer, index=True)

#: datetime.timedelta: total time of task (sum of all subtasks in case
#: of a task collection)
time = Column(Interval, index=True)
#: datetime.timedelta: total time of task
time = Column(Interval)

#: int: total memory in MB of task (sum of all subtasks in case
#: of a task collection)
memory = Column(Integer, index=True)
#: int: total memory in MB of task
memory = Column(Integer)

#: datetime.timedelta: total CPU time of task (sum of all subtasks in case
#: of a task collection)
cpu_time = Column(Interval, index=True)
#: datetime.timedelta: total CPU time of task
cpu_time = Column(Interval)

#: str: name of the corresponding Python object
type = Column(String, index=True)

#: bool: whether the task is a collection of tasks
is_collection = Column(Boolean, index=True)

#: Pickled Python `gc3libs.Task` or `gc3libs.workflow.TaskCollection` object
#: int: ID of the parent task
parent_id = Column(Integer, index=True)

#: Pickled Python `gc3libs.Task` object
data = Column(LargeBinary)

#: int: ID of parent submission
submission_id = Column(
Integer,
ForeignKey('submissions.id', onupdate='CASCADE', ondelete='CASCADE'),
index=True
)

#: tmlib.models.submission.Submission: parent submission
submission = relationship(
'Submission',
backref=backref('tasks', cascade='all, delete-orphan')
)
submission_id = Column(Integer, index=True)

def __repr__(self):
return (
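The new parent_id column is what encodes the hierarchy named in the commit title: each task row points at the row of its enclosing collection, so the tree can be rebuilt with ordinary queries instead of keeping aggregated statistics on the collection rows. A rough sketch of walking that tree, assuming tm.Task is exported from tmlib.models like the other model classes in this diff and that session is an open SQLAlchemy session bound to the main database; the helper functions are illustrative and not part of this commit.

import datetime

import tmlib.models as tm


def collect_subtree(session, root_id):
    # Gather a task and all of its descendants by following parent_id links.
    subtree = [session.query(tm.Task).get(root_id)]
    for child in session.query(tm.Task).filter_by(parent_id=root_id):
        if child.is_collection:
            subtree.extend(collect_subtree(session, child.id))
        else:
            subtree.append(child)
    return subtree


def total_cpu_time(session, collection_id):
    # Aggregate CPU time over the leaf tasks of a collection on demand,
    # rather than storing a redundant sum on the collection row itself.
    leaves = [
        t for t in collect_subtree(session, collection_id)
        if not t.is_collection and t.cpu_time is not None
    ]
    return sum((t.cpu_time for t in leaves), datetime.timedelta())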
4 changes: 2 additions & 2 deletions tmlib/workflow/align/api.py
@@ -147,8 +147,8 @@ def create_run_batches(self, args):
def delete_previous_job_output(self):
logger.info('delete existing site shifts and intersections')
with tm.utils.ExperimentSession(self.experiment_id) as session:
session.drop_and_recreate(tm.SiteShift)
session.drop_and_recreate(tm.SiteIntersection)
session.query(tm.SiteShift).delete()
session.query(tm.SiteIntersection).delete()

def run_job(self, batch):
'''Calculates shift and overhang values for the given sites.
120 changes: 89 additions & 31 deletions tmlib/workflow/api.py
@@ -42,7 +42,8 @@
JobDescriptionError, CliArgError
)
from tmlib.workflow.jobs import (
InitJob, RunJob, CollectJob, SingleRunJobCollection
InitJob, RunJob, CollectJob,
SingleRunPhase, InitPhase, RunPhase, CollectPhase
)

logger = logging.getLogger(__name__)
@@ -491,35 +492,81 @@ def create_step(self, submission_id, user_name, description):
description=description
)

def create_run_job_collection(self, submission_id):
def create_init_phase(self, submission_id, parent_id):
'''Creates a job collection for the "init" phase of the step.
Parameters
----------
submission_id: int
ID of the corresponding
:class:`Submission <tmlib.models.submission.Submission>`
parent_id: int
ID of the parent
:class:`WorkflowStep <tmlib.workflow.workflow.WorkflowStep>`
Returns
-------
tmlib.workflow.jobs.InitPhase
collection of "init" jobs
'''
return InitPhase(
step_name=self.step_name, submission_id=submission_id,
parent_id=parent_id
)

def create_run_phase(self, submission_id, parent_id):
'''Creates a job collection for the "run" phase of the step.
Parameters
----------
submission_id: int
ID of the corresponding
:class:`Submission <tmlib.models.submission.Submission>`
parent_id: int
ID of the parent
:class:`WorkflowStep <tmlib.workflow.workflow.WorkflowStep>`
Returns
-------
tmlib.workflow.job.SingleRunJobCollection
tmlib.workflow.jobs.SingleRunPhase
collection of "run" jobs
'''
return SingleRunJobCollection(
step_name=self.step_name, submission_id=submission_id
return SingleRunPhase(
step_name=self.step_name, submission_id=submission_id,
parent_id=parent_id
)

def create_run_jobs(self, submission_id, user_name, job_collection,
def create_collect_phase(self, submission_id, parent_id):
'''Creates a job collection for the "collect" phase of the step.
Parameters
----------
submission_id: int
ID of the corresponding
:class:`Submission <tmlib.models.submission.Submission>`
parent_id: int
ID of the parent
:class:`WorkflowStep <tmlib.workflow.workflow.WorkflowStep>`
Returns
-------
tmlib.workflow.jobs.CollectPhase
collection of "collect" jobs
'''
return CollectPhase(
step_name=self.step_name, submission_id=submission_id,
parent_id=parent_id
)
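Taken together, the three factories above give each phase collection an explicit back-reference to the step that owns it. A rough sketch of the intended call pattern; api stands for a step's API instance, submission_id for the registered submission, and reading persistent_id off the enclosing WorkflowStep is an assumption based on how job_collection.persistent_id is used further down.

# Sketch only: step is the enclosing WorkflowStep whose task row acts as parent.
init_phase = api.create_init_phase(submission_id, parent_id=step.persistent_id)
run_phase = api.create_run_phase(submission_id, parent_id=step.persistent_id)
if api.has_collect_phase:
    collect_phase = api.create_collect_phase(
        submission_id, parent_id=step.persistent_id
    )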

def create_run_jobs(self, user_name, job_collection,
verbosity, duration, memory, cores):
'''Creates jobs for the parallel "run" phase of the step.
Parameters
----------
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
job_collection: tmlib.workflow.job.SingleRunJobCollection
job_collection: tmlib.workflow.jobs.RunPhase
empty collection of *run* jobs that should be populated
verbosity: int
logging verbosity for jobs
@@ -534,10 +581,12 @@ def create_run_jobs(self, submission_id, user_name, job_collection,
Returns
-------
tmlib.workflow.jobs.SingleRunJobCollection
run jobs
tmlib.workflow.jobs.RunPhase
collection of jobs
'''
logger.info('create "run" jobs for submission %d', submission_id)
logger.info(
'create "run" jobs for submission %d', job_collection.submission_id
)
logger.debug('allocated time for run jobs: %s', duration)
logger.debug('allocated memory for run jobs: %d MB', memory)
logger.debug('allocated cores for run jobs: %d', cores)
@@ -549,25 +598,26 @@ def create_run_jobs(self, submission_id, user_name, job_collection,
arguments=self._build_run_command(j, verbosity),
output_dir=self.log_location,
job_id=j,
submission_id=submission_id,
user_name=user_name
submission_id=job_collection.submission_id,
user_name=user_name,
parent_id=job_collection.persistent_id
)
job.requested_walltime = Duration(duration)
job.requested_memory = Memory(memory, Memory.MB)
job.requested_cores = cores
job_collection.add(job)
return job_collection

def create_init_job(self, submission_id, user_name, batch_args, verbosity,
duration='12:00:00'):
def create_init_job(self, user_name, job_collection,
batch_args, verbosity, duration='12:00:00'):
'''Creates job for the "init" phase of the step.
Parameters
----------
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
job_collection: tmlib.workflow.jobs.InitPhase
empty collection of *init* jobs that should be populated
batch_args: tmlib.workflow.args.BatchArguments
step-specific implementation of
:class:`BatchArguments <tmlib.workflow.args.BatchArguments>`
Expand All @@ -579,36 +629,40 @@ def create_init_job(self, submission_id, user_name, batch_args, verbosity,
Returns
-------
tmlib.workflow.jobs.InitJob
tmlib.workflow.jobs.InitPhase
init job
'''
logger.info('create "init" job for submission %d', submission_id)
logger.info(
'create "init" job for submission %d', job_collection.submission_id
)
logger.debug('allocated time for "init" job: %s', duration)
logger.debug('allocated memory for "init" job: %d MB', cfg.cpu_memory)
logger.debug('allocated cores for "init" job: %d', cfg.cpu_cores)
job = InitJob(
step_name=self.step_name,
arguments=self._build_init_command(batch_args, verbosity),
output_dir=self.log_location,
submission_id=submission_id,
user_name=user_name
submission_id=job_collection.submission_id,
user_name=user_name,
parent_id=job_collection.persistent_id
)
job.requested_walltime = Duration(duration)
job.requested_memory = Memory(cfg.cpu_memory, Memory.MB)
job.requested_cores = cfg.cpu_cores
return job
job_collection.add(job)
return job_collection

def create_collect_job(self, submission_id, user_name, verbosity,
duration='06:00:00'):
def create_collect_job(self, user_name, job_collection,
verbosity, duration='06:00:00'):
'''Creates job for the "collect" phase of the step.
Parameters
----------
submission_id: int
ID of the corresponding submission
user_name: str
name of the submitting user
job_collection: tmlib.workflow.jobs.CollectPhase
empty collection of *collect* jobs that should be populated
verbosity: int
logging verbosity for jobs
duration: str, optional
@@ -621,19 +675,23 @@ def create_collect_job(self, submission_id, user_name, verbosity,
collect job
'''
logger.info('create "collect" job for submission %d', submission_id)
logger.info(
'create "collect" job for submission %d', job_collection.submission_id
)
logger.debug('allocated time for "collect" job: %s', duration)
logger.debug('allocated memory for "collect" job: %d MB', cfg.cpu_memory)
logger.debug('allocated cores for "collect" job: %d', cfg.cpu_cores)
job = CollectJob(
step_name=self.step_name,
arguments=self._build_collect_command(verbosity),
output_dir=self.log_location,
submission_id=submission_id,
user_name=user_name
submission_id=job_collection.submission_id,
user_name=user_name,
parent_id=job_collection.persistent_id
)
job.requested_walltime = Duration(duration)
job.requested_memory = Memory(cfg.cpu_memory, Memory.MB)
job.requested_cores = cfg.cpu_cores
return job
job_collection.add(job)
return job_collection
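Note the common shift in the three job factories: each now receives the still-empty phase collection instead of a bare submission_id, reads the submission off it, stamps every job with parent_id=job_collection.persistent_id, and returns the populated collection. A hedged sketch of chaining them with the phases from the previous example; user_name, batch_args and all literal values are placeholders.

# Sketch; the *_phase collections are assumed to come from the phase factories above.
init_phase = api.create_init_job(
    user_name, init_phase, batch_args, verbosity=1
)
run_phase = api.create_run_jobs(
    user_name, run_phase, verbosity=1,
    duration='06:00:00', memory=2000, cores=1
)
if api.has_collect_phase:
    collect_phase = api.create_collect_job(
        user_name, collect_phase, verbosity=1
    )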

12 changes: 6 additions & 6 deletions tmlib/workflow/cli.py
@@ -44,7 +44,7 @@
from tmlib.workflow.submission import WorkflowSubmissionManager
from tmlib.workflow.description import WorkflowStepDescription
from tmlib.workflow.workflow import WorkflowStep
from tmlib.workflow.jobs import CliJobCollection
from tmlib.workflow.jobs import IndependentJobCollection
from tmlib.log import configure_logging
from tmlib.log import map_logging_verbosity
from tmlib.errors import WorkflowError
@@ -473,19 +473,19 @@ def submit(self, monitoring_depth, monitoring_interval):
submission_id, user_name = self.register_submission()
api = self.api_instance

jobs = CliJobCollection(api.step_name, submission_id)
run_job_collection = api.create_run_job_collection(submission_id)
jobs = IndependentJobCollection(api.step_name, submission_id)
run_job_collection = api.create_run_phase(submission_id)
run_jobs = api.create_run_jobs(
submission_id, user_name, run_job_collection,
self.verbosity,
user_name, run_job_collection, self.verbosity,
duration=self._submission_args.duration,
memory=self._submission_args.memory,
cores=self._submission_args.cores
)
jobs.add(run_jobs)
if api.has_collect_phase:
collect_job_collection = api.create_collect_phase(submission_id)
collect_job = api.create_collect_job(
submission_id, user_name, self.verbosity
user_name, collect_job_collection, self.verbosity
)
jobs.add(collect_job)

