Skip to content

Commit

Permalink
replaced "input_data" field with "data" in Container; updated the loc…
Browse files Browse the repository at this point in the history
…ation-aware scheduler to support multiple input data files (#27); minor changes to the Dockerfile
  • Loading branch information
dcvan24 committed May 17, 2018
1 parent 1b40aa8 commit 8170c9f
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 41 deletions.
62 changes: 45 additions & 17 deletions container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class Volume:
"""

def __init__(self, container_path, host_path, mode):
def __init__(self, container_path, host_path, mode, *args, **kwargs):
self.__container_path = container_path
self.__host_path = host_path
self.__mode = mode
Expand Down Expand Up @@ -150,7 +150,7 @@ class Endpoint:
"""

def __init__(self, host, container_port, host_port, protocol='tcp'):
def __init__(self, host, container_port, host_port, protocol='tcp', *args, **kwargs):
self.__host = host
self.__host_port = host_port
self.__container_port = container_port
Expand Down Expand Up @@ -228,7 +228,7 @@ class Port:
"""

def __init__(self, container_port, host_port=0, protocol='tcp'):
def __init__(self, container_port, host_port=0, protocol='tcp', *args, **kwargs):
self.__container_port = container_port
self.__host_port = host_port
self.__protocol = protocol
Expand Down Expand Up @@ -303,7 +303,7 @@ class Resources:
"""

def __init__(self, cpus, mem, disk=0, gpu=0):
def __init__(self, cpus, mem, disk=0, gpu=0, *args, **kwargs):
self.__cpus = cpus
self.__mem = mem
self.__disk = disk
Expand Down Expand Up @@ -374,6 +374,38 @@ def to_request(self):
return self.to_render()


@swagger.model
class Data:
  """
  Data specifications

  Holds the paths of the input data objects a container consumes.
  Instances are typically built from a plain dict via ``Data(**data)``,
  so the ``input`` keyword name is part of the public interface.
  """

  def __init__(self, input=(), *args, **kwargs):
    """
    :param input: iterable of input data object paths (logical file names).
                  Defensively copied, so later mutation of the caller's
                  iterable does not affect this instance.

    NOTE: the parameter intentionally keeps the name ``input`` (shadowing
    the builtin) for backward compatibility with ``Data(**data)`` callers.
    The default was changed from the mutable ``[]`` to the immutable ``()``
    to avoid the shared-mutable-default pitfall; behavior is unchanged
    since ``list(()) == []``.
    """
    self.__input = list(input)

  @property
  @swagger.property
  def input(self):
    """
    Paths of input data objects
    ---
    type: list
    items: str
    default: []
    example:
    - /tempZone/rods/a.file
    - /tempZone/rods/b.file
    """
    # Return a copy so callers cannot mutate internal state.
    return list(self.__input)

  def to_render(self):
    """Return a JSON-serializable dict representation for API responses."""
    return dict(input=self.input)

  def to_save(self):
    """Return the dict representation used for persistence (same as render)."""
    return self.to_render()


@swagger.model
class Container:
"""
Expand Down Expand Up @@ -401,7 +433,7 @@ def parse(cls, data):
def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env={},
volumes=[], network_mode=NetworkMode.HOST, endpoints=[], ports=[],
state=ContainerState.SUBMITTED, is_privileged=False, force_pull_image=True,
dependencies=[], input_data=[], rack=None, host=None, last_update=None,
dependencies=[], data=None, rack=None, host=None, last_update=None,
constraints=[], **kwargs):
self.__id = id
self.__appliance = appliance
Expand All @@ -424,7 +456,7 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env
self.__is_privileged = is_privileged
self.__force_pull_image = force_pull_image
self.__dependencies = list(dependencies)
self.__input_data = list(input_data)
self.__data = data and Data(**data)
self.__last_update = parse_datetime(last_update)
self.__constraints = list(constraints)

Expand Down Expand Up @@ -676,18 +708,14 @@ def dependencies(self):

@property
@swagger.property
def input_data(self):
def data(self):
"""
Input data files consumed by the container
Data consumed by the container
---
type: list
items: str
default: []
example:
- /tempZone/rods/a.file
- /tempZone/rods/b.file
type: Data
"""
return list(self.__input_data)
return self.__data

@property
def last_update(self):
Expand Down Expand Up @@ -750,7 +778,7 @@ def to_render(self):
ports=[p.to_render() for p in self.ports],
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
input_data=self.input_data, rack=self.rack, host=self.host)
data=self.data and self.data.to_render(), rack=self.rack, host=self.host)

def to_save(self):
return dict(id=self.id, appliance=self.appliance, type=self.type.value,
Expand All @@ -762,7 +790,7 @@ def to_save(self):
ports=[p.to_save() for p in self.ports],
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
input_data=self.input_data, rack=self.rack, host=self.host,
data=self.data and self.data.to_save(), rack=self.rack, host=self.host,
last_update=self.last_update and self.last_update.isoformat(),
constraints=self.constraints)

Expand Down
2 changes: 1 addition & 1 deletion container/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def to_request(self):
for p in self.ports]
r['container']['parameters'] = parameters
if self.rack:
r.setdefault('constraints', []).append(['rack', 'EQUALS', self.rack])
r.setdefault('constraints', []).append(['cloud', 'EQUALS', self.rack])
if self.host:
r.setdefault('constraints', []).append(['hostname', 'EQUALS', self.host])
for k, v in self.constraints:
Expand Down
4 changes: 2 additions & 2 deletions container/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ async def _parse_service_state(self, body):
for t in tasks:
hosts = await self.__cluster_db.find_agents(hostname=t['host'])
if not hosts: continue
host, rack = hosts[0], hosts[0].attributes.get('rack', None)
public_ip = host.attributes.get('public_ip', None)
host, rack = hosts[0], hosts[0].attributes.get('rack')
public_ip = host.attributes.get('public_ip')
if not public_ip: continue
if 'portDefinitions' in body:
for i, p in enumerate(body['portDefinitions']):
Expand Down
2 changes: 1 addition & 1 deletion container/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def to_request(self):
for i, p in enumerate(self.ports)]
r['container']['docker']['portMappings'] = port_mappings
if self.rack:
r.setdefault('constraints', []).append(['rack', 'CLUSTER', self.rack])
r.setdefault('constraints', []).append(['cloud', 'CLUSTER', self.rack])
if self.host:
r.setdefault('constraints', []).append(['hostname', 'CLUSTER', self.host])
for k, v in self.constraints:
Expand Down
8 changes: 4 additions & 4 deletions docker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ def parse_args():
parser.add_argument('--n_parallel', dest='n_parallel', type=int,
default=multiprocessing.cpu_count(),
help='PIVOT parallelism level')
parser.add_argument('--irods_host', dest='irods_host', type=str, help='iRODS API host')
parser.add_argument('--irods_port', dest='irods_port', type=int, help='iRODS API port')
parser.add_argument('--irods_api_host', dest='irods_api_host', type=str, help='iRODS API host')
parser.add_argument('--irods_api_port', dest='irods_api_port', type=int, help='iRODS API port')
return parser.parse_args()


def create_pivot_config(args):
pivot_cfg_f = '/opt/pivot/config.yml'
pivot_cfg = yaml.load(open(pivot_cfg_f))
pivot_cfg['pivot'].update(master=args.master, port=args.port, n_parallel=args.n_parallel)
if args.irods_host and args.irods_port:
pivot_cfg['irods'] = dict(host=args.irods_host, port=args.irods_port)
if args.irods_api_host and args.irods_api_port:
pivot_cfg['irods'] = dict(host=args.irods_api_host, port=args.irods_api_port)
yaml.dump(pivot_cfg, open(pivot_cfg_f, 'w'), default_flow_style=False)

def check_mongodb_port():
Expand Down
47 changes: 33 additions & 14 deletions scheduler/plugin/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,36 @@ async def schedule(self, plans):
for c in contrs:
if c.id in plans:
continue
if c.input_data:
status, locations, err = await self.__api.get_replica_locations(c.input_data[0])
if status != 200:
self.logger.error(err)
continue
if c.data:
regions = {}
for lfn in c.data.input:
status, data_obj, err = await self.__api.get_data_object(lfn)
if status != 200:
self.logger.error(err)
continue
for r in await self.__api.get_replica_regions(data_obj.get('replicas', [])):
regions[r] = regions.setdefault(r, 0) + data_obj.get('size', 0)
agents = [agent for agent in
await self.__cluster_mgr.find_agents(region=locations[0],ttl=0)
await self.__cluster_mgr.find_agents(
region={'$in': list(regions.keys())}, ttl=0)
if agent.resources.cpus >= c.resources.cpus
and agent.resources.mem >= c.resources.mem
and agent.resources.disk >= c.resources.disk]
if agents:
self.logger.info("Container '%s' will land on %s"%(c.id, agents[0].hostname))
c.host = agents[0].hostname
self.logger.info('Candidate regions:')
for a in agents:
region = a.attributes.get('region')
cloud = a.attributes.get('cloud')
data_size = regions.get(region, 0)
self.logger.info('\t%s, %s, data size: %d'%(region, cloud, data_size))
agent = max(agents, key=lambda a: regions.get(a.attributes.get('region'), 0))
cloud, region = agent.attributes.get('cloud'), agent.attributes.get('region')
self.logger.info("Container '%s' will land on %s (%s, %s)"%(c.id, agent.hostname,
region, cloud))
c.host = agent.hostname
else:
self.logger.info("No matched agents have sufficient resources for '%s'"%c)
self.__contr_db.save_container(c, False)
await self.__contr_db.save_container(c, False)
new_plans += SchedulePlan(c.id, [c]),
if new_plans:
self.logger.info('New plans: %s'%[p.id for p in new_plans])
Expand All @@ -69,20 +83,25 @@ class iRODSAPIManager(APIManager):
def __init__(self):
super(iRODSAPIManager, self).__init__()

async def get_replica_locations(self, lfn):
async def get_data_object(self, lfn):
api = config.irods
endpoint = '%s/getReplicas?filename=%s'%(api.endpoint, url_escape(lfn))
status, replicas, err = await self.http_cli.get(api.host, api.port, endpoint)
endpoint = '%s/getDataObject?filename=%s'%(api.endpoint, url_escape(lfn))
status, data_obj, err = await self.http_cli.get(api.host, api.port, endpoint)
if status != 200:
return status, None, err
return status, data_obj, None

async def get_replica_regions(self, replicas):
api = config.irods
locations = []
for r in replicas['replicas']:
for r in replicas:
endpoint = '%s/getResourceMetadata?resource_name=%s'%(api.endpoint,
url_escape(r['resource_name']))
status, resc, err = await self.http_cli.get(api.host, api.port, endpoint)
if status != 200:
self.logger.error(err)
continue
locations.append(resc['region'])
return 200, locations, None
return locations


4 changes: 2 additions & 2 deletions swagger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def _parse_paths(self):
if m.__doc__:
summary, method_specs = m.__doc__.split('---')
op.summary, method_specs = _format_docstring(summary), yaml.load(method_specs)
request_body = method_specs.get('request_body', None)
request_body = method_specs.get('request_body')
if request_body:
op.request_body = RequestBody(Content(request_body['content']))
for p in in_path_params:
Expand All @@ -174,7 +174,7 @@ def _parse_paths(self):
op.add_parameter(Parameter(**p, show_in=p.pop('in', None)))
for code, r in method_specs.get('responses', {}).items():
resp = Response(code=code, description=r.get('description', ''))
content = r.get('content', None)
content = r.get('content')
if content:
resp.content = Content(content)
op.add_response(resp)
Expand Down

0 comments on commit 8170c9f

Please sign in to comment.