Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend active/active cluster #482

Merged
merged 31 commits into from
Aug 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
606aea2
Preparation for backend cluster - active / passive mode; Added new ta…
eea03 Jul 24, 2015
bd1539b
Cluster changes implementation; Changed sanitizing of pipeline execut…
eea03 Jul 27, 2015
f33237b
Backend cluster implementation continued; Fixes of initial startup of…
eea03 Jul 28, 2015
79fe3c1
Merge branch 'develop' into feature/backendCluster
eea03 Jul 28, 2015
a33e116
Backend cluster implementation; Frontend no longer communicates with …
eea03 Jul 29, 2015
43c7fcf
Bugfix: Backend offline warning shown when none of backends are online
eea03 Jul 29, 2015
11814e6
Merge pull request #483 from UnifiedViews/feature/409_dpu_template_re…
eea02 Jul 30, 2015
2cfa4b1
Added MySQL SQL scripts and Postgres update script; Fixed Postgres in…
eea03 Jul 30, 2015
3bfba6f
First changes towards active/active backend cluster
eea03 Jul 31, 2015
12fd528
Active / active backend UV cluster implementation; Fixes, documentati…
eea03 Aug 3, 2015
73c28fc
Merge branch 'develop' into feature/backendCluster
eea03 Aug 4, 2015
44143e9
UV Backend active/active cluster; Cluster is now optional and must be…
eea03 Aug 4, 2015
37de9ad
Fix of non-cluster backend executor - backend ID is not set for execu…
eea03 Aug 4, 2015
259eb1d
Reverted changes in backend test configuration for cluster unit tests
eea03 Aug 4, 2015
a44dd04
UV backend cluster enhancement; Backend allocates only such count of …
eea03 Aug 5, 2015
43eba34
Merge branch 'develop' into feature/backendCluster
eea03 Aug 6, 2015
9711181
Revert "Merge pull request #483 from UnifiedViews/feature/409_dpu_tem…
Aug 6, 2015
26e0116
Merge branch 'develop' into feature/backendCluster
eea03 Aug 10, 2015
79afa26
Merge branch 'develop' into feature/backendCluster
eea03 Aug 12, 2015
0f615ee
Merge branch 'develop' into feature/backendCluster
eea03 Aug 17, 2015
df0db38
Merge branch 'release/UV_Core_v2.1.3'
peterklimo Aug 18, 2015
501fa12
Updated update script to drop execution view before recreating
tomas-knap Aug 18, 2015
f9638ae
Updated comment of the public method
tomas-knap Aug 18, 2015
2d940e2
Adding logging of situation when config.properties param for cluster …
tomas-knap Aug 18, 2015
3387dfa
Updated comment - returned value
tomas-knap Aug 18, 2015
a189567
Merge branch 'develop' into feature/backendCluster
eea03 Aug 18, 2015
b5470db
Default limit for backend active status decreased to 10 seconds (shou…
eea03 Aug 18, 2015
3a73419
Merge branch 'develop', remote-tracking branch 'origin' into feature/…
tomas-knap Aug 18, 2015
fb6f469
Merge branch 'develop' into feature/backendCluster
tomas-knap Aug 18, 2015
bfd010d
Added license header for new files
tomas-knap Aug 18, 2015
3b63d45
Bugfix: adjusted allocation SQL query to work for both PostgreSQL and…
eea03 Aug 19, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions backend/conf/config.sample.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ general.workingdir = {full path to "odcs" (home) dir of the project}/backend/wor
backend.host = 127.0.0.1
backend.port = 5010

# Backend(s) run in cluster mode - developed and tested only for PostgreSQL
# With this property enabled, 2 or even N backends can work with one database
# All backends work simultaneously, they can process pipeline schedules and executions
# If backend cluster mode is enabled, backend.id is mandatory and must be set
# backend.cluster.mode = false
# backend.id = BackendServer1

# Whether the backend should restart running executions on startup
# By default, running executions are restarted; if false, the backend marks them as failed at startup
# backend.startup.restart.running = true

# Connection configuration setting for relational database
# for mysql {
database.sql.driver = com.mysql.jdbc.Driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import cz.cuni.mff.xrg.odcs.commons.app.ScheduledJobsPriority;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException;
import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log;
import cz.cuni.mff.xrg.odcs.commons.app.facade.ExecutionFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.RuntimePropertiesFacade;
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecution;
Expand All @@ -56,10 +58,15 @@
public class Engine implements ApplicationListener<ApplicationEvent> {

private static final Logger LOG = LoggerFactory.getLogger(Engine.class);

private static final Integer DEFAULT_LIMIT_SHEDULED_PPL = 2;

public Integer numberOfRunningJobs = 0;

private final Object LockRunningJobs = new Object();

private boolean clusterMode = false;

/**
* Publisher instance.
*/
Expand All @@ -83,13 +90,16 @@ public class Engine implements ApplicationListener<ApplicationEvent> {
*/
@Autowired
protected PipelineFacade pipelineFacade;

/**
* Runtime properties facade.
*/
@Autowired
protected RuntimePropertiesFacade runtimePropertiesFacade;

@Autowired
protected ExecutionFacade executionFacade;

/**
* Thread pool.
*/
Expand All @@ -105,19 +115,34 @@ public class Engine implements ApplicationListener<ApplicationEvent> {
*/
protected Boolean startUpDone;

/**
* Backend identifier
*/
protected String backendID;

/**
 * Initializes the engine once dependencies are injected: creates the
 * thread pool, resets the start-up flag, resolves the working directory
 * and reads the cluster settings.
 * <p>
 * Cluster mode stays disabled when the cluster-mode property is missing
 * from the configuration. The backend ID is read only when cluster mode
 * is enabled.
 */
@PostConstruct
private void propertySetter() {
    this.executorService = Executors.newCachedThreadPool();
    this.startUpDone = false;

    workingDirectory = new File(
            appConfig.getString(ConfigProperty.GENERAL_WORKINGDIR));
    // limitOfScheduledPipelines = appConfig.getInteger(ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES);
    LOG.info("Working dir: {}", workingDirectory.toString());
    // Make sure that our working directory exists. The directory has to be
    // created when it is NOT there yet; the previous check was inverted and
    // only called mkdirs() on an already existing directory.
    if (!workingDirectory.isDirectory() && !workingDirectory.mkdirs()) {
        LOG.warn("Failed to create working directory: {}", workingDirectory);
    }

    try {
        this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE);
    } catch (MissingConfigPropertyException e) {
        // A missing property is not an error: fall back to single-backend mode.
        LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage());
    }
    if (this.clusterMode) {
        // The sample configuration documents the backend ID as mandatory in cluster mode.
        this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID);
        LOG.info("Backend ID: {}", this.backendID);
    }
}

/**
Expand All @@ -135,11 +160,11 @@ protected void run(PipelineExecution execution) {

/**
* Check database for new task (PipelineExecutions to run). Can run
* concurrently. Check database every 20 seconds.
* concurrently. Check database every 2 seconds.
*/

@Async
@Scheduled(fixedDelay = 20000)
@Scheduled(fixedDelay = 2000)
protected void checkJobs() {
synchronized (LockRunningJobs) {
LOG.debug(">>> Entering checkJobs()");
Expand All @@ -152,21 +177,35 @@ protected void checkJobs() {

Integer limitOfScheduledPipelines = getLimitOfScheduledPipelines();
LOG.debug("limit of scheduled pipelines: " + limitOfScheduledPipelines);

List<PipelineExecution> jobs = pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED);
// run pipeline executions ..
for (PipelineExecution job : jobs) {
if (job.getOrderNumber() == ScheduledJobsPriority.IGNORE.getValue()) {
run(job);
numberOfRunningJobs++;
continue;
LOG.debug("Number of running jobs: {}", this.numberOfRunningJobs);

List<PipelineExecution> jobs = null;
if (this.clusterMode) {
// Update backend activity timestamp in DB
this.executionFacade.updateBackendTimestamp(this.backendID);
int limit = limitOfScheduledPipelines - this.numberOfRunningJobs;
if (limit < 0) {
limit = 0;
}
long countOfUnallocated = this.executionFacade.getCountOfUnallocatedQueuedExecutionsWithIgnorePriority();
if (limit < countOfUnallocated) {
limit = (int) countOfUnallocated;
}

if (numberOfRunningJobs < limitOfScheduledPipelines) {
int allocated = this.executionFacade.allocateQueuedExecutionsForBackend(this.backendID, limit);
LOG.debug("Allocated {} executions by backend '{}'", allocated, this.backendID);

LOG.debug("Going to find all allocated QUEUED executions");
jobs = this.pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED, this.backendID);
LOG.debug("Found {} executions planned for execution", jobs.size());
} else {
jobs = this.pipelineFacade.getAllExecutionsByPriorityLimited(PipelineExecutionStatus.QUEUED);
}
// run pipeline executions ..
for (PipelineExecution job : jobs) {
if (this.numberOfRunningJobs < limitOfScheduledPipelines || ScheduledJobsPriority.IGNORE.getValue() == job.getOrderNumber()) {
run(job);
numberOfRunningJobs++;
} else {
break;
this.numberOfRunningJobs++;
}
}

Expand All @@ -186,7 +225,7 @@ protected Integer getLimitOfScheduledPipelines() {
return DEFAULT_LIMIT_SHEDULED_PPL;
}
try {
return Integer.parseInt(limit.getValue());
return Integer.parseInt(limit.getValue());
} catch (NumberFormatException e) {
LOG.error("Value not a number of RuntimeProperty: " + ConfigProperty.BACKEND_LIMIT_OF_SCHEDULED_PIPELINES.toString()
+ ", error: " + e.getMessage());
Expand All @@ -210,8 +249,12 @@ protected synchronized void startUp() {
ExecutionSanitizer sanitizer = beanFactory.getBean(ExecutionSanitizer.class);

// list executions
List<PipelineExecution> running = pipelineFacade
.getAllExecutions(PipelineExecutionStatus.RUNNING);
List<PipelineExecution> running = null;
if (this.clusterMode) {
running = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.RUNNING, this.backendID);
} else {
running = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.RUNNING);
}
for (PipelineExecution execution : running) {
MDC.put(Log.MDC_EXECUTION_KEY_NAME, execution.getId().toString());
// hanging pipeline ..
Expand All @@ -226,8 +269,12 @@ protected synchronized void startUp() {
MDC.remove(Log.MDC_EXECUTION_KEY_NAME);
}

List<PipelineExecution> cancelling = pipelineFacade
.getAllExecutions(PipelineExecutionStatus.CANCELLING);
List<PipelineExecution> cancelling = null;
if (this.clusterMode) {
cancelling = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.CANCELLING, this.backendID);
} else {
cancelling = this.pipelineFacade.getAllExecutions(PipelineExecutionStatus.CANCELLING);
}

for (PipelineExecution execution : cancelling) {
MDC.put(Log.MDC_EXECUTION_KEY_NAME, execution.getId().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineRestart;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineSanitized;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException;
import cz.cuni.mff.xrg.odcs.commons.app.dataunit.DataUnitFactory;
import cz.cuni.mff.xrg.odcs.commons.app.dpu.DPUInstanceRecord;
import cz.cuni.mff.xrg.odcs.commons.app.execution.context.DataUnitInfo;
Expand All @@ -39,8 +42,8 @@
import cz.cuni.mff.xrg.odcs.commons.app.pipeline.PipelineExecutionStatus;
import cz.cuni.mff.xrg.odcs.commons.app.resource.MissingResourceException;
import cz.cuni.mff.xrg.odcs.commons.app.resource.ResourceManager;
import eu.unifiedviews.commons.dataunit.ManagableDataUnit;
import cz.cuni.mff.xrg.odcs.rdf.repositories.GraphUrl;
import eu.unifiedviews.commons.dataunit.ManagableDataUnit;
import eu.unifiedviews.commons.rdf.repository.RDFException;
import eu.unifiedviews.dataunit.DataUnitException;

Expand All @@ -63,6 +66,9 @@ class ExecutionSanitizer {
@Autowired
private ResourceManager resourceManager;

@Autowired
private AppConfig appConfig;

/**
* Fix possible problems with given execution. Logs of this method are
* logged with the execution id of given {@link PipelineExecution}. Method does not save the changes into database! So caller must secure
Expand All @@ -73,16 +79,25 @@ class ExecutionSanitizer {
public void sanitize(PipelineExecution execution) {
// check for flags
if (execution.getStop()) {
sanitizeCancelling(execution);
sanitizeCancellingRunning(execution);
return;
}

switch (execution.getStatus()) {
case CANCELLING:
sanitizeCancelling(execution);
sanitizeCancellingRunning(execution);
return;
case RUNNING:
sanitizeRunning(execution);
boolean restartRunning = true;
try {
restartRunning = this.appConfig.getBoolean(ConfigProperty.BACKEND_STARTUP_RESTART_RUNNING);
} catch (MissingConfigPropertyException ignore) {
/* ignore exception*/}
if (restartRunning) {
restartRunning(execution);
} else {
sanitizeCancellingRunning(execution);
}
return;
default:
// do nothing with such pipeline ..
Expand All @@ -95,8 +110,8 @@ public void sanitize(PipelineExecution execution) {
*
* @param execution
*/
private void sanitizeRunning(PipelineExecution execution) {
eventPublisher.publishEvent(new PipelineRestart(execution, this));
private void restartRunning(PipelineExecution execution) {
this.eventPublisher.publishEvent(new PipelineRestart(execution, this));

// set state back to scheduled
execution.setStatus(PipelineExecutionStatus.QUEUED);
Expand All @@ -107,7 +122,7 @@ private void sanitizeRunning(PipelineExecution execution) {
*
* @param execution
*/
private void sanitizeCancelling(PipelineExecution execution) {
private void sanitizeCancellingRunning(PipelineExecution execution) {
// publish event about this ..
eventPublisher.publishEvent(new PipelineSanitized(execution, this));

Expand All @@ -133,7 +148,12 @@ private void sanitizeCancelling(PipelineExecution execution) {
// this means that the execution does not run at all
}
// set canceled state
execution.setStatus(PipelineExecutionStatus.CANCELLED);
if (execution.getStatus() == PipelineExecutionStatus.CANCELLING) {
execution.setStatus(PipelineExecutionStatus.CANCELLED);
} else if (execution.getStatus() == PipelineExecutionStatus.RUNNING) {
execution.setStatus(PipelineExecutionStatus.FAILED);
}

execution.setEnd(now);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFailedEvent;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineFinished;
import cz.cuni.mff.xrg.odcs.backend.pipeline.event.PipelineStarted;
import cz.cuni.mff.xrg.odcs.commons.app.conf.AppConfig;
import cz.cuni.mff.xrg.odcs.commons.app.conf.ConfigProperty;
import cz.cuni.mff.xrg.odcs.commons.app.conf.MissingConfigPropertyException;
import cz.cuni.mff.xrg.odcs.commons.app.execution.log.Log;
import cz.cuni.mff.xrg.odcs.commons.app.facade.LogFacade;
import cz.cuni.mff.xrg.odcs.commons.app.facade.PipelineFacade;
Expand Down Expand Up @@ -125,11 +128,29 @@ public class Executor implements Runnable {
*/
private Date lastSuccessfulExTime;

@Autowired
private AppConfig appConfig;

/**
* Backend identifier
*/
private String backendID;

private boolean clusterMode = false;

/**
* Sort pre/post executors.
*/
@PostConstruct
public void init() {
try {
this.clusterMode = this.appConfig.getBoolean(ConfigProperty.BACKEND_CLUSTER_MODE);
} catch (MissingConfigPropertyException e) {
LOG.info("Running in single mode because cluster mode property is missing in config.properties, {}", e.getLocalizedMessage());
}
if (this.clusterMode) {
this.backendID = this.appConfig.getString(ConfigProperty.BACKEND_ID);
}
if (preExecutors != null) {
Collections.sort(preExecutors,
AnnotationAwareOrderComparator.INSTANCE);
Expand All @@ -155,6 +176,9 @@ public void bind(PipelineExecution execution) {
// update state and set start time
this.execution.setStart(new Date());
this.execution.setStatus(PipelineExecutionStatus.RUNNING);
if (this.clusterMode) {
this.execution.setBackendId(this.backendID);
}

try {
pipelineFacade.save(this.execution);
Expand Down
Loading