diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index b05c3ec66d8..13bd290a527 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -184,6 +184,7 @@ public class BrokerService implements Service { private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges // to other jms messaging systems private boolean deleteAllMessagesOnStartup; + private boolean deleteAllScheduledMessagesOnStartup = false; private boolean advisorySupport = true; private boolean anonymousProducerAdvisorySupport = false; private URI vmConnectorURI; @@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup; } + /** + * Sets whether all scheduled messages are deleted on startup + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor" + */ + public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) { + this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup; + } + + public boolean isDeleteAllScheduledMessagesOnStartup() { + return deleteAllScheduledMessagesOnStartup; + } + public URI getVmConnectorURI() { if (vmConnectorURI == null) { try { @@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception { if (isSchedulerSupport()) { SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed); + sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup); if (isUseJmx()) { JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java index 3a778c5ad1c..220ae874fa0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java @@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener { private final JobSchedulerStore store; private JobScheduler scheduler; private int maxRepeatAllowed = MAX_REPEAT_ALLOWED; + private boolean deleteAllScheduledMessagesOnStartup; public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception { super(next); @@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception { public void start() throws Exception { this.started.set(true); getInternalScheduler(); + if (deleteAllScheduledMessagesOnStartup) { + deleteAllScheduledMessages(); + } super.start(); } @@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat); } + private void deleteAllScheduledMessages() throws Exception { + LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided"); + getInternalScheduler().removeAllJobs(); + } + @Override public void scheduledJob(String id, ByteSequence job) { org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength()); @@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() { public void setMaxRepeatAllowed(int maxRepeatAllowed) { this.maxRepeatAllowed = maxRepeatAllowed; } + + public boolean getDeleteAllScheduledMessagesOnStartup() { + return deleteAllScheduledMessagesOnStartup; + } + + public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) { + this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup; + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java index 2c4e3279068..d0fae18a648 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/config/ConfigTest.java @@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception { assertEquals("Broker Config Error (persistent)", false, broker.isPersistent()); assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook()); assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup()); + assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup()); LOG.info("Success"); // Check specific vm transport diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml index 0a7a6864357..1903cc79a29 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/config/example.xml @@ -26,7 +26,7 @@ + useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">