Skip to content

Commit

Permalink
Add option for deleting all scheduled messages on startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenliao94 committed Nov 26, 2024
1 parent aa842da commit c7c4d1a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public class BrokerService implements Service {
private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean deleteAllScheduledMessagesOnStartup = false;
private boolean advisorySupport = true;
private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI;
Expand Down Expand Up @@ -1630,6 +1631,18 @@ public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStar
this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
}

/**
* Sets whether all scheduled messages are deleted on startup
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
*/
public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllScheduledMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllScheduledMessagesOnStartup;
}

public boolean isDeleteAllScheduledMessagesOnStartup() {
return deleteAllScheduledMessagesOnStartup;
}

public URI getVmConnectorURI() {
if (vmConnectorURI == null) {
try {
Expand Down Expand Up @@ -2440,6 +2453,7 @@ protected Broker addInterceptors(Broker broker) throws Exception {
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
sb.setDeleteAllScheduledMessagesOnStartup(deleteAllScheduledMessagesOnStartup);
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class SchedulerBroker extends BrokerFilter implements JobListener {
private final JobSchedulerStore store;
private JobScheduler scheduler;
private int maxRepeatAllowed = MAX_REPEAT_ALLOWED;
private boolean deleteAllScheduledMessagesOnStartup;

public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store) throws Exception {
super(next);
Expand Down Expand Up @@ -212,6 +213,9 @@ public synchronized JobScheduler getJobScheduler() throws Exception {
public void start() throws Exception {
this.started.set(true);
getInternalScheduler();
if (deleteAllScheduledMessagesOnStartup) {
deleteAllScheduledMessages();
}
super.start();
}

Expand Down Expand Up @@ -364,6 +368,11 @@ private void doSchedule(Message messageSend, Object cronValue, Object periodValu
new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
}

private void deleteAllScheduledMessages() throws Exception {
LOG.info("Deleting all scheduled messages on startup because deleteAllScheduledMessagesOnStartup configuration has been provided");
getInternalScheduler().removeAllJobs();
}

@Override
public void scheduledJob(String id, ByteSequence job) {
org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job.getOffset(), job.getLength());
Expand Down Expand Up @@ -487,4 +496,12 @@ public int getMaxRepeatAllowed() {
public void setMaxRepeatAllowed(int maxRepeatAllowed) {
this.maxRepeatAllowed = maxRepeatAllowed;
}

public boolean getDeleteAllScheduledMessagesOnStartup() {
return deleteAllScheduledMessagesOnStartup;
}

public void setDeleteAllScheduledMessagesOnStartup(boolean deleteAllSchedulerdMessagesOnStartup) {
this.deleteAllScheduledMessagesOnStartup = deleteAllSchedulerdMessagesOnStartup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void testBrokerConfig() throws Exception {
assertEquals("Broker Config Error (persistent)", false, broker.isPersistent());
assertEquals("Broker Config Error (useShutdownHook)", false, broker.isUseShutdownHook());
assertEquals("Broker Config Error (deleteAllMessagesOnStartup)", true, broker.isDeleteAllMessagesOnStartup());
assertEquals("Broker Config Error (deleteAllScheduledMessagesOnStartup)", true, broker.isDeleteAllScheduledMessagesOnStartup());
LOG.info("Success");

// Check specific vm transport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
useLoggingForShutdownErrors="true" useJmx="true"
persistent="false" vmConnectorURI="vm://javacoola"
useShutdownHook="false" deleteAllMessagesOnStartup="true">
useShutdownHook="false" deleteAllMessagesOnStartup="true" deleteAllScheduledMessagesOnStartup="true">

<!--
|| NOTE this config file is used for unit testing the configuration mechanism
Expand Down

0 comments on commit c7c4d1a

Please sign in to comment.