diff --git a/container/base.py b/container/base.py index b76a2e5..ad510f2 100644 --- a/container/base.py +++ b/container/base.py @@ -85,7 +85,7 @@ class Volume: """ - def __init__(self, container_path, host_path, mode): + def __init__(self, container_path, host_path, mode, *args, **kwargs): self.__container_path = container_path self.__host_path = host_path self.__mode = mode @@ -150,7 +150,7 @@ class Endpoint: """ - def __init__(self, host, container_port, host_port, protocol='tcp'): + def __init__(self, host, container_port, host_port, protocol='tcp', *args, **kwargs): self.__host = host self.__host_port = host_port self.__container_port = container_port @@ -228,7 +228,7 @@ class Port: """ - def __init__(self, container_port, host_port=0, protocol='tcp'): + def __init__(self, container_port, host_port=0, protocol='tcp', *args, **kwargs): self.__container_port = container_port self.__host_port = host_port self.__protocol = protocol @@ -303,7 +303,7 @@ class Resources: """ - def __init__(self, cpus, mem, disk=0, gpu=0): + def __init__(self, cpus, mem, disk=0, gpu=0, *args, **kwargs): self.__cpus = cpus self.__mem = mem self.__disk = disk @@ -374,6 +374,38 @@ def to_request(self): return self.to_render() +@swagger.model +class Data: + """ + Data specifications + + """ + def __init__(self, input=[], *args, **kwargs): + self.__input=list(input) + + @property + @swagger.property + def input(self): + """ + Paths of input data objects + --- + type: list + items: str + default: [] + example: + - /tempZone/rods/a.file + - /tempZone/rods/b.file + + """ + return list(self.__input) + + def to_render(self): + return dict(input=self.input) + + def to_save(self): + return self.to_render() + + @swagger.model class Container: """ @@ -401,7 +433,7 @@ def parse(cls, data): def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env={}, volumes=[], network_mode=NetworkMode.HOST, endpoints=[], ports=[], state=ContainerState.SUBMITTED, is_privileged=False, force_pull_image=True, - dependencies=[], input_data=[], rack=None, host=None, last_update=None, + dependencies=[], data=None, rack=None, host=None, last_update=None, constraints=[], **kwargs): self.__id = id self.__appliance = appliance @@ -424,7 +456,7 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env self.__is_privileged = is_privileged self.__force_pull_image = force_pull_image self.__dependencies = list(dependencies) - self.__input_data = list(input_data) + self.__data = data and Data(**data) self.__last_update = parse_datetime(last_update) self.__constraints = list(constraints) @@ -676,18 +708,14 @@ def dependencies(self): @property @swagger.property - def input_data(self): + def data(self): """ - Input data files consumed by the container + Data consumed by the container --- - type: list - items: str - default: [] - example: - - /tempZone/rods/a.file - - /tempZone/rods/b.file + type: Data + """ - return list(self.__input_data) + return self.__data @property def last_update(self): @@ -750,7 +778,7 @@ def to_render(self): ports=[p.to_render() for p in self.ports], state=self.state.value, is_privileged=self.is_privileged, force_pull_image=self.force_pull_image, dependencies=self.dependencies, - input_data=self.input_data, rack=self.rack, host=self.host) + data=self.data and self.data.to_render(), rack=self.rack, host=self.host) def to_save(self): return dict(id=self.id, appliance=self.appliance, type=self.type.value, @@ -762,7 +790,7 @@ def to_save(self): ports=[p.to_save() for p in self.ports], state=self.state.value, is_privileged=self.is_privileged, force_pull_image=self.force_pull_image, dependencies=self.dependencies, - input_data=self.input_data, rack=self.rack, host=self.host, + data=self.data and self.data.to_save(), rack=self.rack, host=self.host, last_update=self.last_update and self.last_update.isoformat(), constraints=self.constraints) diff --git a/container/job.py b/container/job.py index 2705d29..26b99f7 100644 --- a/container/job.py +++ b/container/job.py @@ -118,7 +118,7 @@ def to_request(self): for p in self.ports] r['container']['parameters'] = parameters if self.rack: - r.setdefault('constraints', []).append(['rack', 'EQUALS', self.rack]) + r.setdefault('constraints', []).append(['cloud', 'EQUALS', self.rack]) if self.host: r.setdefault('constraints', []).append(['hostname', 'EQUALS', self.host]) for k, v in self.constraints: diff --git a/container/manager.py b/container/manager.py index 400b89f..89e0a8c 100644 --- a/container/manager.py +++ b/container/manager.py @@ -162,8 +162,8 @@ async def _parse_service_state(self, body): for t in tasks: hosts = await self.__cluster_db.find_agents(hostname=t['host']) if not hosts: continue - host, rack = hosts[0], hosts[0].attributes.get('rack', None) - public_ip = host.attributes.get('public_ip', None) + host, rack = hosts[0], hosts[0].attributes.get('rack') + public_ip = host.attributes.get('public_ip') if not public_ip: continue if 'portDefinitions' in body: for i, p in enumerate(body['portDefinitions']): diff --git a/container/service.py b/container/service.py index 2ee93ac..48c2df5 100644 --- a/container/service.py +++ b/container/service.py @@ -279,7 +279,7 @@ def to_request(self): for i, p in enumerate(self.ports)] r['container']['docker']['portMappings'] = port_mappings if self.rack: - r.setdefault('constraints', []).append(['rack', 'CLUSTER', self.rack]) + r.setdefault('constraints', []).append(['cloud', 'CLUSTER', self.rack]) if self.host: r.setdefault('constraints', []).append(['hostname', 'CLUSTER', self.host]) for k, v in self.constraints: diff --git a/docker/run.py b/docker/run.py index e130314..971c404 100644 --- a/docker/run.py +++ b/docker/run.py @@ -19,8 +19,8 @@ def parse_args(): parser.add_argument('--n_parallel', dest='n_parallel', type=int, default=multiprocessing.cpu_count(), help='PIVOT parallelism level') - parser.add_argument('--irods_host', dest='irods_host', type=str, help='iRODS API host') - parser.add_argument('--irods_port', dest='irods_port', type=int, help='iRODS API port') + parser.add_argument('--irods_api_host', dest='irods_api_host', type=str, help='iRODS API host') + parser.add_argument('--irods_api_port', dest='irods_api_port', type=int, help='iRODS API port') return parser.parse_args() @@ -28,8 +28,8 @@ def create_pivot_config(args): pivot_cfg_f = '/opt/pivot/config.yml' pivot_cfg = yaml.load(open(pivot_cfg_f)) pivot_cfg['pivot'].update(master=args.master, port=args.port, n_parallel=args.n_parallel) - if args.irods_host and args.irods_port: - pivot_cfg['irods'] = dict(host=args.irods_host, port=args.irods_port) + if args.irods_api_host and args.irods_api_port: + pivot_cfg['irods'] = dict(host=args.irods_api_host, port=args.irods_api_port) yaml.dump(pivot_cfg, open(pivot_cfg_f, 'w'), default_flow_style=False) def check_mongodb_port(): diff --git a/scheduler/plugin/location.py b/scheduler/plugin/location.py index 298fb23..7a0ebce 100644 --- a/scheduler/plugin/location.py +++ b/scheduler/plugin/location.py @@ -42,22 +42,36 @@ async def schedule(self, plans): for c in contrs: if c.id in plans: continue - if c.input_data: - status, locations, err = await self.__api.get_replica_locations(c.input_data[0]) - if status != 200: - self.logger.error(err) - continue + if c.data: + regions = {} + for lfn in c.data.input: + status, data_obj, err = await self.__api.get_data_object(lfn) + if status != 200: + self.logger.error(err) + continue + for r in await self.__api.get_replica_regions(data_obj.get('replicas', [])): + regions[r] = regions.setdefault(r, 0) + data_obj.get('size', 0) agents = [agent for agent in - await self.__cluster_mgr.find_agents(region=locations[0],ttl=0) + await self.__cluster_mgr.find_agents( + region={'$in': list(regions.keys())}, ttl=0) if agent.resources.cpus >= c.resources.cpus and agent.resources.mem >= c.resources.mem and agent.resources.disk >= c.resources.disk] if agents: - self.logger.info("Container '%s' will land on %s"%(c.id, agents[0].hostname)) - c.host = agents[0].hostname + self.logger.info('Candidate regions:') + for a in agents: + region = a.attributes.get('region') + cloud = a.attributes.get('cloud') + data_size = regions.get(region, 0) + self.logger.info('\t%s, %s, data size: %d'%(region, cloud, data_size)) + agent = max(agents, key=lambda a: regions.get(a.attributes.get('region'), 0)) + cloud, region = agent.attributes.get('cloud'), agent.attributes.get('region') + self.logger.info("Container '%s' will land on %s (%s, %s)"%(c.id, agent.hostname, + region, cloud)) + c.host = agent.hostname else: self.logger.info("No matched agents have sufficient resources for '%s'"%c) - self.__contr_db.save_container(c, False) + await self.__contr_db.save_container(c, False) new_plans += SchedulePlan(c.id, [c]), if new_plans: self.logger.info('New plans: %s'%[p.id for p in new_plans]) @@ -69,14 +83,18 @@ class iRODSAPIManager(APIManager): def __init__(self): super(iRODSAPIManager, self).__init__() - async def get_replica_locations(self, lfn): + async def get_data_object(self, lfn): api = config.irods - endpoint = '%s/getReplicas?filename=%s'%(api.endpoint, url_escape(lfn)) - status, replicas, err = await self.http_cli.get(api.host, api.port, endpoint) + endpoint = '%s/getDataObject?filename=%s'%(api.endpoint, url_escape(lfn)) + status, data_obj, err = await self.http_cli.get(api.host, api.port, endpoint) if status != 200: return status, None, err + return status, data_obj, None + + async def get_replica_regions(self, replicas): + api = config.irods locations = [] - for r in replicas['replicas']: + for r in replicas: endpoint = '%s/getResourceMetadata?resource_name=%s'%(api.endpoint, url_escape(r['resource_name'])) status, resc, err = await self.http_cli.get(api.host, api.port, endpoint) @@ -84,5 +102,6 @@ async def get_replica_locations(self, lfn): self.logger.error(err) continue locations.append(resc['region']) - return 200, locations, None + return locations + diff --git a/swagger/__init__.py b/swagger/__init__.py index b883f2e..b87676b 100644 --- a/swagger/__init__.py +++ b/swagger/__init__.py @@ -165,7 +165,7 @@ def _parse_paths(self): if m.__doc__: summary, method_specs = m.__doc__.split('---') op.summary, method_specs = _format_docstring(summary), yaml.load(method_specs) - request_body = method_specs.get('request_body', None) + request_body = method_specs.get('request_body') if request_body: op.request_body = RequestBody(Content(request_body['content'])) for p in in_path_params: @@ -174,7 +174,7 @@ def _parse_paths(self): op.add_parameter(Parameter(**p, show_in=p.pop('in', None))) for code, r in method_specs.get('responses', {}).items(): resp = Response(code=code, description=r.get('description', '')) - content = r.get('content', None) + content = r.get('content') if content: resp.content = Content(content) op.add_response(resp)