Skip to content

Commit

Permalink
AMQ-7397 Add message persistence tests for dead letter strategy.
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Deasy committed Dec 3, 2024
1 parent 862acda commit 36ba737
Showing 1 changed file with 163 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package org.apache.activemq.broker.policy;

import jakarta.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.*;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeadLetterPersistenceTest extends DeadLetterTest {
private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class);
private static final String CLIENT_ID = "clientID";
private static final String NON_PERSISTENT_DEST = "nonPersistentDest";
private static final String PRESERVE_DELIVERY_DEST = "preserveDeliveryDest";

@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();

PolicyEntry policy = new PolicyEntry();
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
strategy.setDestinationPerDurableSubscriber(true);
policy.setDeadLetterStrategy(strategy);

PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);

SharedDeadLetterStrategy processNonPersistent = new SharedDeadLetterStrategy();
processNonPersistent.setDeadLetterQueue(new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST));
processNonPersistent.setProcessNonPersistent(true);
PolicyEntry processNonPersistentDlqPolicy = new PolicyEntry();
processNonPersistentDlqPolicy.setDeadLetterStrategy(processNonPersistent);

pMap.put(new ActiveMQQueue(NON_PERSISTENT_DEST), processNonPersistentDlqPolicy);

SharedDeadLetterStrategy processPreserveDelivery = new SharedDeadLetterStrategy();
processPreserveDelivery.setDeadLetterQueue(new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST));
processPreserveDelivery.setProcessNonPersistent(true);
processPreserveDelivery.setPreserveDeliveryMode(true);
PolicyEntry processPreserveDeliveryDlqPolicy = new PolicyEntry();
processPreserveDeliveryDlqPolicy.setDeadLetterStrategy(processPreserveDelivery);

pMap.put(new ActiveMQQueue(PRESERVE_DELIVERY_DEST), processPreserveDeliveryDlqPolicy);

broker.setDestinationPolicy(pMap);

return broker;
}

@Override
protected String createClientId() {
return CLIENT_ID;
}

@Override
protected Destination createDlqDestination() {
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
String destinationName = prefix + getClass().getName() + "." + getName();
if (durableSubscriber) {
String subName = // connectionId:SubName
CLIENT_ID + ":" + getDestination().toString();
destinationName += "." + subName ;
}
return new ActiveMQQueue(destinationName);
}

@Override
protected void doTest() throws Exception {
validateMessagePersistentSetToTrueWhenProducerIsPersistent();
validateMessagePersistentSetToTrueWhenProducerIsNonPeristent();
validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue();
}

public void validateMessagePersistentSetToTrueWhenProducerIsPersistent() throws Exception {
messageCount = 1;
connection.start();

ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;

makeConsumer();
makeDlqConsumer();
sendMessages();

for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}

for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
assertTrue(commandMsg.isPersistent());
}

session.commit();
}

public void validateMessagePersistentSetToTrueWhenProducerIsNonPeristent() throws Exception {
messageCount = 1;
destination = new ActiveMQQueue(NON_PERSISTENT_DEST);
durableSubscriber = false;
deliveryMode = DeliveryMode.NON_PERSISTENT;
connection.start();

ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;

makeConsumer();
makeDlqConsumer();
sendMessages();

for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}

dlqDestination = new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST);
dlqConsumer = session.createConsumer(dlqDestination);

for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
assertEquals("NON_PERSISTENT", msg.getStringProperty("originalDeliveryMode"));
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
assertTrue(commandMsg.isPersistent());
}

session.commit();
}

public void validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue() throws Exception {
messageCount = 1;
destination = new ActiveMQQueue(PRESERVE_DELIVERY_DEST);
durableSubscriber = false;
deliveryMode = DeliveryMode.NON_PERSISTENT;
connection.start();

ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;

makeConsumer();
makeDlqConsumer();
sendMessages();

for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}

dlqDestination = new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST);
dlqConsumer = session.createConsumer(dlqDestination);

for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
assertFalse(commandMsg.isPersistent());
}

session.commit();
}
}

0 comments on commit 36ba737

Please sign in to comment.