Skip to content

Commit

Permalink
AMQ-7397 Add preserveDeliveryMode to deadLetterStrategy.
Browse files Browse the repository at this point in the history
AMQ-7397 Move Message preparation back to RegionBroker.

AMQ-7397 Add message persistence tests for dead letter strategy.
  • Loading branch information
Patrick Deasy committed Dec 4, 2024
1 parent 3400983 commit 92ac5e2
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -781,23 +781,8 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
return false;
}

// message may be inflight to other subscriptions so do not modify
message = message.copy();
long dlqExpiration = deadLetterStrategy.getExpiration();
if (dlqExpiration > 0) {
dlqExpiration += System.currentTimeMillis();
} else {
stampAsExpired(message);
}
message.setExpiration(dlqExpiration);
if (!message.isPersistent()) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
if (poisonCause != null) {
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
}
message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause);

// The original destination and transaction id do
// not get filled when the message is first sent,
// it is only populated if the message is routed to
Expand All @@ -822,6 +807,28 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
return false;
}

private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException {
// message may be inflight to other subscriptions so do not modify
message = message.copy();
long dlqExpiration = deadLetterStrategy.getExpiration();
if (dlqExpiration > 0) {
dlqExpiration += System.currentTimeMillis();
} else {
stampAsExpired(message);
}
message.setExpiration(dlqExpiration);
if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
if (poisonCause != null) {
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
}

return message;
}

@Override
public Broker getRoot() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package org.apache.activemq.broker.region.policy;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* A strategy for choosing which destination is used for dead letter queue
* messages.
Expand All @@ -30,6 +33,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
private boolean processNonPersistent = false;
private boolean processExpired = true;
private boolean preserveDeliveryMode = false;
private boolean enableAudit = true;
private long expiration;

Expand Down Expand Up @@ -91,6 +95,14 @@ public void setProcessNonPersistent(boolean processNonPersistent) {
this.processNonPersistent = processNonPersistent;
}

@Override
public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; }

@Override
public void setPreserveDeliveryMode(boolean preserveDeliveryMode) {
this.preserveDeliveryMode = preserveDeliveryMode;
}

public boolean isEnableAudit() {
return enableAudit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ public interface DeadLetterStrategy {
*/
public void setProcessNonPersistent(boolean processNonPersistent);

/**
* @return the PreserveDeliveryMode
*/
public boolean isPreserveDeliveryMode();

/**
* @param PreserveDeliveryMode the PreserveDeliveryMode to set
*/
public void setPreserveDeliveryMode(boolean PreserveDeliveryMode);

/**
* Allows for a Message that was already processed by a DLQ to be rolled back in case
* of a move or a retry of that message, otherwise the Message would be considered a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;

import jakarta.jms.*;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.*;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeadLetterPersistenceTest extends DeadLetterTest {
private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class);
private static final String CLIENT_ID = "clientID";
private static final String NON_PERSISTENT_DEST = "nonPersistentDest";
private static final String PRESERVE_DELIVERY_DEST = "preserveDeliveryDest";

@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();

PolicyEntry policy = new PolicyEntry();
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
strategy.setDestinationPerDurableSubscriber(true);
policy.setDeadLetterStrategy(strategy);

PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);

SharedDeadLetterStrategy processNonPersistent = new SharedDeadLetterStrategy();
processNonPersistent.setDeadLetterQueue(new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST));
processNonPersistent.setProcessNonPersistent(true);
PolicyEntry processNonPersistentDlqPolicy = new PolicyEntry();
processNonPersistentDlqPolicy.setDeadLetterStrategy(processNonPersistent);

pMap.put(new ActiveMQQueue(NON_PERSISTENT_DEST), processNonPersistentDlqPolicy);

SharedDeadLetterStrategy processPreserveDelivery = new SharedDeadLetterStrategy();
processPreserveDelivery.setDeadLetterQueue(new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST));
processPreserveDelivery.setProcessNonPersistent(true);
processPreserveDelivery.setPreserveDeliveryMode(true);
PolicyEntry processPreserveDeliveryDlqPolicy = new PolicyEntry();
processPreserveDeliveryDlqPolicy.setDeadLetterStrategy(processPreserveDelivery);

pMap.put(new ActiveMQQueue(PRESERVE_DELIVERY_DEST), processPreserveDeliveryDlqPolicy);

broker.setDestinationPolicy(pMap);

return broker;
}

@Override
protected String createClientId() {
return CLIENT_ID;
}

@Override
protected Destination createDlqDestination() {
String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.";
String destinationName = prefix + getClass().getName() + "." + getName();
if (durableSubscriber) {
String subName = // connectionId:SubName
CLIENT_ID + ":" + getDestination().toString();
destinationName += "." + subName ;
}
return new ActiveMQQueue(destinationName);
}

@Override
protected void doTest() throws Exception {
validateMessagePersistentSetToTrueWhenProducerIsPersistent();
validateMessagePersistentSetToTrueWhenProducerIsNonPeristent();
validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue();
}

public void validateMessagePersistentSetToTrueWhenProducerIsPersistent() throws Exception {
messageCount = 1;
connection.start();

ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;

makeConsumer();
makeDlqConsumer();
sendMessages();

for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}

for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
assertTrue(commandMsg.isPersistent());
}

session.commit();
}

public void validateMessagePersistentSetToTrueWhenProducerIsNonPeristent() throws Exception {
messageCount = 1;
destination = new ActiveMQQueue(NON_PERSISTENT_DEST);
durableSubscriber = false;
deliveryMode = DeliveryMode.NON_PERSISTENT;
connection.start();

ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;

makeConsumer();
makeDlqConsumer();
sendMessages();

for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}

dlqDestination = new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST);
dlqConsumer = session.createConsumer(dlqDestination);

for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
assertEquals("NON_PERSISTENT", msg.getStringProperty("originalDeliveryMode"));
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
assertTrue(commandMsg.isPersistent());
}

session.commit();
}

public void validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue() throws Exception {
messageCount = 1;
destination = new ActiveMQQueue(PRESERVE_DELIVERY_DEST);
durableSubscriber = false;
deliveryMode = DeliveryMode.NON_PERSISTENT;
connection.start();

ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;

makeConsumer();
makeDlqConsumer();
sendMessages();

for (int i = 0; i < messageCount; i++) {
consumeAndRollback(i);
}

dlqDestination = new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST);
dlqConsumer = session.createConsumer(dlqDestination);

for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg;
assertFalse(commandMsg.isPersistent());
}

session.commit();
}
}

0 comments on commit 92ac5e2

Please sign in to comment.