Skip to content

Commit

Permalink
Merge pull request #482 from UnifiedViews/feature/backendCluster
Browse files Browse the repository at this point in the history
Backend active/active cluster
  • Loading branch information
tomas-knap committed Aug 21, 2015
2 parents ec701d9 + 3b63d45 commit c5820f5
Show file tree
Hide file tree
Showing 39 changed files with 1,109 additions and 138 deletions.
11 changes: 11 additions & 0 deletions backend/conf/config.sample.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ general.workingdir = {full path to "odcs" (home) dir of the project}/backend/wor
backend.host = 127.0.0.1
backend.port = 5010

# Backend(s) run in cluster mode - developed and tested only for PostgreSQL
# With this property enabled, 2 or even N backends can work with one database
# All backends work simultaneously, they can process pipeline schedules and executions
# If backend cluster enabled, backend ID must be set - mandatory parameter
# backend.cluster.mode = false
# backend.id = BackendServer1

# If backend should restart running executions on startup
# By default running executions are restarted, if false, executions are failed by backend at startup
# backend.startup.restart.running = true

# Connection configuration setting for relational database
# for mysql {
database.sql.driver = com.mysql.jdbc.Driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import cz.cuni.mff.xrg.odcs.commons.app.ScheduledJobsPriority;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException;
import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log;
import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.RuntimePropertiesFacade;
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution;
Expand All @@ -56,10 +58,15 @@
public class Engine implements ApplicationListener<ApplicationEvent> {

private static final Logger LOG = LoggerFactory.getLogger(Engine.class);

private static final Integer DEFAULT_LIMIT_SHEDULED_PPL = 2;

public Integer numberOfRunningJobs = 0;

private final Object LockRunningJobs = new Object();

private boolean clusterMode = false;

/**
* Publisher instance.
*/
Expand All @@ -83,13 +90,16 @@ public class Engine implements ApplicationListener<ApplicationEvent> {
*/
@Autowired
protected PipelineFacade pipelineFacade;

/**
* Runtime properties facade.
*/
@Autowired
protected RuntimePropertiesFacade runtimePropertiesFacade;

@Autowired
protected ExecutionFacade executionFacade;

/**
* Thread pool.
*/
Expand All @@ -105,19 +115,34 @@ public class Engine implements ApplicationListener<ApplicationEvent> {
*/
protected Boolean startUpDone;

/**
* Backend identifier
*/
protected String backendID;

@PostConstruct
private void propertySetter() {
this.executorService = Executors.newCachedThreadPool();
this.startUpDone = false;

workingDirectory = new File(
appConfig.getString(ConfigProperty.GENERAL_WORKINGDIR));
// limitOfScheduledPipelines = appConfig.getInteger(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES);
// limitOfScheduledPipelines = appConfig.getInteger(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES);
LOG.info("Working dir: {}", workingDirectory.toString());
// make sure that our working directory exist
if (workingDirectory.isDirectory()) {
workingDirectory.mkdirs();
}

try {
this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE);
} catch (MissingConfigPropertyException e) {
LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage());
}
if (this.clusterMode) {
this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID);
LOG.info("Backend ID: {}", this.backendID);
}
}

/**
Expand All @@ -135,11 +160,11 @@ protected void run(PipelineExecution execution) {

/**
* Check database for new task (PipelineExecutions to run). Can run
* concurrently. Check database every 20 seconds.
* concurrently. Check database every 2 seconds.
*/

