Skip to content

Commit

Permalink
Merge branch 'release/UV_Core_v1.4.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Klempa committed Oct 31, 2014
2 parents 418f5a1 + 52af10b commit 1ca5767
Show file tree
Hide file tree
Showing 103 changed files with 3,159 additions and 1,293 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Local configurations
frontend/src/main/webapp/WEB-INF/config.properties
*.local.xml
.attach*

*.class

Expand Down
2 changes: 1 addition & 1 deletion backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>cz.cuni.mff.xrg.odcs</groupId>
<artifactId>odcs</artifactId>
<version>1.3.1</version>
<version>1.4.0</version>
</parent>
<artifactId>backend</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,36 @@
import org.slf4j.MDC;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;

import cz.cuni.mff.xrg.odcs.backend.execution.event.CheckDatabaseEvent;
import cz.cuni.mff.xrg.odcs.backend.execution.pipeline.Executor;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFinished;
import cz.cuni.mff.xrg.odcs.commons.app.ScheduledJobsPriority;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log;
import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.RuntimePropertiesFacade;
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution;
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus;
import cz.cuni.mff.xrg.odcs.commons.app.properties.RuntimeProperty;

/**
* Responsible for running and supervision queue of PipelineExecution tasks.
*
* @author Petyr
*/
public class Engine implements ApplicationListener<CheckDatabaseEvent> {
public class Engine implements ApplicationListener<ApplicationEvent> {

private static final Logger LOG = LoggerFactory.getLogger(Engine.class);
private static final Integer DEFAULT_LIMIT_SHEDULED_PPL = 2;
public Integer numberOfRunningJobs = 0;
private final Object LockRunningJobs = new Object();

/**
* Publisher instance.
Expand All @@ -58,7 +66,13 @@ public class Engine implements ApplicationListener<CheckDatabaseEvent> {
* Pipeline facade.
*/
@Autowired
private PipelineFacade pipelineFacade;
protected PipelineFacade pipelineFacade;

/**
* Runtime properties facade.
*/
@Autowired
protected RuntimePropertiesFacade runtimePropertiesFacade;

/**
* Thread pool.
Expand All @@ -82,6 +96,7 @@ private void propertySetter() {

workingDirectory = new File(
appConfig.getString(ConfigProperty.GENERAL_WORKINGDIR));
// limitOfScheduledPipelines = appConfig.getInteger(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES);
LOG.info("Working dir: {}", workingDirectory.toString());
// make sure that our working directory exist
if (workingDirectory.isDirectory()) {
Expand All @@ -106,20 +121,61 @@ protected void run(PipelineExecution execution) {
* Check database for new task (PipelineExecutions to run). Can run
* concurrently. Check database every 20 seconds.
*/

@Async
@Scheduled(fixedDelay = 20000)
protected synchronized void checkDatabase() {
if (!startUpDone) {
// we does not start any execution
// before start up method is executed
startUp();
return;
protected void checkJobs() {
synchronized (LockRunningJobs) {
LOG.debug(">>> Entering checkJobs()");
if (!startUpDone) {
// we does not start any execution
// before start up method is executed
startUp();
return;
}

Integer limitOfScheduledPipelines = getLimitOfScheduledPipelines();
LOG.debug("limit of scheduled pipelines: " + limitOfScheduledPipelines);

List<PipelineExecution> jobs = pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED);
// run pipeline executions ..
for (PipelineExecution job : jobs) {
if (job.getOrderNumber() == ScheduledJobsPriority.IGNORE.getValue()) {
run(job);
numberOfRunningJobs++;
continue;
}

if (numberOfRunningJobs < limitOfScheduledPipelines) {
run(job);
numberOfRunningJobs++;
} else {
break;
}
}

LOG.debug("<<< Leaving checkJobs: {}");
}
LOG.trace("Checking for new executions.");
List<PipelineExecution> toExecute = pipelineFacade.getAllExecutions(PipelineExecutionStatus.QUEUED);
// run pipeline executions ..
for (PipelineExecution item : toExecute) {
run(item);
}

/**
* Gets runtime property for number of parallel running pipelines from database. If
* not set or its set wrongly gets default limit.
*
* @return limit for number of parallel running pipelines
*/
protected Integer getLimitOfScheduledPipelines() {
RuntimeProperty limit = runtimePropertiesFacade.getByName(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES.toString());
if (limit == null) {
return DEFAULT_LIMIT_SHEDULED_PPL;
}
try {
return Integer.parseInt(limit.getValue());
} catch (NumberFormatException e) {
LOG.error("Value not a number of RuntimeProperty: " + ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES.toString()
+ ", error: " + e.getMessage());
LOG.warn("Setting limit of scheduled pipelines to default value: " + DEFAULT_LIMIT_SHEDULED_PPL);
return DEFAULT_LIMIT_SHEDULED_PPL;
}
}

Expand Down Expand Up @@ -173,8 +229,17 @@ protected synchronized void startUp() {
}

@Override
public void onApplicationEvent(CheckDatabaseEvent event) {
checkDatabase();
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof PipelineFinished) {
synchronized (LockRunningJobs) {
if (numberOfRunningJobs >= 0)
numberOfRunningJobs--;
}
LOG.trace("Received PipelineFinished event");
}
if (event instanceof CheckDatabaseEvent) {
checkJobs();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cz.cuni.mff.xrg.odcs.backend.execution;

import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution;
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus;

import java.util.ArrayList;
import java.util.List;

public class EngineMock extends Engine {

public EngineMock() {
this.startUpDone = true;
}
public final List<PipelineExecution> historyOfExecution = new ArrayList<>();

@Override
public synchronized void run(PipelineExecution execution) {
//mockit
System.out.println("everything is ok");
historyOfExecution.add(execution);
execution.setStatus(PipelineExecutionStatus.FINISHED_SUCCESS);
this.pipelineFacade.save(execution);
}

public PipelineFacade getPipelineFacade() {
return pipelineFacade;
}

public void setPipelineFacade(PipelineFacade pipelineFacade) {
this.pipelineFacade = pipelineFacade;
}


public void doCheck(){
checkJobs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;

import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFinished;
import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.ScheduleFacade;
import cz.cuni.mff.xrg.odcs.commons.app.scheduling.Schedule;

Expand All @@ -26,12 +28,18 @@ class Scheduler implements ApplicationListener<ApplicationEvent> {

private static final Logger LOG = LoggerFactory.getLogger(Schedule.class);


@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* Schedule facade.
*/
@Autowired
private ScheduleFacade scheduleFacade;

@Autowired
private PipelineFacade pipelineFacade;

@PostConstruct
private void initialCheck() {
// do initial run-after check
Expand Down Expand Up @@ -68,12 +76,12 @@ private synchronized void onPipelineFinished(PipelineFinished pipelineFinishedEv
*/
@Async
@Scheduled(fixedDelay = 30000)
private synchronized void timeBasedCheck() {
protected synchronized void timeBasedCheck() {
LOG.trace("onTimeCheck started");
// check DB for pipelines based on time scheduling
Date now = new Date();
// get all pipelines that are time based
List<Schedule> candidates = scheduleFacade.getAllTimeBased();
List<Schedule> candidates = scheduleFacade.getAllTimeBasedNotQueuedRunning();
// check ..
for (Schedule schedule : candidates) {
// we use information about next execution
Expand All @@ -95,9 +103,9 @@ public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof PipelineFinished) {
PipelineFinished pipelineFinishedEvent = (PipelineFinished) event;
// ...
LOG.trace("Recieved PipelineFinished event");
LOG.trace("Received PipelineFinished event");
onPipelineFinished(pipelineFinishedEvent);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* @author Petyr
*/
@ContextConfiguration(locations = { "file:src/test/resource/backend-test-context.xml" })
@ContextConfiguration(locations = { "classpath:backend-test-context.xml" })
@RunWith(SpringJUnit4ClassRunner.class)
public class ConfiguratorTest {

Expand Down
Loading

0 comments on commit 1ca5767

Please sign in to comment.