From 7d664ffde1d97b4a93f769e73ffde53fe9aba462 Mon Sep 17 00:00:00 2001 From: dcvan Date: Wed, 13 Jun 2018 12:30:15 -0400 Subject: [PATCH] WIP on #30 and #39; Changes related to #19 --- appliance/base.py | 16 ++- appliance/manager.py | 20 ++- commons.py | 4 +- config.py | 15 +- config.yml | 1 - container/handler.py | 6 +- container/manager.py | 24 ++-- schedule/__init__.py | 130 ++++++++---------- schedule/default.py | 27 ---- schedule/local/__init__.py | 77 +++++++++++ schedule/plugin/{ => local}/location_aware.py | 4 +- server.py | 15 +- 12 files changed, 216 insertions(+), 123 deletions(-) delete mode 100644 schedule/default.py create mode 100644 schedule/local/__init__.py rename schedule/plugin/{ => local}/location_aware.py (97%) diff --git a/appliance/base.py b/appliance/base.py index a8a59cf..d5414ae 100644 --- a/appliance/base.py +++ b/appliance/base.py @@ -43,9 +43,11 @@ def parse(cls, data): app = Appliance(data['id'], containers) return 200, app, None - def __init__(self, id, containers=[], **kwargs): + def __init__(self, id, containers=[], + scheduler='schedule.local.DefaultApplianceScheduler', **kwargs): self.__id = id self.__containers = list(containers) + self.__scheduler = scheduler @property @swagger.property @@ -75,6 +77,18 @@ def containers(self): """ return self.__containers + @property + @swagger.property + def scheduler(self): + """ + Appliance-level scheduler for the appliance + + --- + type: str + + """ + return self.__scheduler + @containers.setter def containers(self, contrs): self.__containers = list(contrs) diff --git a/appliance/manager.py b/appliance/manager.py index adb6c76..f3a7311 100644 --- a/appliance/manager.py +++ b/appliance/manager.py @@ -1,9 +1,12 @@ +import importlib +import schedule + from config import config from commons import MongoClient, AutonomousMonitor from commons import Manager, APIManager from appliance.base import Appliance from container.manager import ContainerManager -from schedule import ApplianceScheduleNegotiator +from schedule.local import ApplianceScheduleExecutor class ApplianceManager(Manager): @@ -18,7 +21,7 @@ async def get_appliance(self, app_id): if status != 200: return status, app, err app = Appliance(**app) - status, app.containers, err = await self.__contr_mgr.get_containers(app_id) + status, app.containers, err = await self.__contr_mgr.get_containers(appliance=app_id) if not app.containers: self.logger.info("Empty appliance '%s', deleting"%app_id) status, msg, err = await self.delete_appliance(app_id) @@ -51,7 +54,7 @@ async def create_appliance(self, data): self.logger.error(err) return status, None, err self.logger.info(msg) - ApplianceScheduleNegotiator(app.id, 3000).start() + ApplianceScheduleExecutor(app.id, 3000).start() return 201, app, None async def delete_appliance(self, app_id): @@ -69,7 +72,8 @@ async def delete_appliance(self, app_id): if err and status != 404: self.logger.error(err) return 400, None, "Failed to deprovision appliance '%s'"%app_id - ApplianceDeletionChecker(app_id).start() + scheduler = self._get_scheduler(app.scheduler) + ApplianceDeletionChecker(app_id, scheduler).start() return status, msg, None async def save_appliance(self, app, upsert=True): @@ -93,6 +97,14 @@ def validate_dependencies(contrs): return validate_dependencies(app.containers) + def _get_scheduler(self, scheduler_name): + try: + sched_mod = '.'.join(scheduler_name.split('.')[:-1]) + sched_class = scheduler_name.split('.')[-1] + return getattr(importlib.import_module(sched_mod), sched_class) + except Exception as e: + self.logger.error(str(e)) + return schedule.DefaultApplianceScheduler class ApplianceAPIManager(APIManager): diff --git a/commons.py b/commons.py index da0c10b..4d65700 100644 --- a/commons.py +++ b/commons.py @@ -3,7 +3,7 @@ import logging import tornado -from abc import ABCMeta, abstractmethod +from abc import abstractmethod from tornado.httpclient import AsyncHTTPClient, HTTPError from tornado.ioloop import PeriodicCallback from motor.motor_tornado import MotorClient @@ -95,7 +95,7 @@ def __init__(self): self.http_cli = AsyncHttpClientWrapper() -class AutonomousMonitor(Loggable, metaclass=ABCMeta): +class AutonomousMonitor(Loggable): def __init__(self, interval): self.__interval = interval diff --git a/config.py b/config.py index 1d678de..4a657da 100644 --- a/config.py +++ b/config.py @@ -1,5 +1,6 @@ import sys import yaml +import importlib from util import dirname @@ -70,8 +71,7 @@ def __init__(self, port=0, *args, **kwargs): class GeneralConfig: def __init__(self, master, port=9090, n_parallel=1, - scheduler='scheduler.appliance.DefaultApplianceScheduler', - ha=False, *args, **kwargs): + scheduler='schedule.DefaultGlobalScheduler', ha=False, *args, **kwargs): self.__master = master self.__port = port self.__n_parallel = n_parallel @@ -188,3 +188,14 @@ def irods(self): config = Configuration.read_config('%s/config.yml'%dirname(__file__)) + + +def get_global_scheduler(): + try: + sched_mod = '.'.join(config.pivot.scheduler.split('.')[:-1]) + sched_class = config.pivot.scheduler.split('.')[-1] + return getattr(importlib.import_module(sched_mod), sched_class)() + except Exception as e: + sys.stderr.write(str(e) + '\n') + from schedule import DefaultGlobalScheduler + return DefaultGlobalScheduler() diff --git a/config.yml b/config.yml index c39c88d..539d61b 100644 --- a/config.yml +++ b/config.yml @@ -2,7 +2,6 @@ pivot: master: m1.dcos port: 9090 n_parallel: 8 - scheduler: schedule.plugin.location_aware.LocationAwareApplianceScheduler ha: false db: host: localhost diff --git a/container/handler.py b/container/handler.py index f3c45c3..2807d12 100644 --- a/container/handler.py +++ b/container/handler.py @@ -39,7 +39,8 @@ async def get(self, app_id): application/json: schema: Error """ - status, services, err = await self.__contr_mgr.get_containers(app_id, type='service') + status, services, err = await self.__contr_mgr.get_containers(appliance=app_id, + type='service') self.set_status(status) self.write(json.dumps([s.to_render() for s in services] if status == 200 else error(err))) @@ -76,7 +77,8 @@ async def get(self, app_id): application/json: schema: Error """ - status, services, err = await self.__contr_mgr.get_containers(app_id, type='job') + status, services, err = await self.__contr_mgr.get_containers(appliance=app_id, + type='job') self.set_status(status) self.write(json.dumps([s.to_render() for s in services] if status == 200 else error(err))) diff --git a/container/manager.py b/container/manager.py index 5ad8046..d08878b 100644 --- a/container/manager.py +++ b/container/manager.py @@ -12,19 +12,18 @@ class ContainerManager(Manager): - def __init__(self, contr_info_ttl=timedelta(seconds=3)): + def __init__(self): self.__service_api = ServiceAPIManager() self.__job_api = JobAPIManager() self.__contr_db = ContainerDBManager() self.__cluster_db = AgentDBManager() - self.__contr_info_ttl = contr_info_ttl - async def get_container(self, app_id, contr_id): + async def get_container(self, app_id, contr_id, ttl=0): status, contr, err = await self.__contr_db.get_container(app_id, contr_id) if status == 404: return status, contr, err if not contr.last_update or \ - datetime.datetime.now(tz=None) - contr.last_update > self.__contr_info_ttl: + datetime.datetime.now(tz=None) - contr.last_update > timedelta(seconds=ttl): status, contr, err = await self._get_updated_container(contr) if status == 404 and contr.state != ContainerState.SUBMITTED: self.logger.info("Deleted ghost container: %s"%contr) @@ -32,18 +31,18 @@ async def get_container(self, app_id, contr_id): return 404, None, err if status == 200: contr.last_update = datetime.datetime.now(tz=None) - await self.__contr_db.save_container(contr, False) + await self.save_container(contr) elif status != 404: self.logger.error("Failed to update container '%s'"%contr) self.logger.error(err) return 200, contr, None - async def get_containers(self, app_id, **kwargs): - contrs = await self.__contr_db.get_containers(appliance=app_id, **kwargs) + async def get_containers(self, ttl=0, **kwargs): + contrs = await self.__contr_db.get_containers(**kwargs) contrs_to_del, contrs_to_update = [], [], cur_time = datetime.datetime.now(tz=None) for c in contrs: - if c.last_update and cur_time - c.last_update <= self.__contr_info_ttl: + if c.last_update and cur_time - c.last_update <= timedelta(seconds=ttl): continue status, c, err = await self._get_updated_container(c) if status == 404 and c.state != ContainerState.SUBMITTED: @@ -51,7 +50,7 @@ async def get_containers(self, app_id, **kwargs): if status == 200: contrs_to_update.append(c) if contrs_to_del: - filters = dict(id={'$in': [c.id for c in contrs_to_del]}, appliance=app_id) + filters = dict(id={'$in': [c.id for c in contrs_to_del]}) status, msg, err = await self.__contr_db.delete_containers(**filters) if err: self.logger.error(err) @@ -59,7 +58,7 @@ async def get_containers(self, app_id, **kwargs): self.logger.info(msg) for c in contrs_to_update: c.last_update = datetime.datetime.now(tz=None) - await self.__contr_db.save_container(c, upsert=False) + await self.save_container(c) return 200, contrs, None async def create_container(self, data): @@ -69,7 +68,7 @@ async def create_container(self, data): status, contr, err = Container.parse(data) if err: return status, None, err - await self.__contr_db.save_container(contr, True) + await self.save_container(contr, True) return 201, contr, None async def delete_container(self, app_id, contr_id): @@ -121,6 +120,9 @@ async def provision_container(self, contr): return status, None, err return status, contr, None + async def save_container(self, contr, upsert=False): + await self.__contr_db.save_container(contr, upsert=upsert) + async def _get_updated_container(self, contr): assert isinstance(contr, Container) self.logger.debug('Update container info: %s'%contr) diff --git a/schedule/__init__.py b/schedule/__init__.py index 6501e92..9563c0f 100644 --- a/schedule/__init__.py +++ b/schedule/__init__.py @@ -1,75 +1,10 @@ -import sys -import importlib - +import container import cluster -import appliance - -from abc import ABCMeta from commons import AutonomousMonitor, Singleton, Loggable -from container.manager import ContainerManager -from config import config - - -def get_scheduler(): - try: - sched_mod = '.'.join(config.pivot.scheduler.split('.')[:-1]) - sched_class = config.pivot.scheduler.split('.')[-1] - return getattr(importlib.import_module(sched_mod), sched_class) - except Exception as e: - sys.stderr.write(str(e) + '\n') - from schedule.default import DefaultApplianceScheduler - return DefaultApplianceScheduler - - -class ApplianceScheduleNegotiator(AutonomousMonitor): - def __init__(self, app_id, interval=3000): - super(ApplianceScheduleNegotiator, self).__init__(interval) - self.__app_id = app_id - self.__executor = ApplianceScheduleExecutor() - self.__scheduler = get_scheduler()() - self.__cluster_mgr = cluster.manager.ClusterManager() - self.__app_mgr = appliance.manager.ApplianceManager() - async def callback(self): - # get appliance - status, app, err = await self.__app_mgr.get_appliance(self.__app_id) - if not app: - if status == 404: - self.logger.info('Appliance %s no longer exists'%self.__app_id) - else: - self.logger.error(err) - self.stop() - return - # get cluster info - agents = await self.__cluster_mgr.get_cluster(ttl=0) - # contact the scheduler for new schedule - sched = await self.__scheduler.schedule(app, agents) - self.logger.debug('Containers to be scheduled: %s'%[c.id for c in sched.containers]) - # if the scheduling is done - if sched.done: - self.logger.info('Scheduling is done for appliance %s'%self.__app_id) - self.stop() - return - # execute the new schedule - await self.__executor.execute(sched) - - -class ApplianceScheduleExecutor(Loggable, metaclass=Singleton): - - def __init__(self): - self.__contr_mgr = ContainerManager() - - async def execute(self, sched): - for c in sched.containers: - _, msg, err = await self.__contr_mgr.provision_container(c) - if err: - self.logger.error(err) - self.logger.info('Container %s is being provisioned'%c.id) - - -class Schedule: +class SchedulePlan: def __init__(self, done=False, containers=[]): self.__done = done @@ -91,8 +26,63 @@ def add_containers(self, contrs): self.__containers += list(contrs) -class ApplianceScheduler(Loggable, metaclass=ABCMeta): +class GlobalScheduler(Loggable, metaclass=Singleton): + + async def schedule(self, contr, agents): + # generate a schedule plan + # return the schedule plan + pass + + + async def reschedule(self, contrs, agents): + # collect evidence for scheduling + # generate a reschedule plan + # return the reschedule plan + pass + + +class GlobalScheduleExecutor(AutonomousMonitor, metaclass=Singleton): + + def __init__(self, scheduler, interval=5000): + super(GlobalScheduleExecutor, self).__init__(interval) + self.__contr_mgr = container.manager.ContainerManager() + self.__cluster_mgr = cluster.manager.ClusterManager() + self.__scheduler = scheduler + + async def schedule(self, contr): + agents = await self.get_agents() + plan = await self.__scheduler.schedule(contr, agents) + for c in plan.containers: + await self._provision_container(c) + + async def callback(self): + agents = await self.get_agents() + contrs = await self._get_containers() + if not contrs: return + plan = await self.__scheduler.reschedule(contrs, agents) + for c in plan.containers: + await self._provision_container(c) + + async def get_agents(self): + return await self.__cluster_mgr.get_cluster(0) + + async def _get_containers(self, **kwargs): + status, contrs, err = await self.__contr_mgr.get_containers(**kwargs) + if err: + self.logger.error(err) + return contrs + + async def _provision_container(self, contr): + await self.__contr_mgr.save_container(contr) + status, contr, err = await self.__contr_mgr.provision_container(contr) + if err: + self.logger.error(err) + + +class DefaultGlobalScheduler(GlobalScheduler): - async def schedule(self, app, agents): - raise NotImplemented + async def schedule(self, contr, agents): + pass + async def reschedule(self, contrs, agents): + pass diff --git a/schedule/default.py b/schedule/default.py deleted file mode 100644 index 3e18256..0000000 --- a/schedule/default.py +++ /dev/null @@ -1,27 +0,0 @@ -from container.base import ContainerType, ContainerState -from schedule import ApplianceScheduler, Schedule - - -class DefaultApplianceScheduler(ApplianceScheduler): - - async def schedule(self, app, agents): - sched = Schedule() - free_contrs = self.resolve_dependencies(app) - self.logger.debug('Free containers: %s'%[c.id for c in free_contrs]) - if not free_contrs: - sched.done = True - return sched - sched.add_containers([c for c in free_contrs if c.state in - (ContainerState.SUBMITTED, ContainerState.FAILED)]) - return sched - - def resolve_dependencies(self, app): - contrs = {c.id: c for c in app.containers - if (c.type == ContainerType.JOB and c.state != ContainerState.SUCCESS) - or (c.type == ContainerType.SERVICE and c.state != ContainerState.RUNNING)} - parents = {} - for c in contrs.values(): - parents.setdefault(c.id, set()).update([d for d in c.dependencies if d in contrs]) - return [contrs[k] for k, v in parents.items() if not v] - - diff --git a/schedule/local/__init__.py b/schedule/local/__init__.py new file mode 100644 index 0000000..cc26e09 --- /dev/null +++ b/schedule/local/__init__.py @@ -0,0 +1,77 @@ +import appliance + +from abc import ABCMeta + +from schedule import GlobalScheduleExecutor, GlobalScheduler, SchedulePlan +from commons import AutonomousMonitor, Loggable +from config import get_global_scheduler + + +class ApplianceScheduleExecutor(AutonomousMonitor): + + def __init__(self, app_id, scheduler, interval=3000): + super(ApplianceScheduleExecutor, self).__init__(interval) + self.__app_id = app_id + self.__app_mgr = appliance.manager.ApplianceManager() + self.__local_sched = scheduler + self.__global_sched = GlobalScheduleExecutor(get_global_scheduler()) + + async def callback(self): + # get appliance + status, app, err = await self.__app_mgr.get_appliance(self.__app_id) + if not app: + if status == 404: + self.logger.info('Appliance %s no longer exists'%self.__app_id) + else: + self.logger.error(err) + self.stop() + return + # get cluster info + agents = await self.__global_sched.get_agents() + # contact the scheduler for new schedule + sched = await self.__local_sched.schedule(app, agents) + self.logger.debug('Containers to be scheduled: %s'%[c.id for c in sched.containers]) + # if the scheduling is done + if sched.done: + self.logger.info('Scheduling is done for appliance %s'%self.__app_id) + self.stop() + return + # execute the new schedule + await self._execute(sched) + + async def _execute(self, sched): + for c in sched.containers: + await self.__global_sched.schedule(c) + self.logger.info('Container %s is being scheduled'%c.id) + + +class ApplianceScheduler(Loggable, metaclass=ABCMeta): + + async def schedule(self, app, agents): + raise NotImplemented + + +from container.base import ContainerState, ContainerType + + +class DefaultApplianceScheduler(ApplianceScheduler): + + async def schedule(self, app, agents): + sched = SchedulePlan() + free_contrs = self.resolve_dependencies(app) + self.logger.debug('Free containers: %s'%[c.id for c in free_contrs]) + if not free_contrs: + sched.done = True + return sched + sched.add_containers([c for c in free_contrs if c.state in + (ContainerState.SUBMITTED, ContainerState.FAILED)]) + return sched + + def resolve_dependencies(self, app): + contrs = {c.id: c for c in app.containers + if (c.type == ContainerType.JOB and c.state != ContainerState.SUCCESS) + or (c.type == ContainerType.SERVICE and c.state != ContainerState.RUNNING)} + parents = {} + for c in contrs.values(): + parents.setdefault(c.id, set()).update([d for d in c.dependencies if d in contrs]) + return [contrs[k] for k, v in parents.items() if not v] diff --git a/schedule/plugin/location_aware.py b/schedule/plugin/local/location_aware.py similarity index 97% rename from schedule/plugin/location_aware.py rename to schedule/plugin/local/location_aware.py index 6291956..76e419b 100644 --- a/schedule/plugin/location_aware.py +++ b/schedule/plugin/local/location_aware.py @@ -2,7 +2,7 @@ from config import config from commons import APIManager -from schedule.default import DefaultApplianceScheduler +from schedule.local import DefaultApplianceScheduler class LocationAwareApplianceScheduler(DefaultApplianceScheduler): @@ -13,6 +13,8 @@ def __init__(self): async def schedule(self, app, agents): sched = await super(LocationAwareApplianceScheduler, self).schedule(app, agents) + if sched.done: + return sched if not config.irods.host or not config.irods.port: self.logger.info('iRODS API is not properly set. Fallback to default scheduler') return sched diff --git a/server.py b/server.py index f17e422..d7f6715 100644 --- a/server.py +++ b/server.py @@ -12,10 +12,20 @@ from index.handler import IndexHandler from ping.handler import PingHandler from swagger.handler import SwaggerAPIHandler, SwaggerUIHandler -from config import config +from config import config, get_global_scheduler +from schedule import GlobalScheduleExecutor from util import dirname +def start_cluster_monitor(): + tornado.ioloop.IOLoop.instance().add_callback(ClusterManager().start_monitor) + + +def start_global_scheduler(): + scheduler = GlobalScheduleExecutor(get_global_scheduler()) + tornado.ioloop.IOLoop.instance().add_callback(scheduler.start) + + def start_server(): app = Application([ (r'\/*', IndexHandler), @@ -36,7 +46,8 @@ def start_server(): server = tornado.httpserver.HTTPServer(app) server.bind(config.pivot.port) server.start(config.pivot.n_parallel) - tornado.ioloop.IOLoop.instance().add_callback(ClusterManager().start_monitor) + start_cluster_monitor() + start_global_scheduler() tornado.ioloop.IOLoop.instance().start()