@Async
@Scheduled(fixedDelay = 20000)
@Scheduled(fixedDelay = 2000)
protected void checkJobs() {
synchronized (LockRunningJobs) {
LOG.debug(">>> Entering checkJobs()");
Expand All @@ -152,21 +177,35 @@ protected void checkJobs() {

Integer limitOfScheduledPipelines = getLimitOfScheduledPipelines();
LOG.debug("limit of scheduled pipelines: " + limitOfScheduledPipelines);

List<PipelineExecution> jobs = pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED);
// run pipeline executions ..
for (PipelineExecution job : jobs) {
if (job.getOrderNumber() == ScheduledJobsPriority.IGNORE.getValue()) {
run(job);
numberOfRunningJobs++;
continue;
LOG.debug("Number of running jobs: {}", this.numberOfRunningJobs);

List<PipelineExecution> jobs = null;
if (this.clusterMode) {
// Update backend activity timestamp in DB
this.executionFacade.updateBackendTimestamp(this.backendID);
int limit = limitOfScheduledPipelines - this.numberOfRunningJobs;
if (limit < 0) {
limit = 0;
}
long countOfUnallocated = this.executionFacade.getCountOfUnallocatedQueuedExecutionsWithIgnorePriority();
if (limit < countOfUnallocated) {
limit = (int) countOfUnallocated;
}

if (numberOfRunningJobs < limitOfScheduledPipelines) {
int allocated = this.executionFacade.allocateQueuedExecutionsForBackend(this.backendID, limit);
LOG.debug("Allocated {} executions by backend '{}'", allocated, this.backendID);

LOG.debug("Going to find all allocated QUEUED executions");
jobs = this.pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED, this.backendID);
LOG.debug("Found {} executions planned for execution", jobs.size());
} else {
jobs = this.pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED);
}
// run pipeline executions ..
for (PipelineExecution job : jobs) {
if (this.numberOfRunningJobs < limitOfScheduledPipelines || ScheduledJobsPriority.IGNORE.getValue() == job.getOrderNumber()) {
run(job);
numberOfRunningJobs++;
} else {
break;
this.numberOfRunningJobs++;
}
}

Expand All @@ -186,7 +225,7 @@ protected Integer getLimitOfScheduledPipelines() {
return DEFAULT_LIMIT_SHEDULED_PPL;
}
try {
return Integer.parseInt(limit.getValue());
return Integer.parseInt(limit.getValue());
} catch (NumberFormatException e) {
LOG.error("Value not a number of RuntimeProperty: " + ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES.toString()
+ ", error: " + e.getMessage());
Expand All @@ -210,8 +249,12 @@ protected synchronized void startUp() {
ExecutionSanitizer sanitizer = beanFactory.getBean(ExecutionSanitizer.class);

// list executions
List<PipelineExecution> running = pipelineFacade
.getAllExecutions(PipelineExecutionStatus.RUNNING);
List<PipelineExecution> running = null;
if (this.clusterMode) {
running = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.RUNNING, this.backendID);
} else {
running = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.RUNNING);
}
for (PipelineExecution execution : running) {
MDC.put(Log.MDC_EXECUTION_KEY_NAME, execution.getId().toString());
// hanging pipeline ..
Expand All @@ -226,8 +269,12 @@ protected synchronized void startUp() {
MDC.remove(Log.MDC_EXECUTION_KEY_NAME);
}

List<PipelineExecution> cancelling = pipelineFacade
.getAllExecutions(PipelineExecutionStatus.CANCELLING);
List<PipelineExecution> cancelling = null;
if (this.clusterMode) {
cancelling = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.CANCELLING, this.backendID);
} else {
cancelling = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.CANCELLING);
}

for (PipelineExecution execution : cancelling) {
MDC.put(Log.MDC_EXECUTION_KEY_NAME, execution.getId().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineRestart;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineSanitized;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException;
import cz.cuni.mff.xrg.odcs.commons.app.dataunit.DataUnitFactory;
import cz.cuni.mff.xrg.odcs.commons.app.dpu.DPUInstanceRecord;
import cz.cuni.mff.xrg.odcs.commons.app.execution.context.DataUnitInfo;
Expand All @@ -39,8 +42,8 @@
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus;
import cz.cuni.mff.xrg.odcs.commons.app.resource.MissingResourceException;
import cz.cuni.mff.xrg.odcs.commons.app.resource.ResourceManager;
import eu.unifiedviews.commons.dataunit.ManagableDataUnit;
import cz.cuni.mff.xrg.odcs.rdf.repositories.GraphUrl;
import eu.unifiedviews.commons.dataunit.ManagableDataUnit;
import eu.unifiedviews.commons.rdf.repository.RDFException;
import eu.unifiedviews.dataunit.DataUnitException;

Expand All @@ -63,6 +66,9 @@ class ExecutionSanitizer {
@Autowired
private ResourceManager resourceManager;

@Autowired
private AppConfig appConfig;

/**
* Fix possible problems with given execution. Logs of this method are
* logged with the execution id of given {@link PipelineExecution} Method does not save the changes into database! So called must secure
Expand All @@ -73,16 +79,25 @@ class ExecutionSanitizer {
public void sanitize(PipelineExecution execution) {
// check for flags
if (execution.getStop()) {
sanitizeCancelling(execution);
sanitizeCancellingRunning(execution);
return;
}

switch (execution.getStatus()) {
case CANCELLING:
sanitizeCancelling(execution);
sanitizeCancellingRunning(execution);
return;
case RUNNING:
sanitizeRunning(execution);
boolean restartRunning = true;
try {
restartRunning = this.appConfig.getBoolean(ConfigProperty.BACKEND_STARTUP_RESTART_RUNNING);
} catch (MissingConfigPropertyException ignore) {
/* ignore exception*/}
if (restartRunning) {
restartRunning(execution);
} else {
sanitizeCancellingRunning(execution);
}
return;
default:
// do nothing with such pipeline ..
Expand All @@ -95,8 +110,8 @@ public void sanitize(PipelineExecution execution) {
*
* @param execution
*/
private void sanitizeRunning(PipelineExecution execution) {
eventPublisher.publishEvent(new PipelineRestart(execution, this));
private void restartRunning(PipelineExecution execution) {
this.eventPublisher.publishEvent(new PipelineRestart(execution, this));

// set state back to scheduled
execution.setStatus(PipelineExecutionStatus.QUEUED);
Expand All @@ -107,7 +122,7 @@ private void sanitizeRunning(PipelineExecution execution) {
*
* @param execution
*/
private void sanitizeCancelling(PipelineExecution execution) {
private void sanitizeCancellingRunning(PipelineExecution execution) {
// publish event about this ..
eventPublisher.publishEvent(new PipelineSanitized(execution, this));

Expand All @@ -133,7 +148,12 @@ private void sanitizeCancelling(PipelineExecution execution) {
// this means that the execution does not run at all
}
// set canceled state
execution.setStatus(PipelineExecutionStatus.CANCELLED);
if (execution.getStatus() == PipelineExecutionStatus.CANCELLING) {
execution.setStatus(PipelineExecutionStatus.CANCELLED);
} else if (execution.getStatus() == PipelineExecutionStatus.RUNNING) {
execution.setStatus(PipelineExecutionStatus.FAILED);
}

execution.setEnd(now);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFailedEvent;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFinished;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineStarted;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException;
import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log;
import cz.cuni.mff.xrg.odcs.commons.app.facade.LogFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
Expand Down Expand Up @@ -125,11 +128,29 @@ public class Executor implements Runnable {
*/
private Date lastSuccessfulExTime;

@Autowired
private AppConfig appConfig;

/**
* Backend identifier
*/
private String backendID;

private boolean clusterMode = false;

/**
* Sort pre/post executors.
*/
@PostConstruct
public void init() {
try {
this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE);
} catch (MissingConfigPropertyException e) {
LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage());
}
if (this.clusterMode) {
this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID);
}
if (preExecutors != null) {
Collections.sort(preExecutors,
AnnotationAwareOrderComparator.INSTANCE);
Expand All @@ -155,6 +176,9 @@ public void bind(PipelineExecution execution) {
// update state and set start time
this.execution.setStart(new Date());
this.execution.setStatus(PipelineExecutionStatus.RUNNING);
if (this.clusterMode) {
this.execution.setBackendId(this.backendID);
}

try {
pipelineFacade.save(this.execution);
Expand Down
Loading

0 comments on commit c5820f5

Please sign in to comment.