diff --git a/backend/conf/config.sample.properties b/backend/conf/config.sample.properties index 5a8cbc30a6..2159d5a9e7 100644 --- a/backend/conf/config.sample.properties +++ b/backend/conf/config.sample.properties @@ -24,6 +24,17 @@ general.workingdir = {full path to "odcs" (home) dir of the project}/backend/wor backend.host = 127.0.0.1 backend.port = 5010 +# Backend(s) run in cluster mode - developed and tested only for PostgreSQL +# With this property enabled, 2 or even N backends can work with one database +# All backends work simultaneously, they can process pipeline schedules and executions +# If backend cluster enabled, backend ID must be set - mandatory parameter +# backend.cluster.mode = false +# backend.id = BackendServer1 + +# If backend should restart running executions on startup +# By default running executions are restarted, if false, executions are failed by backend at startup +# backend.startup.restart.running = true + # Connection configuration setting for relational database # for mysql { database.sql.driver = com.mysql.jdbc.Driver diff --git a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/Engine.java b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/Engine.java index 0eb848e2fe..86387d601b 100644 --- a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/Engine.java +++ b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/Engine.java @@ -41,7 +41,9 @@ import cz.cuni.mff.xrg.odcs.commons.app.ScheduledJobsPriority; import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log; +import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.RuntimePropertiesFacade; import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution; @@ -56,10 +58,15 @@ public class Engine implements ApplicationListener { private static final Logger LOG = LoggerFactory.getLogger(Engine.class); + private static final Integer DEFAULT_LIMIT_SHEDULED_PPL = 2; + public Integer numberOfRunningJobs = 0; + private final Object LockRunningJobs = new Object(); + private boolean clusterMode = false; + /** * Publisher instance. */ @@ -83,13 +90,16 @@ public class Engine implements ApplicationListener { */ @Autowired protected PipelineFacade pipelineFacade; - + /** * Runtime properties facade. */ @Autowired protected RuntimePropertiesFacade runtimePropertiesFacade; + @Autowired + protected ExecutionFacade executionFacade; + /** * Thread pool. */ @@ -105,6 +115,11 @@ public class Engine implements ApplicationListener { */ protected Boolean startUpDone; + /** + * Backend identifier + */ + protected String backendID; + @PostConstruct private void propertySetter() { this.executorService = Executors.newCachedThreadPool(); @@ -112,12 +127,22 @@ private void propertySetter() { workingDirectory = new File( appConfig.getString(ConfigProperty.GENERAL_WORKINGDIR)); -// limitOfScheduledPipelines = appConfig.getInteger(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES); + // limitOfScheduledPipelines = appConfig.getInteger(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES); LOG.info("Working dir: {}", workingDirectory.toString()); // make sure that our working directory exist if (workingDirectory.isDirectory()) { workingDirectory.mkdirs(); } + + try { + this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE); + } catch (MissingConfigPropertyException e) { + LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage()); + } + if (this.clusterMode) { + this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID); + LOG.info("Backend ID: {}", this.backendID); + } } /** @@ -135,11 +160,11 @@ protected void run(PipelineExecution execution) { /** * Check database for new task (PipelineExecutions to run). Can run - * concurrently. Check database every 20 seconds. + * concurrently. Check database every 2 seconds. */ @Async - @Scheduled(fixedDelay = 20000) + @Scheduled(fixedDelay = 2000) protected void checkJobs() { synchronized (LockRunningJobs) { LOG.debug(">>> Entering checkJobs()"); @@ -152,21 +177,35 @@ protected void checkJobs() { Integer limitOfScheduledPipelines = getLimitOfScheduledPipelines(); LOG.debug("limit of scheduled pipelines: " + limitOfScheduledPipelines); - - List jobs = pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED); - // run pipeline executions .. - for (PipelineExecution job : jobs) { - if (job.getOrderNumber() == ScheduledJobsPriority.IGNORE.getValue()) { - run(job); - numberOfRunningJobs++; - continue; + LOG.debug("Number of running jobs: {}", this.numberOfRunningJobs); + + List jobs = null; + if (this.clusterMode) { + // Update backend activity timestamp in DB + this.executionFacade.updateBackendTimestamp(this.backendID); + int limit = limitOfScheduledPipelines - this.numberOfRunningJobs; + if (limit < 0) { + limit = 0; + } + long countOfUnallocated = this.executionFacade.getCountOfUnallocatedQueuedExecutionsWithIgnorePriority(); + if (limit < countOfUnallocated) { + limit = (int) countOfUnallocated; } - if (numberOfRunningJobs < limitOfScheduledPipelines) { + int allocated = this.executionFacade.allocateQueuedExecutionsForBackend(this.backendID, limit); + LOG.debug("Allocated {} executions by backend '{}'", allocated, this.backendID); + + LOG.debug("Going to find all allocated QUEUED executions"); + jobs = this.pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED, this.backendID); + LOG.debug("Found {} executions planned for execution", jobs.size()); + } else { + jobs = this.pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED); + } + // run pipeline executions .. + for (PipelineExecution job : jobs) { + if (this.numberOfRunningJobs < limitOfScheduledPipelines || ScheduledJobsPriority.IGNORE.getValue() == job.getOrderNumber()) { run(job); - numberOfRunningJobs++; - } else { - break; + this.numberOfRunningJobs++; } } @@ -186,7 +225,7 @@ protected Integer getLimitOfScheduledPipelines() { return DEFAULT_LIMIT_SHEDULED_PPL; } try { - return Integer.parseInt(limit.getValue()); + return Integer.parseInt(limit.getValue()); } catch (NumberFormatException e) { LOG.error("Value not a number of RuntimeProperty: " + ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES.toString() + ", error: " + e.getMessage()); @@ -210,8 +249,12 @@ protected synchronized void startUp() { ExecutionSanitizer sanitizer = beanFactory.getBean(ExecutionSanitizer.class); // list executions - List running = pipelineFacade - .getAllExecutions(PipelineExecutionStatus.RUNNING); + List running = null; + if (this.clusterMode) { + running = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.RUNNING, this.backendID); + } else { + running = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.RUNNING); + } for (PipelineExecution execution : running) { MDC.put(Log.MDC_EXECUTION_KEY_NAME, execution.getId().toString()); // hanging pipeline .. @@ -226,8 +269,12 @@ protected synchronized void startUp() { MDC.remove(Log.MDC_EXECUTION_KEY_NAME); } - List cancelling = pipelineFacade - .getAllExecutions(PipelineExecutionStatus.CANCELLING); + List cancelling = null; + if (this.clusterMode) { + cancelling = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.CANCELLING, this.backendID); + } else { + cancelling = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.CANCELLING); + } for (PipelineExecution execution : cancelling) { MDC.put(Log.MDC_EXECUTION_KEY_NAME, execution.getId().toString()); diff --git a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/ExecutionSanitizer.java b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/ExecutionSanitizer.java index eda7758f27..7b7fda11fd 100644 --- a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/ExecutionSanitizer.java +++ b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/ExecutionSanitizer.java @@ -30,6 +30,9 @@ import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineRestart; import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineSanitized; +import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; +import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; import cz.cuni.mff.xrg.odcs.commons.app.dataunit.DataUnitFactory; import cz.cuni.mff.xrg.odcs.commons.app.dpu.DPUInstanceRecord; import cz.cuni.mff.xrg.odcs.commons.app.execution.context.DataUnitInfo; @@ -39,8 +42,8 @@ import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus; import cz.cuni.mff.xrg.odcs.commons.app.resource.MissingResourceException; import cz.cuni.mff.xrg.odcs.commons.app.resource.ResourceManager; -import eu.unifiedviews.commons.dataunit.ManagableDataUnit; import cz.cuni.mff.xrg.odcs.rdf.repositories.GraphUrl; +import eu.unifiedviews.commons.dataunit.ManagableDataUnit; import eu.unifiedviews.commons.rdf.repository.RDFException; import eu.unifiedviews.dataunit.DataUnitException; @@ -63,6 +66,9 @@ class ExecutionSanitizer { @Autowired private ResourceManager resourceManager; + @Autowired + private AppConfig appConfig; + /** * Fix possible problems with given execution. Logs of this method are * logged with the execution id of given {@link PipelineExecution} Method does not save the changes into database! So called must secure @@ -73,16 +79,25 @@ class ExecutionSanitizer { public void sanitize(PipelineExecution execution) { // check for flags if (execution.getStop()) { - sanitizeCancelling(execution); + sanitizeCancellingRunning(execution); return; } switch (execution.getStatus()) { case CANCELLING: - sanitizeCancelling(execution); + sanitizeCancellingRunning(execution); return; case RUNNING: - sanitizeRunning(execution); + boolean restartRunning = true; + try { + restartRunning = this.appConfig.getBoolean(ConfigProperty.BACKEND_STARTUP_RESTART_RUNNING); + } catch (MissingConfigPropertyException ignore) { + /* ignore exception*/} + if (restartRunning) { + restartRunning(execution); + } else { + sanitizeCancellingRunning(execution); + } return; default: // do nothing with such pipeline .. @@ -95,8 +110,8 @@ public void sanitize(PipelineExecution execution) { * * @param execution */ - private void sanitizeRunning(PipelineExecution execution) { - eventPublisher.publishEvent(new PipelineRestart(execution, this)); + private void restartRunning(PipelineExecution execution) { + this.eventPublisher.publishEvent(new PipelineRestart(execution, this)); // set state back to scheduled execution.setStatus(PipelineExecutionStatus.QUEUED); @@ -107,7 +122,7 @@ private void sanitizeRunning(PipelineExecution execution) { * * @param execution */ - private void sanitizeCancelling(PipelineExecution execution) { + private void sanitizeCancellingRunning(PipelineExecution execution) { // publish event about this .. eventPublisher.publishEvent(new PipelineSanitized(execution, this)); @@ -133,7 +148,12 @@ private void sanitizeCancelling(PipelineExecution execution) { // this means that the execution does not run at all } // set canceled state - execution.setStatus(PipelineExecutionStatus.CANCELLED); + if (execution.getStatus() == PipelineExecutionStatus.CANCELLING) { + execution.setStatus(PipelineExecutionStatus.CANCELLED); + } else if (execution.getStatus() == PipelineExecutionStatus.RUNNING) { + execution.setStatus(PipelineExecutionStatus.FAILED); + } + execution.setEnd(now); } diff --git a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/pipeline/Executor.java b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/pipeline/Executor.java index e7eefc2259..c8074cb06d 100644 --- a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/pipeline/Executor.java +++ b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/execution/pipeline/Executor.java @@ -44,6 +44,9 @@ import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFailedEvent; import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFinished; import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineStarted; +import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; +import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log; import cz.cuni.mff.xrg.odcs.commons.app.facade.LogFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade; @@ -125,11 +128,29 @@ public class Executor implements Runnable { */ private Date lastSuccessfulExTime; + @Autowired + private AppConfig appConfig; + + /** + * Backend identifier + */ + private String backendID; + + private boolean clusterMode = false; + /** * Sort pre/post executors. */ @PostConstruct public void init() { + try { + this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE); + } catch (MissingConfigPropertyException e) { + LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage()); + } + if (this.clusterMode) { + this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID); + } if (preExecutors != null) { Collections.sort(preExecutors, AnnotationAwareOrderComparator.INSTANCE); @@ -155,6 +176,9 @@ public void bind(PipelineExecution execution) { // update state and set start time this.execution.setStart(new Date()); this.execution.setStatus(PipelineExecutionStatus.RUNNING); + if (this.clusterMode) { + this.execution.setBackendId(this.backendID); + } try { pipelineFacade.save(this.execution); diff --git a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/scheduling/Scheduler.java b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/scheduling/Scheduler.java index b5df274df8..609ea357fe 100644 --- a/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/scheduling/Scheduler.java +++ b/backend/src/main/java/cz/cuni/mff/xrg/odcs/backend/scheduling/Scheduler.java @@ -25,13 +25,15 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.transaction.annotation.Transactional; import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFinished; -import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade; +import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; +import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; import cz.cuni.mff.xrg.odcs.commons.app.facade.ScheduleFacade; import cz.cuni.mff.xrg.odcs.commons.app.scheduling.Schedule; @@ -44,22 +46,40 @@ class Scheduler implements ApplicationListener { private static final Logger LOG = LoggerFactory.getLogger(Schedule.class); - @Autowired - private ApplicationEventPublisher eventPublisher; + private AppConfig appConfig; + + private boolean clusterMode = false; + /** * Schedule facade. */ @Autowired private ScheduleFacade scheduleFacade; - @Autowired - private PipelineFacade pipelineFacade; + private String backendID; @PostConstruct + private void init() { + try { + this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE); + } catch (MissingConfigPropertyException e) { + LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage()); + } + if (this.clusterMode) { + this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID); + } + initialCheck(); + + } + private void initialCheck() { // do initial run-after check - scheduleFacade.executeFollowers(); + if (this.clusterMode) { + this.scheduleFacade.executeFollowers(this.backendID); + } else { + this.scheduleFacade.executeFollowers(); + } } /** @@ -69,19 +89,15 @@ private void initialCheck() { */ private synchronized void onPipelineFinished(PipelineFinished pipelineFinishedEvent) { LOG.trace("onPipelineFinished started"); - if (pipelineFinishedEvent.sucess()) { - // success continue - } else { - // execution failed -> ignore - return; + if (!pipelineFinishedEvent.sucess()) { + return; // If pipeline not successful, no post-process } if (pipelineFinishedEvent.getExecution().getSilentMode()) { // pipeline run in silent mode .. ignore } else { scheduleFacade.executeFollowers( - pipelineFinishedEvent.getExecution().getPipeline() - ); + pipelineFinishedEvent.getExecution().getPipeline()); } LOG.trace("onPipelineFinished finished"); } @@ -92,12 +108,21 @@ private synchronized void onPipelineFinished(PipelineFinished pipelineFinishedEv */ @Async @Scheduled(fixedDelay = 30000) + @Transactional protected synchronized void timeBasedCheck() { LOG.trace("onTimeCheck started"); // check DB for pipelines based on time scheduling + Date now = new Date(); // get all pipelines that are time based - List candidates = scheduleFacade.getAllTimeBasedNotQueuedRunning(); + LOG.debug("Going to check all time based not queued schedules"); + List candidates = null; + if (this.clusterMode) { + candidates = this.scheduleFacade.getAllTimeBasedNotQueuedRunningForCluster(); + } else { + candidates = this.scheduleFacade.getAllTimeBasedNotQueuedRunning(); + } + LOG.debug("Found {} schedule candidates, that could be executed", candidates.size()); // check .. for (Schedule schedule : candidates) { // we use information about next execution diff --git a/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/execution/EngineMock.java b/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/execution/EngineMock.java index d0da9e4649..0fdbe5e393 100644 --- a/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/execution/EngineMock.java +++ b/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/execution/EngineMock.java @@ -16,18 +16,21 @@ */ package cz.cuni.mff.xrg.odcs.backend.execution; +import java.util.ArrayList; +import java.util.List; + +import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade; import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution; import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus; -import java.util.ArrayList; -import java.util.List; - public class EngineMock extends Engine { public EngineMock() { this.startUpDone = true; + this.backendID = "TestBackend"; } + public final List historyOfExecution = new ArrayList<>(); @Override @@ -45,8 +48,11 @@ public void setPipelineFacade(PipelineFacade pipelineFacade) { this.pipelineFacade = pipelineFacade; } + public void setExecutionFacade(ExecutionFacade executionFacade) { + this.executionFacade = executionFacade; + } - public void doCheck(){ + public void doCheck() { checkJobs(); } } diff --git a/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/scheduling/SchedulerTest.java b/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/scheduling/SchedulerTest.java index 3f68b838e5..b762ecf9e0 100644 --- a/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/scheduling/SchedulerTest.java +++ b/backend/src/test/java/cz/cuni/mff/xrg/odcs/backend/scheduling/SchedulerTest.java @@ -17,13 +17,17 @@ package cz.cuni.mff.xrg.odcs.backend.scheduling; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.Calendar; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -31,20 +35,22 @@ import org.springframework.transaction.annotation.Transactional; import cz.cuni.mff.xrg.odcs.backend.execution.EngineMock; +import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.ScheduleFacade; import cz.cuni.mff.xrg.odcs.commons.app.pipeline.Pipeline; import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution; +import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus; import cz.cuni.mff.xrg.odcs.commons.app.scheduling.Schedule; import cz.cuni.mff.xrg.odcs.commons.app.scheduling.ScheduleType; -import static org.junit.Assert.assertTrue; - @ContextConfiguration(locations = { "classpath:backend-test-context.xml" }) @RunWith(SpringJUnit4ClassRunner.class) @TransactionConfiguration(defaultRollback = true) public class SchedulerTest { + private static final Logger LOG = LoggerFactory.getLogger(SchedulerTest.class); + public static final Integer RUNNIG_PPL_LIMIT = 2; @Autowired @@ -56,6 +62,9 @@ public class SchedulerTest { @Autowired private ScheduleFacade scheduleFacade; + @Autowired + private ExecutionFacade executionFacade; + private class EngineMockWithLimit extends EngineMock { @Override protected Integer getLimitOfScheduledPipelines() { @@ -82,8 +91,8 @@ public void test() { scheduler.timeBasedCheck(); EngineMock engine = new EngineMockWithLimit(); engine.setPipelineFacade(pipelineFacade); + engine.setExecutionFacade(this.executionFacade); engine.doCheck(); - } @Test @@ -102,15 +111,16 @@ public void test2() { EngineMock engine = new EngineMockWithLimit(); engine.setPipelineFacade(pipelineFacade); + engine.setExecutionFacade(this.executionFacade); engine.doCheck(); - assertEquals(engine.historyOfExecution.size(), 2); + assertEquals(2, engine.historyOfExecution.size()); final Set history = new HashSet<>(); history.add(schedule4.getId()); history.add(schedule3.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } @@ -144,6 +154,7 @@ public void test3() { EngineMock engine = new EngineMockWithLimit(); engine.setPipelineFacade(pipelineFacade); + engine.setExecutionFacade(this.executionFacade); engine.doCheck(); assertEquals(3, engine.historyOfExecution.size()); @@ -153,7 +164,7 @@ public void test3() { history.add(schedule2.getId()); history.add(schedule3.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } } @@ -161,14 +172,14 @@ public void test3() { engine.numberOfRunningJobs--; engine.doCheck(); - assertEquals(engine.historyOfExecution.size(), 3); + assertEquals(3, engine.historyOfExecution.size()); { final Set history = new HashSet<>(); history.add(schedule.getId()); history.add(schedule2.getId()); history.add(schedule3.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } } @@ -177,7 +188,7 @@ public void test3() { engine.numberOfRunningJobs--; engine.doCheck(); - assertEquals(engine.historyOfExecution.size(), 4); + assertEquals(4, engine.historyOfExecution.size()); { final Set history = new HashSet<>(); history.add(schedule.getId()); @@ -185,11 +196,10 @@ public void test3() { history.add(schedule3.getId()); history.add(schedule4.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } } - } @Test @@ -208,15 +218,16 @@ public void test4() { EngineMock engine = new EngineMockWithLimit(); engine.setPipelineFacade(pipelineFacade); + engine.setExecutionFacade(this.executionFacade); engine.doCheck(); - assertEquals(engine.historyOfExecution.size(), 2); + assertEquals(2, engine.historyOfExecution.size()); { final Set history = new HashSet<>(); history.add(schedule3.getId()); history.add(schedule4.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } } @@ -224,23 +235,22 @@ public void test4() { engine.numberOfRunningJobs--; engine.doCheck(); - assertEquals(engine.historyOfExecution.size(), 3); + assertEquals(3, engine.historyOfExecution.size()); { final Set history = new HashSet<>(); history.add(schedule2.getId()); history.add(schedule3.getId()); history.add(schedule4.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } } - engine.numberOfRunningJobs--; engine.doCheck(); - assertEquals(engine.historyOfExecution.size(), 4); + assertEquals(4, engine.historyOfExecution.size()); { final Set history = new HashSet<>(); history.add(schedule.getId()); @@ -248,10 +258,9 @@ public void test4() { history.add(schedule3.getId()); history.add(schedule4.getId()); - for (int i = 0 ; i < engine.historyOfExecution.size(); ++i) { + for (int i = 0; i < engine.historyOfExecution.size(); ++i) { assertTrue(history.contains(engine.historyOfExecution.get(i).getSchedule().getId())); } } - } } diff --git a/backend/src/test/resources/backend-test-context.xml b/backend/src/test/resources/backend-test-context.xml index ceb137bbc3..5e143c7f6e 100644 --- a/backend/src/test/resources/backend-test-context.xml +++ b/backend/src/test/resources/backend-test-context.xml @@ -54,6 +54,10 @@ + + + + diff --git a/backend/src/test/resources/db/data.sql b/backend/src/test/resources/db/data.sql index d2a5e34605..83444f12e0 100644 --- a/backend/src/test/resources/db/data.sql +++ b/backend/src/test/resources/db/data.sql @@ -49,8 +49,8 @@ INSERT INTO PPL_EDGE(id,graph_id,node_from_id,node_to_id,data_unit_name) INSERT INTO EXEC_CONTEXT_PIPELINE(id) VALUES(1); ---INSERT INTO EXEC_PIPELINE(id,status,pipeline_id,debug_mode,t_start,t_end,context_id,schedule_id,silent_mode,debugnode_id,stop) --- VALUES(1,5,1,0,NULL,NULL,1,NULL,1,NULL,0); +--INSERT INTO EXEC_PIPELINE(id,status,pipeline_id,debug_mode,t_start,t_end,context_id,schedule_id,silent_mode,debugnode_id,stop,backend_id) +-- VALUES(1,5,1,0,NULL,NULL,1,NULL,1,NULL,0,'TestBackend'); ---- schedule define by times when to run pipeline --INSERT INTO EXEC_SCHEDULE(id,description,pipeline_id,user_id,just_once,enabled,type,first_exec,last_exec,time_period,period_unit,strict_timing,strict_tolerance) @@ -113,4 +113,6 @@ INSERT INTO RDF_NS_PREFIX(id, name, uri) INSERT INTO `runtime_properties` (name, value) VALUES ('backend.scheduledPipelines.limit', '5'); INSERT INTO `runtime_properties` (name, value) VALUES ('run.now.pipeline.priority', '1'); +INSERT INTO `backend_servers` (id, backend_id, last_update) VALUES (1, 'TestBackend', CURRENT_TIMESTAMP); + diff --git a/backend/src/test/resources/db/schema.sql b/backend/src/test/resources/db/schema.sql index 1d220f7cd3..dd4ff753a5 100644 --- a/backend/src/test/resources/db/schema.sql +++ b/backend/src/test/resources/db/schema.sql @@ -99,6 +99,16 @@ CREATE INDEX `ix_EXEC_RECORD_r_type` ON `EXEC_RECORD` (`r_type`); CREATE INDEX `ix_EXEC_RECORD_dpu_id` ON `EXEC_RECORD` (`dpu_id`); CREATE INDEX `ix_EXEC_RECORD_execution_id` ON `EXEC_RECORD` (`execution_id`); +CREATE SEQUENCE `seq_backend_servers` START 1; +CREATE TABLE `BACKEND_SERVERS` +( + `id` INTEGER, + `backend_id` VARCHAR(128), + `last_update` TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE (`backend_id`) +); + CREATE SEQUENCE `seq_exec_pipeline` START WITH 100; CREATE TABLE `EXEC_PIPELINE` ( @@ -116,6 +126,7 @@ CREATE TABLE `EXEC_PIPELINE` `t_last_change` DATETIME, `owner_id` INTEGER, `user_actor_id` INTEGER, + `backend_id` VARCHAR(128), PRIMARY KEY (`id`) ); CREATE INDEX `ix_EXEC_PIPELINE_status` ON `EXEC_PIPELINE` (`status`); diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/conf/ConfigProperty.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/conf/ConfigProperty.java index 680e39eb25..dabb9fa23c 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/conf/ConfigProperty.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/conf/ConfigProperty.java @@ -32,6 +32,10 @@ public enum ConfigProperty { BACKEND_LOG_KEEP("backend.log.keepDays"), BACKEND_DEFAULTRDF("backend.defaultRdf"), BACKEND_LIMIT_OF_SCHEDULED_PIPELINES("backend.scheduledPipelines.limit"), + BACKEND_ALIVE_LIMIT("backend.alive.limit"), + BACKEND_ID("backend.id"), + BACKEND_CLUSTER_MODE("backend.cluster.mode"), + BACKEND_STARTUP_RESTART_RUNNING("backend.startup.restart.running"), LOCALE("locale"), EXECUTION_LOG_HISTORY("exec.log.history"), diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/DbExecutionServer.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/DbExecutionServer.java new file mode 100644 index 0000000000..83ae63fac2 --- /dev/null +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/DbExecutionServer.java @@ -0,0 +1,61 @@ +/** + * This file is part of UnifiedViews. + * + * UnifiedViews is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * UnifiedViews is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with UnifiedViews. If not, see . + */ +package cz.cuni.mff.xrg.odcs.commons.app.execution.server; + +import java.util.List; + +import cz.cuni.mff.xrg.odcs.commons.app.dao.db.DbAccess; + +public interface DbExecutionServer extends DbAccess { + + /** + * Get backend execution server entry via its ID + * + * @param backendId + * @return Backend execution server + */ + ExecutionServer getExecutionServer(String backendId); + + /** + * Get list of all executions backend servers registered in database + * + * @return List of all backend servers + */ + List getAllExecutionServers(); + + /** + * Allocates QUEUED pipeline executions in database to backend execution server with given ID + * This method is transactional and guarantees that each pipeline execution is taken only once only by one backend + * Once allocated execution is further processed only by the allocating backend + * Other backends will never touch it + * + * @param backendID + * Backend ID to allocate executions to + * @param limit + * Limit of executions to be allocated + * @return Number of allocated executions for backend + */ + int allocateQueuedExecutionsForBackendByPriority(String backendID, int limit); + + /** + * Get count of unallocated QUEUED executions with IGNORE priority + * + * @return Count of unallocated QUEUED executions with IGNORE priority + */ + long getCountOfUnallocatedQueuedExecutionsWithIgnorePriority(); + +} diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/DbExecutionServerImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/DbExecutionServerImpl.java new file mode 100644 index 0000000000..9a2f06b901 --- /dev/null +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/DbExecutionServerImpl.java @@ -0,0 +1,77 @@ +/** + * This file is part of UnifiedViews. + * + * UnifiedViews is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * UnifiedViews is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with UnifiedViews. If not, see . + */ +package cz.cuni.mff.xrg.odcs.commons.app.execution.server; + +import java.util.List; + +import javax.persistence.TypedQuery; + +import org.springframework.transaction.annotation.Transactional; + +import cz.cuni.mff.xrg.odcs.commons.app.ScheduledJobsPriority; +import cz.cuni.mff.xrg.odcs.commons.app.dao.db.DbAccessBase; +import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus; + +public class DbExecutionServerImpl extends DbAccessBase implements DbExecutionServer { + + public DbExecutionServerImpl() { + super(ExecutionServer.class); + } + + @Override + public ExecutionServer getExecutionServer(String backendId) { + final String stringQuery = "SELECT e FROM ExecutionServer e WHERE e.backendId = :backendId"; + TypedQuery query = createTypedQuery(stringQuery); + query.setParameter("backendId", backendId); + return execute(query); + } + + @Override + public List getAllExecutionServers() { + final String queryStr = "SELECT e FROM ExecutionServer e"; + return executeList(queryStr); + } + + @Override + @Transactional + public int allocateQueuedExecutionsForBackendByPriority(String backendID, int limit) { + final String queryStr = "UPDATE exec_pipeline SET backend_id = '%s'" + + " WHERE id IN (SELECT id FROM" + + " (SELECT e.id from exec_pipeline e WHERE e.backend_id IS NULL AND e.status = %d" + + " ORDER BY e.order_number ASC, e.id ASC LIMIT %d FOR UPDATE) AS temp)"; + String query = String.format(queryStr, + backendID, + 0, // = QUEUED + limit); + return this.em.createNativeQuery(query).executeUpdate(); + } + + @Override + public long getCountOfUnallocatedQueuedExecutionsWithIgnorePriority() { + final String stringQuery = "SELECT COUNT(e) FROM PipelineExecution e" + + " WHERE e.status = :status" + + " AND e.backendId IS NULL" + + " AND e.orderNumber = :priority"; + TypedQuery query = createCountTypedQuery(stringQuery); + query.setParameter("status", PipelineExecutionStatus.QUEUED); + query.setParameter("priority", ScheduledJobsPriority.IGNORE.getValue()); + Long count = (Long) query.getSingleResult(); + + return count; + } + +} diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/ExecutionServer.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/ExecutionServer.java new file mode 100644 index 0000000000..bc17286eed --- /dev/null +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/execution/server/ExecutionServer.java @@ -0,0 +1,80 @@ +/** + * This file is part of UnifiedViews. + * + * UnifiedViews is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * UnifiedViews is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with UnifiedViews. If not, see . + */ +package cz.cuni.mff.xrg.odcs.commons.app.execution.server; + +import java.io.Serializable; +import java.util.Date; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; + +import cz.cuni.mff.xrg.odcs.commons.app.dao.DataObject; + +@Entity +@Table(name = "backend_servers") +public class ExecutionServer implements Serializable, DataObject { + + private static final long serialVersionUID = 1L; + + /** + * Primary key. + */ + @Id + @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "seq_backend_servers") + @SequenceGenerator(name = "seq_backend_servers", allocationSize = 1) + private Long id; + + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "last_update") + private Date lastUpdate; + + @Column(name = "backend_id") + private String backendId; + + @Override + public Long getId() { + return this.id; + } + + public Date getLastUpdate() { + return this.lastUpdate; + } + + public void setLastUpdate(Date lastUpdate) { + this.lastUpdate = lastUpdate; + } + + public String getBackendId() { + return this.backendId; + } + + public void setBackendId(String backendId) { + this.backendId = backendId; + } + + public void setId(Long id) { + this.id = id; + } + +} diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ExecutionFacade.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ExecutionFacade.java new file mode 100644 index 0000000000..912aa4ccb4 --- /dev/null +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ExecutionFacade.java @@ -0,0 +1,80 @@ +/** + * This file is part of UnifiedViews. + * + * UnifiedViews is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * UnifiedViews is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with UnifiedViews. If not, see . + */ +package cz.cuni.mff.xrg.odcs.commons.app.facade; + +import java.util.List; + +import cz.cuni.mff.xrg.odcs.commons.app.execution.server.ExecutionServer; + +public interface ExecutionFacade extends Facade { + + /** + * Get all backend execution servers registered in the database + * + * @return List of all backend servers + */ + List getAllExecutionServers(); + + /** + * Get execution server for given backend ID + * + * @param backendId + * @return Backend server entry for given ID + */ + ExecutionServer getExecutionServer(String backendId); + + /** + * Check if any of the backend executions servers registered in the database are active.
+ * By default active means, that timestamp for some of the backends have been updated in less than 20s ago
+ * Timeout for backend activity can be set via property 'backend.alive.limit' + * + * @return True if at least one backend is active, False if no backend is active + */ + boolean checkAnyBackendActive(); + + /** + * Updates timestamp of given backend to actual time + * If backend with given ID not in the database yet, new entry with this ID and current time is created + * + * @param backendId + */ + void updateBackendTimestamp(String backendId); + + /** + * Allocates QUEUED pipeline executions in database to backend execution server with given ID + * This method is transactional and guarantees that each pipeline execution is taken only once only by one backend + * Once allocated execution is further processed only by the allocating backend + * Other backends will never touch it + *

+ * Queued executions with IGNORE priority are all allocated to first backend, and only limited count of non-ignore priority executions are allocated. + * + * @param backendID + * Backend ID to allocate executions to + * @param limit + * Limit of non-ignore priority executions to be allocated + * @return Number of allocated executions for backend + */ + int allocateQueuedExecutionsForBackend(String backendID, int limit); + + /** + * Get count of unallocated QUEUED executions with IGNORE priority + * + * @return Count of unallocated QUEUED executions with IGNORE priority + */ + long getCountOfUnallocatedQueuedExecutionsWithIgnorePriority(); + +} diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ExecutionFacadeImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ExecutionFacadeImpl.java new file mode 100644 index 0000000000..16d939faec --- /dev/null +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ExecutionFacadeImpl.java @@ -0,0 +1,118 @@ +/** + * This file is part of UnifiedViews. + * + * UnifiedViews is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * UnifiedViews is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with UnifiedViews. If not, see . + */ +package cz.cuni.mff.xrg.odcs.commons.app.facade; + +import java.util.Date; +import java.util.List; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; + +import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; +import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; +import cz.cuni.mff.xrg.odcs.commons.app.execution.server.DbExecutionServer; +import cz.cuni.mff.xrg.odcs.commons.app.execution.server.ExecutionServer; + +public class ExecutionFacadeImpl implements ExecutionFacade { + + private static Logger LOG = LoggerFactory.getLogger(ExecutionFacadeImpl.class); + + @Autowired + private DbExecutionServer dbExecutionServer; + + @Autowired + private AppConfig appConfig; + + private int backendAliveLimit; + + private static final int BACKEND_ALIVE_DEFAULT_LIMIT = 10; + + @PostConstruct + public void init() { + try { + this.backendAliveLimit = this.appConfig.getInteger(ConfigProperty.BACKEND_ALIVE_LIMIT); + } catch (MissingConfigPropertyException e) { + this.backendAliveLimit = BACKEND_ALIVE_DEFAULT_LIMIT; + } + } + + @Override + public ExecutionServer getExecutionServer(String backendId) { + return this.dbExecutionServer.getExecutionServer(backendId); + } + + @Override + public boolean checkAnyBackendActive() { + boolean alive = false; + List backends = getAllExecutionServers(); + + if (backends == null || backends.isEmpty()) { + LOG.debug("No backend has ever run with this system"); + return false; + } + + Long limitDateTime = System.currentTimeMillis() - (this.backendAliveLimit * 1000); + Date limitDate = new Date(limitDateTime); + + for (ExecutionServer backend : backends) { + if (backend.getLastUpdate().after(limitDate)) { + alive = true; + } + } + if (!alive) { + LOG.debug("No backend has been active for at least {}s.", this.backendAliveLimit); + } + + return alive; + } + + @Override + public List getAllExecutionServers() { + return this.dbExecutionServer.getAllExecutionServers(); + } + + @Override + @Transactional + public void updateBackendTimestamp(String backendId) { + ExecutionServer backend = getExecutionServer(backendId); + if (backend != null) { + backend.setLastUpdate(new Date()); + } else { + backend = new ExecutionServer(); + backend.setBackendId(backendId); + backend.setLastUpdate(new Date()); + } + + this.dbExecutionServer.save(backend); + } + + @Override + public int allocateQueuedExecutionsForBackend(String backendID, int limit) { + return this.dbExecutionServer.allocateQueuedExecutionsForBackendByPriority(backendID, limit); + } + + @Override + public long getCountOfUnallocatedQueuedExecutionsWithIgnorePriority() { + return this.dbExecutionServer.getCountOfUnallocatedQueuedExecutionsWithIgnorePriority(); + } + +} diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacade.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacade.java index e5633f6cde..6211832aa1 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacade.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacade.java @@ -199,6 +199,15 @@ public interface PipelineFacade extends Facade { List getAllExecutionsByPriorityLimited(PipelineExecutionStatus status); + /** + * Fetches all executions with given status and backend ID + * + * @param status + * @param backendID + * @return list of executions + */ + List getAllExecutions(PipelineExecutionStatus status, String backendID); + /** * Find pipeline execution in database by ID and return it. * @@ -344,4 +353,15 @@ PipelineExecution getLastExec(Schedule schedule, */ boolean hasExecutionsWithStatus(Pipeline pipeline, List statuses); + /** + * Get all executions with given status and executed by backend with given backend ID + * + * @param status + * Execution status + * @param backendID + * backend ID + * @return List of priority ordered executions + */ + List getAllExecutionsByPriorityLimited(PipelineExecutionStatus status, String backendID); + } diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacadeImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacadeImpl.java index e841ca42b1..6d3e795b43 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacadeImpl.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/PipelineFacadeImpl.java @@ -360,8 +360,7 @@ public boolean isUpToDate(Pipeline pipeline) { Date lastChange = dbPipeline.getLastChange(); Date myLastChange = pipeline.getLastChange(); - return lastChange == null ? true : - myLastChange == null ? false : !lastChange.after(myLastChange); + return lastChange == null ? true : myLastChange == null ? false : !lastChange.after(myLastChange); } /* ******************** Methods for managing PipelineExecutions ********* */ @@ -412,12 +411,24 @@ public List getAllExecutions(PipelineExecutionStatus status) return executionDao.getAll(status); } + @PreAuthorize("hasRole('pipelineExecution.read')") + @Override + public List getAllExecutions(PipelineExecutionStatus status, String backendID) { + return this.executionDao.getAll(status, backendID); + } + @PreAuthorize("hasRole('pipelineExecution.read')") @Override public List getAllExecutionsByPriorityLimited(PipelineExecutionStatus status) { return executionDao.getAllByPriorityLimited(status); } + @PreAuthorize("hasRole('pipelineExecution.read')") + @Override + public List getAllExecutionsByPriorityLimited(PipelineExecutionStatus status, String backendID) { + return this.executionDao.getAllByPriorityLimited(status, backendID); + } + /** * Find pipeline execution in database by ID and return it. * @@ -644,4 +655,5 @@ public boolean hasExecutionsWithStatus(Pipeline pipeline, List getAllPipelines(String externalUserId) { return this.pipelineDao.getPipelinesForUser(externalUserId); } + } diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacade.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacade.java index 8f73dd87d8..4d163facd0 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacade.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacade.java @@ -63,6 +63,14 @@ public interface ScheduleFacade extends Facade { */ List getAllTimeBasedNotQueuedRunning(); + /** + * Fetches all {@link Schedule}s which are activated in certain time + * and the execution for the scheduled pipeline isn't already running. + * + * @return list of all schedules planned to launch on time + */ + List getAllTimeBasedNotQueuedRunningForCluster(); + /** * Find Schedule in database by ID and return it. * @@ -102,9 +110,16 @@ public interface ScheduleFacade extends Facade { void execute(Schedule schedule); /** - * Check for all schedule that run after some execution and run them if all - * the the pre-runs has been executed. The call of this function may be - * expensive as it check for all runAfter based pipelines. + * Checks all schedule that run after some execution and run them if all the pre-runs + * have been executed by this backend node + * + * @param backendID + */ + void executeFollowers(String backendID); + + /** + * Checks all schedule that run after some execution and run them if all the pre-runs + * have been executed */ void executeFollowers(); diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacadeImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacadeImpl.java index 09b567c68b..b19baece19 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacadeImpl.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/facade/ScheduleFacadeImpl.java @@ -117,6 +117,21 @@ public List getAllTimeBasedNotQueuedRunning() { return scheduleDao.getAllTimeBasedNotQueuedRunning(); } + /** + * Fetches all {@link Schedule}s which are activated in + * certain time and the execution for the scheduled pipeline + * isn't already queued or running. + *
+ * Fetched schedules are locked for update so other backends don't execute them too + * + * @return + */ + @PostFilter("hasPermission(filterObject, 'scheduleRule.read')") + @Override + public List getAllTimeBasedNotQueuedRunningForCluster() { + return this.scheduleDao.getAllTimeBasedNotQueuedRunningForCluster(); + } + /** * Find Schedule in database by ID and return it. * @@ -220,7 +235,7 @@ public void execute(Schedule schedule) { @Transactional @Override public void executeFollowers() { - List toRun = scheduleDao.getActiveRunAfterBased(); + List toRun = this.scheduleDao.getActiveRunAfterBased(); // filter those that should not run toRun = filterActiveRunAfter(toRun); // and execute @@ -229,6 +244,24 @@ public void executeFollowers() { } } + /** + * Check for all schedule that run after some execution and run them + * if all the the pre-runs has been executed. The call of this + * function may be expensive as it check for all runAfter based + * pipelines. + */ + @Transactional + @Override + public void executeFollowers(String backendID) { + List toRun = this.scheduleDao.getActiveRunAfterBased(); + // filter those that should not run + toRun = filterActiveRunAfter(toRun, backendID); + // and execute + for (Schedule schedule : toRun) { + execute(schedule); + } + } + /** * Executes all pipelines scheduled to follow given pipeline. * @@ -253,10 +286,24 @@ public void executeFollowers(Pipeline pipeline) { */ @PreAuthorize("hasRole('scheduleRule.read')") private List filterActiveRunAfter(List candidates) { + return filterActiveRunAfter(candidates, null); + } + + /** + * @return schedules that are of type {@link ScheduleType#AFTER_PIPELINE} and that should be executed (all their {@link Schedule#afterPipelines + * after-pipeline} executions finished). + */ + @PreAuthorize("hasRole('scheduleRule.read')") + private List filterActiveRunAfter(List candidates, String backendID) { List result = new LinkedList<>(); for (Schedule schedule : candidates) { - List times = scheduleDao.getLastExecForRunAfter(schedule); + List times = null; + if (backendID != null) { + times = this.scheduleDao.getLastExecForRunAfter(schedule, backendID); + } else { + times = this.scheduleDao.getLastExecForRunAfter(schedule); + } boolean execute = true; for (Date item : times) { if (item == null) { diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecution.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecution.java index 50af7f6e9b..b5ffaff018 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecution.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecution.java @@ -49,8 +49,37 @@ public interface DbExecution extends DbAccess { */ public List getAll(PipelineExecutionStatus status); + /** + * Get all executions with given status ordered by priority + * + * @param status + * Execution status + * @return List with priority ordered executions + */ public List getAllByPriorityLimited(PipelineExecutionStatus status); + /** + * Get all executions with given status and backend ID ordered by priority + * + * @param status + * Execution status + * @param backendID + * Backend ID + * @return List of all executions with given status and backend ID + */ + public List getAllByPriorityLimited(PipelineExecutionStatus status, String backendID); + + /** + * Get all executions with given status and backend ID + * + * @param status + * Execution status + * @param backendID + * Backend ID + * @return List of all executions with given status and backend ID + */ + public List getAll(PipelineExecutionStatus status, String backendID); + /** * @param pipeline * @param status @@ -97,18 +126,22 @@ public PipelineExecution getLastExecution(Schedule schedule, /** * Checks if some of the executions were deleted *

- * @param ids executions to check + * + * @param ids + * executions to check * @return true if one or more execution were deleted */ - public boolean hasDeleted(List ids); - - /** - * Checks if there are executions for selected pipeline with selected statuses - * - * @param pipeline for which executions we are checking - * @param statuses of executions we are checking - * @return true if there is at least one execution with selected statuses, false otherwise - */ + public boolean hasDeleted(List ids); + + /** + * Checks if there are executions for selected pipeline with selected statuses + * + * @param pipeline + * for which executions we are checking + * @param statuses + * of executions we are checking + * @return true if there is at least one execution with selected statuses, false otherwise + */ boolean hasWithStatus(Pipeline pipeline, List statuses); } diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecutionImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecutionImpl.java index 96979c8fae..99124b105c 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecutionImpl.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbExecutionImpl.java @@ -22,6 +22,8 @@ import javax.persistence.TypedQuery; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; @@ -35,7 +37,9 @@ * @author Petyr */ @Transactional(propagation = Propagation.MANDATORY) -class DbExecutionImpl extends DbAccessBase implements DbExecution { +class DbExecutionImpl extends DbAccessBaseimplements DbExecution { + + private static Logger LOG = LoggerFactory.getLogger(DbExecutionImpl.class); protected DbExecutionImpl() { super(PipelineExecution.class); @@ -75,6 +79,17 @@ public List getAllByPriorityLimited(PipelineExecutionStatus s return executeList(query); } + @Override + public List getAllByPriorityLimited(PipelineExecutionStatus status, String backendID) { + final String stringQuery = "SELECT e FROM PipelineExecution e" + + " WHERE e.status = :status and e.backendId = :backendId and e.orderNumber >= :limited_priority " + + "order by e.orderNumber ASC , e.id ASC"; + TypedQuery query = createTypedQuery(stringQuery); + query.setParameter("limited_priority", ScheduledJobsPriority.IGNORE.getValue()); + query.setParameter("status", status); + query.setParameter("backendId", backendID); + return executeList(query); + } @Override public List getAll(Pipeline pipeline, PipelineExecutionStatus status) { @@ -86,6 +101,16 @@ public List getAll(Pipeline pipeline, PipelineExecutionStatus return executeList(query); } + @Override + public List getAll(PipelineExecutionStatus status, String backendID) { + final String stringQuery = "SELECT e FROM PipelineExecution e" + + " WHERE e.status = :status AND e.backendId = :backend"; + TypedQuery query = createTypedQuery(stringQuery); + query.setParameter("status", status); + query.setParameter("backend", backendID); + return executeList(query); + } + @Override public PipelineExecution getLastExecution(Pipeline pipeline, Set statuses) { @@ -132,17 +157,17 @@ public boolean hasModified(Date since) { @Override public boolean hasDeleted(List ids) { - if (ids == null || ids.isEmpty()) { - return false; - } - final String stringQuery = "SELECT COUNT(e) FROM PipelineExecution e" - + " WHERE e.id IN :ids"; - TypedQuery query = createCountTypedQuery(stringQuery); - query.setParameter("ids", ids); - Long number = (Long) query.getSingleResult(); - return !number.equals((long)ids.size()); + if (ids == null || ids.isEmpty()) { + return false; + } + final String stringQuery = "SELECT COUNT(e) FROM PipelineExecution e" + + " WHERE e.id IN :ids"; + TypedQuery query = createCountTypedQuery(stringQuery); + query.setParameter("ids", ids); + Long number = (Long) query.getSingleResult(); + return !number.equals((long) ids.size()); } - + @Override public boolean hasWithStatus(Pipeline pipeline, List statuses) { final String stringQuery = "SELECT COUNT(e) FROM PipelineExecution e" @@ -154,4 +179,5 @@ public boolean hasWithStatus(Pipeline pipeline, List st Long count = (Long) query.getSingleResult(); return count > 0; } + } diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipeline.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipeline.java index e1af40c6b6..bea7e73e94 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipeline.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipeline.java @@ -81,4 +81,5 @@ public interface DbPipeline extends DbAccess { * @return List of pipelines */ public List getPipelinesForUser(String externalUserId); + } diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipelineImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipelineImpl.java index 58ee791cb7..078c4152f9 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipelineImpl.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/DbPipelineImpl.java @@ -35,7 +35,7 @@ * @author Martin Virag */ @Transactional(propagation = Propagation.MANDATORY) -class DbPipelineImpl extends DbAccessBase implements DbPipeline { +class DbPipelineImpl extends DbAccessBaseimplements DbPipeline { protected DbPipelineImpl() { super(Pipeline.class); diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/PipelineExecution.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/PipelineExecution.java index 1f17f6ea86..04928cf36f 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/PipelineExecution.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/pipeline/PipelineExecution.java @@ -136,6 +136,9 @@ public class PipelineExecution implements OwnedEntity, DataObject { @JoinColumn(name = "user_actor_id") private UserActor actor; + @Column(name = "backend_id") + private String backendId; + /** * No-arg constructor for JPA */ @@ -440,6 +443,14 @@ public void setActor(UserActor actor) { this.actor = actor; } + public String getBackendId() { + return this.backendId; + } + + public void setBackendId(String backendId) { + this.backendId = backendId; + } + /** * Hashcode is compatible with {@link #equals(java.lang.Object)}. * diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbSchedule.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbSchedule.java index 0a1110d192..750b84e69e 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbSchedule.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbSchedule.java @@ -69,6 +69,15 @@ public interface DbSchedule extends DbAccess { */ public List getAllTimeBasedNotQueuedRunning(); + /** + * Fetches all {@link Schedule}s which are activated in + * certain time and the execution for the scheduled pipeline + * isn't already queued or running. + * + * @return list of schedules + */ + public List getAllTimeBasedNotQueuedRunningForCluster(); + /** * Fetches active (enabled) {@link Schedule}s which are activated based on * pipelines executions. @@ -86,4 +95,14 @@ public interface DbSchedule extends DbAccess { */ public List getLastExecForRunAfter(Schedule schedule); + /** + * Return times of last executions (or null if there has been no successful + * execution) of run-after pipelines for runAfter base schedule. + * + * @param schedule + * @param backendID + * @return list of timestamps + */ + public List getLastExecForRunAfter(Schedule schedule, String backendID); + } diff --git a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbScheduleImpl.java b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbScheduleImpl.java index 7fbfd018c0..5bbcfccd1e 100644 --- a/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbScheduleImpl.java +++ b/commons-app/src/main/java/cz/cuni/mff/xrg/odcs/commons/app/scheduling/DbScheduleImpl.java @@ -91,6 +91,22 @@ public List getAllTimeBasedNotQueuedRunning() { return executeList(query); } + @SuppressWarnings("unchecked") + @Override + public List getAllTimeBasedNotQueuedRunningForCluster() { + + final String queryStr = "SELECT * FROM exec_schedule s" + + " WHERE s.type = %d AND s.id NOT IN (" + + " SELECT s1.id FROM exec_schedule s1" + + " LEFT JOIN exec_pipeline e" + + " ON e.id = s1.pipeline_id WHERE e.status IN (%d, %d))" + + " ORDER BY s.id ASC" + + " FOR UPDATE"; + + String query = String.format(queryStr, 1, 0, 1); + return this.em.createNativeQuery(query, Schedule.class).getResultList(); + } + @Override public List getActiveRunAfterBased() { final String stringQuery = "SELECT s FROM Schedule s" @@ -121,4 +137,25 @@ public List getLastExecForRunAfter(Schedule schedule) { return Collections.checkedList(query.getResultList(), Date.class); } + @Override + public List getLastExecForRunAfter(Schedule schedule, String backendId) { + final String stringQuery = "SELECT max(exec.end)" + + " FROM Schedule schedule" + + " JOIN schedule.afterPipelines pipeline" + + " JOIN PipelineExecution exec ON exec.pipeline = pipeline" + + " WHERE schedule.id = :schedule AND exec.status IN :status AND exec.backendId = :backendId" + + " GROUP BY pipeline.id"; + + Set statuses = new HashSet<>(); + statuses.add(PipelineExecutionStatus.FINISHED_SUCCESS); + statuses.add(PipelineExecutionStatus.FINISHED_WARNING); + + TypedQuery query = this.em.createQuery(stringQuery, Date.class); + query.setParameter("schedule", schedule.getId()); + query.setParameter("status", statuses); + query.setParameter("backendId", backendId); + + return Collections.checkedList(query.getResultList(), Date.class); + } + } diff --git a/commons-app/src/main/java/eu/unifiedviews/commons/dao/view/ExecutionView.java b/commons-app/src/main/java/eu/unifiedviews/commons/dao/view/ExecutionView.java index f57bc23e28..6179a655f5 100644 --- a/commons-app/src/main/java/eu/unifiedviews/commons/dao/view/ExecutionView.java +++ b/commons-app/src/main/java/eu/unifiedviews/commons/dao/view/ExecutionView.java @@ -112,6 +112,9 @@ public class ExecutionView implements DataObject { @Column(name = "t_last_change") private Date lastChange; + @Column(name = "backend_id") + private String backendId; + @Override public Long getId() { return id; @@ -217,6 +220,14 @@ public void setOwnerFullName(String ownerFullName) { this.ownerFullName = ownerFullName; } + public String getBackendId() { + return this.backendId; + } + + public void setBackendId(String backendId) { + this.backendId = backendId; + } + /** * @return Duration of last pipeline execution, -1 if no such execution exists. */ diff --git a/commons-app/src/main/resources/META-INF/persistence.xml b/commons-app/src/main/resources/META-INF/persistence.xml index fd8781f44b..2beb2ad3ed 100644 --- a/commons-app/src/main/resources/META-INF/persistence.xml +++ b/commons-app/src/main/resources/META-INF/persistence.xml @@ -28,6 +28,7 @@ cz.cuni.mff.xrg.odcs.commons.app.user.EmailAddress cz.cuni.mff.xrg.odcs.commons.app.user.UserNotificationRecord cz.cuni.mff.xrg.odcs.commons.app.user.UserActor + cz.cuni.mff.xrg.odcs.commons.app.execution.server.ExecutionServer eu.unifiedviews.commons.dao.view.PipelineView eu.unifiedviews.commons.dao.view.ExecutionView diff --git a/commons-app/src/main/resources/commons-app-context.xml b/commons-app/src/main/resources/commons-app-context.xml index c5286bec58..131e81bc92 100644 --- a/commons-app/src/main/resources/commons-app-context.xml +++ b/commons-app/src/main/resources/commons-app-context.xml @@ -66,6 +66,9 @@ + + + @@ -86,6 +89,7 @@ + diff --git a/db/mysql/schema.sql b/db/mysql/schema.sql index de3b6c6b70..9fa8ec5800 100644 --- a/db/mysql/schema.sql +++ b/db/mysql/schema.sql @@ -33,6 +33,7 @@ DROP TABLE IF EXISTS `sch_email`; DROP TABLE IF EXISTS `rdf_ns_prefix`; DROP TABLE IF EXISTS `properties`; DROP TABLE IF EXISTS `runtime_properties`; +DROP TABLE IF EXISTS `backend_servers`; CREATE TABLE `dpu_instance` ( @@ -122,6 +123,15 @@ CREATE INDEX `ix_EXEC_RECORD_r_type` ON `exec_record` (`r_type`); CREATE INDEX `ix_EXEC_RECORD_dpu_id` ON `exec_record` (`dpu_id`); CREATE INDEX `ix_EXEC_RECORD_execution_id` ON `exec_record` (`execution_id`); +CREATE TABLE `backend_servers` +( + `id` INTEGER AUTO_INCREMENT, + `backend_id` VARCHAR(128), + `last_update` TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE (`backend_id`) +); + CREATE TABLE `exec_pipeline` ( `id` INTEGER AUTO_INCREMENT, @@ -139,6 +149,7 @@ CREATE TABLE `exec_pipeline` `owner_id` INTEGER, `user_actor_id` INTEGER, `order_number` BIGINT NOT NULL, + `backend_id` VARCHAR(128), PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE INDEX `ix_EXEC_PIPELINE_status` ON `exec_pipeline` (`status`); @@ -150,6 +161,7 @@ CREATE INDEX `ix_EXEC_PIPELINE_context_id` ON `exec_pipeline` (`context_id`); CREATE INDEX `ix_EXEC_PIPELINE_schedule_id` ON `exec_pipeline` (`schedule_id`); CREATE INDEX `ix_EXEC_PIPELINE_owner_id` ON `exec_pipeline` (`owner_id`); CREATE INDEX `ix_EXEC_PIPELINE_user_actor_id` ON `exec_pipeline` (`user_actor_id`); +CREATE INDEX `ix_EXEC_PIPELINE_backend_id` ON `exec_pipeline` (`backend_id`); CREATE TABLE `exec_schedule` @@ -685,7 +697,7 @@ LEFT JOIN `user_actor` AS actor ON ppl.user_actor_id = actor.id; CREATE VIEW `exec_view` AS SELECT exec.id AS id, exec.status AS status, ppl.id AS pipeline_id, ppl.name AS pipeline_name, exec.debug_mode AS debug_mode, exec.t_start AS t_start, exec.t_end AS t_end, exec.schedule_id AS schedule_id, owner.username AS owner_name, owner.full_name AS owner_full_name, exec.stop AS stop, exec.t_last_change AS t_last_change, -actor.name AS user_actor_name FROM `exec_pipeline` AS exec +exec.backend_id AS backend_id, actor.name AS user_actor_name FROM `exec_pipeline` AS exec LEFT JOIN `ppl_model` AS ppl ON ppl.id = exec.pipeline_id LEFT JOIN `usr_user` AS owner ON owner.id = exec.owner_id LEFT JOIN `user_actor` AS actor ON actor.id = exec.user_actor_id; diff --git a/db/mysql/updates/2.2.0-update.sql b/db/mysql/updates/2.2.0-update.sql new file mode 100644 index 0000000000..54c8e3d8a0 --- /dev/null +++ b/db/mysql/updates/2.2.0-update.sql @@ -0,0 +1,25 @@ +-- ####################################################################### +-- ## MySQL database update script standard UV installation # +-- ## Update from v. 2.1.x to 2.2.0 # +-- ####################################################################### + +CREATE TABLE `backend_servers` +( + `id` INTEGER AUTO_INCREMENT, + `backend_id` VARCHAR(128), + `last_update` TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE (`backend_id`) +); + +ALTER TABLE `exec_pipeline` ADD COLUMN `backend_id` VARCHAR(128); +CREATE INDEX `ix_EXEC_PIPELINE_backend_id` ON `exec_pipeline` (`backend_id`); + +DROP VIEW IF EXISTS `exec_view`; +CREATE VIEW `exec_view` AS +SELECT exec.id AS id, exec.status AS status, ppl.id AS pipeline_id, ppl.name AS pipeline_name, exec.debug_mode AS debug_mode, exec.t_start AS t_start, +exec.t_end AS t_end, exec.schedule_id AS schedule_id, owner.username AS owner_name, owner.full_name AS owner_full_name, exec.stop AS stop, exec.t_last_change AS t_last_change, +exec.backend_id AS backend_id, actor.name AS user_actor_name FROM `exec_pipeline` AS exec +LEFT JOIN `ppl_model` AS ppl ON ppl.id = exec.pipeline_id +LEFT JOIN `usr_user` AS owner ON owner.id = exec.owner_id +LEFT JOIN `user_actor` AS actor ON actor.id = exec.user_actor_id; \ No newline at end of file diff --git a/db/postgresql/schema.sql b/db/postgresql/schema.sql index 7a65863698..e8c416e666 100644 --- a/db/postgresql/schema.sql +++ b/db/postgresql/schema.sql @@ -20,6 +20,7 @@ DROP SEQUENCE IF EXISTS "seq_organization"; DROP SEQUENCE IF EXISTS "seq_rdf_ns_prefix"; DROP SEQUENCE IF EXISTS "seq_ppl_open_event"; DROP SEQUENCE IF EXISTS "seq_user_actor"; +DROP SEQUENCE IF EXISTS "seq_backend_servers"; DROP VIEW IF EXISTS "pipeline_view"; DROP VIEW IF EXISTS "exec_last_view"; DROP VIEW IF EXISTS "exec_view"; @@ -55,6 +56,7 @@ DROP TABLE IF EXISTS "sch_email"; DROP TABLE IF EXISTS "rdf_ns_prefix"; DROP TABLE IF EXISTS "properties"; DROP TABLE IF EXISTS "runtime_properties"; +DROP TABLE IF EXISTS "backend_servers"; CREATE SEQUENCE "seq_dpu_record" START 1; CREATE TABLE "dpu_instance" @@ -149,6 +151,16 @@ CREATE INDEX "ix_EXEC_RECORD_r_type" ON "exec_record" ("r_type"); CREATE INDEX "ix_EXEC_RECORD_dpu_id" ON "exec_record" ("dpu_id"); CREATE INDEX "ix_EXEC_RECORD_execution_id" ON "exec_record" ("execution_id"); +CREATE SEQUENCE "seq_backend_servers" START 1; +CREATE TABLE "backend_servers" +( + "id" INTEGER, + "backend_id" VARCHAR(128), + "last_update" TIMESTAMP, + PRIMARY KEY ("id"), + UNIQUE ("backend_id") +); + CREATE SEQUENCE "seq_exec_pipeline" START 1; CREATE TABLE "exec_pipeline" ( @@ -167,6 +179,7 @@ CREATE TABLE "exec_pipeline" "owner_id" INTEGER, "user_actor_id" INTEGER, "order_number" BIGINT NOT NULL, + "backend_id" VARCHAR(128), PRIMARY KEY ("id") ); CREATE INDEX "ix_EXEC_PIPELINE_status" ON "exec_pipeline" ("status"); @@ -178,6 +191,7 @@ CREATE INDEX "ix_EXEC_PIPELINE_context_id" ON "exec_pipeline" ("context_id"); CREATE INDEX "ix_EXEC_PIPELINE_schedule_id" ON "exec_pipeline" ("schedule_id"); CREATE INDEX "ix_EXEC_PIPELINE_owner_id" ON "exec_pipeline" ("owner_id"); CREATE INDEX "ix_EXEC_PIPELINE_user_actor_id" ON "exec_pipeline" ("user_actor_id"); +CREATE INDEX "ix_EXEC_PIPELINE_backend_id" ON "exec_pipeline" ("backend_id"); CREATE SEQUENCE "seq_exec_schedule" START 1; CREATE TABLE "exec_schedule" @@ -720,7 +734,7 @@ LEFT JOIN "user_actor" AS actor ON ppl.user_actor_id = actor.id; CREATE VIEW "exec_view" AS SELECT exec.id AS id, exec.status AS status, ppl.id AS pipeline_id, ppl.name AS pipeline_name, exec.debug_mode AS debug_mode, exec.t_start AS t_start, -exec.t_end AS t_end, exec.schedule_id AS schedule_id, owner.username AS owner_name, owner.full_name AS owner_full_name, exec.stop AS stop, exec.t_last_change AS t_last_change, +exec.t_end AS t_end, exec.schedule_id AS schedule_id, exec.backend_id as backend_id, owner.username AS owner_name, owner.full_name AS owner_full_name, exec.stop AS stop, exec.t_last_change AS t_last_change, actor.name AS user_actor_name FROM "exec_pipeline" AS exec LEFT JOIN "ppl_model" AS ppl ON ppl.id = exec.pipeline_id LEFT JOIN "usr_user" AS owner ON owner.id = exec.owner_id diff --git a/db/postgresql/updates/2.2.0-update.sql b/db/postgresql/updates/2.2.0-update.sql new file mode 100644 index 0000000000..75b9b57870 --- /dev/null +++ b/db/postgresql/updates/2.2.0-update.sql @@ -0,0 +1,26 @@ +-- ############################################################################# +-- ## Database update script for both standard and eDemo UV installation # +-- ## Update from v. 2.1.x to 2.2.0 # +-- ############################################################################# +CREATE SEQUENCE "seq_backend_servers" START 1; +CREATE TABLE "backend_servers" +( + "id" INTEGER, + "backend_id" VARCHAR(128), + "last_update" TIMESTAMP, + PRIMARY KEY ("id"), + UNIQUE ("backend_id") +); + +ALTER TABLE "exec_pipeline" ADD COLUMN "backend_id" VARCHAR(128); +CREATE INDEX "ix_EXEC_PIPELINE_backend_id" ON "exec_pipeline" ("backend_id"); + +DROP VIEW IF EXISTS "exec_view"; +CREATE VIEW `exec_view` AS +SELECT exec.id AS id, exec.status AS status, ppl.id AS pipeline_id, ppl.name AS pipeline_name, exec.debug_mode AS debug_mode, exec.t_start AS t_start, +exec.t_end AS t_end, exec.schedule_id AS schedule_id, owner.username AS owner_name, owner.full_name AS owner_full_name, exec.stop AS stop, exec.t_last_change AS t_last_change, +exec.backend_id AS backend_id, actor.name AS user_actor_name FROM `exec_pipeline` AS exec +LEFT JOIN `ppl_model` AS ppl ON ppl.id = exec.pipeline_id +LEFT JOIN `usr_user` AS owner ON owner.id = exec.owner_id +LEFT JOIN `user_actor` AS actor ON actor.id = exec.user_actor_id; + diff --git a/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/auxiliaries/PipelineHelper.java b/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/auxiliaries/PipelineHelper.java index bfb44a1f63..25178938b6 100644 --- a/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/auxiliaries/PipelineHelper.java +++ b/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/auxiliaries/PipelineHelper.java @@ -18,6 +18,8 @@ import java.util.Arrays; +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -30,7 +32,10 @@ import cz.cuni.mff.xrg.odcs.commons.app.ScheduledJobsPriority; import cz.cuni.mff.xrg.odcs.commons.app.communication.CheckDatabaseService; +import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; +import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade; import cz.cuni.mff.xrg.odcs.commons.app.facade.RuntimePropertiesFacade; import cz.cuni.mff.xrg.odcs.commons.app.pipeline.Pipeline; @@ -53,10 +58,18 @@ public class PipelineHelper { private PipelineFacade pipelineFacade; @Autowired - private CheckDatabaseService checkDatabaseService; + private RuntimePropertiesFacade runtimePropertyFacade; @Autowired - private RuntimePropertiesFacade runtimePropertyFacade; + private ExecutionFacade executionFacade; + + @Autowired + private AppConfig appConfig; + + @Autowired + private CheckDatabaseService checkDatabaseService; + + private boolean backendClusterMode = false; /** * Sets up parameters of pipeline execution and runs the pipeline. @@ -81,7 +94,7 @@ public PipelineExecution runPipeline(Pipeline pipeline, boolean inDebugMode) { * @param debugNode * {@link Node} where debug execution should stop. Valid * only for debug mode. - * @return {@link PipelineExecution} of given {@link Pipeline}. + * @return {@link PipelineExecution} of given {@link Pipeline} or null if backend is offline. */ public PipelineExecution runPipeline(Pipeline pipeline, boolean inDebugMode, Node debugNode) { final boolean hasQueuedOrRunning = pipelineFacade.hasExecutionsWithStatus(pipeline, @@ -97,35 +110,37 @@ public PipelineExecution runPipeline(Pipeline pipeline, boolean inDebugMode, Nod pipelineExec.setDebugNode(debugNode); } - try { - Long orderPosition = getOrderPosition(); - // run immediately - set higher priority - pipelineExec.setOrderNumber(orderPosition); - pipelineFacade.save(pipelineExec); - checkDatabaseService.checkDatabase(); - } catch (RemoteAccessException e) { - ConfirmDialog - .show(UI.getCurrent(), - Messages.getString("PipelineHelper.backend.offline.dialog.name"), Messages.getString("PipelineHelper.backend.offline.dialog.message"), Messages.getString("PipelineHelper.backend.offline.dialog.schedule"), - Messages.getString("PipelineHelper.backend.offline.dialog.cancel"), new ConfirmDialog.Listener() { - private static final long serialVersionUID = 1L; - - @Override - public void onClose(ConfirmDialog cd) { - PipelineExecution pplExec = pipelineFacade.getExecution(pipelineExec.getId()); - if (pplExec != null && pplExec.getStatus() != PipelineExecutionStatus.QUEUED) { - Notification.show(Messages.getString("PipelineHelper.execution.state.title"), Messages.getString("PipelineHelper.execution.state.description"), Type.WARNING_MESSAGE); - return; // already running - } - if (cd.isConfirmed()) { - pipelineFacade.save(pipelineExec); - } else { - pipelineFacade.delete(pipelineExec); - } - } - }); + Long orderPosition = getOrderPosition(); + // run immediately - set higher priority + pipelineExec.setOrderNumber(orderPosition); + this.pipelineFacade.save(pipelineExec); + if (!checkBackendActive()) { + ConfirmDialog.show(UI.getCurrent(), + Messages.getString("PipelineHelper.backend.offline.dialog.name"), + Messages.getString("PipelineHelper.backend.offline.dialog.message"), + Messages.getString("PipelineHelper.backend.offline.dialog.schedule"), + Messages.getString("PipelineHelper.backend.offline.dialog.cancel"), + new ConfirmDialog.Listener() { + private static final long serialVersionUID = 1L; + + @Override + public void onClose(ConfirmDialog cd) { + PipelineExecution pplExec = pipelineFacade.getExecution(pipelineExec.getId()); + if (pplExec != null && pplExec.getStatus() != PipelineExecutionStatus.QUEUED) { + Notification.show(Messages.getString("PipelineHelper.execution.state.title"), + Messages.getString("PipelineHelper.execution.state.description"), Type.WARNING_MESSAGE); + return; // already running + } + if (cd.isConfirmed()) { + pipelineFacade.save(pipelineExec); + } else { + pipelineFacade.delete(pipelineExec); + } + } + }); return null; } + Notification.show(Messages.getString("PipelineHelper.execution.started"), Notification.Type.HUMANIZED_MESSAGE); return pipelineExec; } @@ -166,4 +181,27 @@ private Long getOrderPosition() { return orderNumber; } + @PostConstruct + public void init() { + try { + this.backendClusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE); + } catch (MissingConfigPropertyException e) { + LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage()); + } + } + + private boolean checkBackendActive() { + boolean backendActive = true; + if (this.backendClusterMode) { + backendActive = this.executionFacade.checkAnyBackendActive(); + } else { + try { + this.checkDatabaseService.checkDatabase(); + } catch (RemoteAccessException e) { + backendActive = false; + } + } + return backendActive; + } + } diff --git a/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/gui/views/dpu/DPUPresenterImpl.java b/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/gui/views/dpu/DPUPresenterImpl.java index c100bce94b..66aeff6669 100644 --- a/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/gui/views/dpu/DPUPresenterImpl.java +++ b/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/gui/views/dpu/DPUPresenterImpl.java @@ -424,8 +424,6 @@ public void copyDPUEventHandler() { public void deleteDPUEventHandler() { boolean isDeleted = deleteDPU(selectedDpu); if (isDeleted) { - // clear selection - selectedDpu = null; // and refresh the layout view.refresh(); view.selectNewDPU(null); diff --git a/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/monitor/BackendHeartbeat.java b/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/monitor/BackendHeartbeat.java index 098751bbf9..4b264f9eb7 100644 --- a/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/monitor/BackendHeartbeat.java +++ b/frontend/src/main/java/cz/cuni/mff/xrg/odcs/frontend/monitor/BackendHeartbeat.java @@ -16,11 +16,18 @@ */ package cz.cuni.mff.xrg.odcs.frontend.monitor; +import javax.annotation.PostConstruct; + import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.remoting.RemoteAccessException; import org.springframework.scheduling.annotation.Scheduled; import cz.cuni.mff.xrg.odcs.commons.app.communication.HeartbeatService; +import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig; +import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty; +import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException; +import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Periodically checks Backend status. As singleton component should prevent @@ -30,9 +37,22 @@ */ public class BackendHeartbeat { + /** + * Logger class. + */ + private static final Logger LOG = LoggerFactory.getLogger(BackendHeartbeat.class); + + @Autowired + private ExecutionFacade executionFacade; + + @Autowired + private AppConfig appConfig; + @Autowired private HeartbeatService heartbeatService; + private boolean backendClusterMode = false; + /** * True if backend is alive. */ @@ -41,14 +61,27 @@ public class BackendHeartbeat { @Scheduled(fixedDelay = 6 * 1000) private void check() { try { - alive = heartbeatService.isAlive(); - } catch (RemoteAccessException ex) { - alive = false; + if (this.backendClusterMode) { + this.alive = this.executionFacade.checkAnyBackendActive(); + } else { + this.alive = this.heartbeatService.isAlive(); + } + } catch (Exception ex) { + this.alive = false; } } public boolean checkIsAlive() { - return alive; + return this.alive; } + @PostConstruct + public void init() { + try { + this.backendClusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE); + } catch (MissingConfigPropertyException e) { + LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage()); + + } + } } diff --git a/frontend/src/main/resources/frontend-messages_sk.properties b/frontend/src/main/resources/frontend-messages_sk.properties index 9cc082ec56..c0bc0b2c6e 100644 --- a/frontend/src/main/resources/frontend-messages_sk.properties +++ b/frontend/src/main/resources/frontend-messages_sk.properties @@ -483,7 +483,7 @@ PipelineExport.schedule.export = Exportova\u0165 pl\u00E1nova\u010D procesu PipelineHelper.backend.offline.dialog.cancel = Zru\u0161i\u0165 PipelineHelper.backend.offline.dialog.message = Administra\u010Dn\u00FD syst\u00E9m je offline. Chcete napl\u00E1nova\u0165 spustenie procesu, ked bude syst\u00E9m online, alebo chcete spracovanie zru\u0161i\u0165? PipelineHelper.backend.offline.dialog.name = Spracovanie procesu -PipelineHelper.backend.offline.dialog.schedule = Pl\u00E1nova\u010D +PipelineHelper.backend.offline.dialog.schedule = Napl\u00E1nova\u0165 PipelineHelper.execution.started = Spracovanie procesu za\u010Dalo .. PipelineHelper.execution.state.description = Stav spracovania sa medzi\u010Dasom zmenil. Skontrolujte a sk\u00FAste znova. PipelineHelper.execution.state.title = Spracovanie nepl\u00E1novan\u00E9 / zru\u0161en\u00E9 diff --git a/frontend/src/main/webapp/WEB-INF/config.sample.properties b/frontend/src/main/webapp/WEB-INF/config.sample.properties index b5a40d086f..395b68238c 100644 --- a/frontend/src/main/webapp/WEB-INF/config.sample.properties +++ b/frontend/src/main/webapp/WEB-INF/config.sample.properties @@ -27,6 +27,15 @@ general.workingdir = {full path to "odcs" (home) dir of the project}/backend/wor backend.host = 127.0.0.1 backend.port = 5010 +# This must be enabled in case backend(s) run cluster mode in order to check backend(s) status +# via database. In case of single backend RMI calls are used. +# backend.cluster.mode = false + +# If backend cluster enabled, this is timeout for backend activity +# If frontend detects that no backend has been active for more than this limit, frontend works in backend offline mode +# Timeout is in secods, default = 10 s +# backend.alive.limit = 10 + # Connection configuration setting for relational database # for mysql { database.sql.driver = com.mysql.jdbc.Driver