From 4d1160ec9a1c90d6e41ecdef1a21d43caef56d18 Mon Sep 17 00:00:00 2001 From: Patrick Deasy Date: Fri, 15 Nov 2024 10:15:40 -0800 Subject: [PATCH 1/3] AMQ-7397 Add preserveDeliveryMode to deadLetterStrategy. AMQ-7397 Move Message preparation back to RegionBroker. AMQ-7397 Add message persistence tests for dead letter strategy. --- .../activemq/broker/region/RegionBroker.java | 41 ++-- .../policy/AbstractDeadLetterStrategy.java | 12 ++ .../region/policy/DeadLetterStrategy.java | 10 + .../policy/DeadLetterPersistenceTest.java | 179 ++++++++++++++++++ 4 files changed, 225 insertions(+), 17 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterPersistenceTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 844fa120299..61e4da05332 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -781,23 +781,8 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference return false; } - // message may be inflight to other subscriptions so do not modify - message = message.copy(); - long dlqExpiration = deadLetterStrategy.getExpiration(); - if (dlqExpiration > 0) { - dlqExpiration += System.currentTimeMillis(); - } else { - stampAsExpired(message); - } - message.setExpiration(dlqExpiration); - if (!message.isPersistent()) { - message.setPersistent(true); - message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); - } - if (poisonCause != null) { - message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, - poisonCause.toString()); - } + message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause); + // The original destination and transaction id do // not get filled when the message is first sent, // it is only populated if the message is routed to @@ -822,6 +807,28 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference return false; } + private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException { + // message may be inflight to other subscriptions so do not modify + message = message.copy(); + long dlqExpiration = deadLetterStrategy.getExpiration(); + if (dlqExpiration > 0) { + dlqExpiration += System.currentTimeMillis(); + } else { + stampAsExpired(message); + } + message.setExpiration(dlqExpiration); + if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) { + message.setPersistent(true); + message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); + } + if (poisonCause != null) { + message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, + poisonCause.toString()); + } + + return message; + } + @Override public Broker getRoot() { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 24212068103..6f684abd133 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -17,10 +17,13 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.ActiveMQMessageAudit; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** * A strategy for choosing which destination is used for dead letter queue * messages. @@ -30,6 +33,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class); private boolean processNonPersistent = false; private boolean processExpired = true; + private boolean preserveDeliveryMode = false; private boolean enableAudit = true; private long expiration; @@ -91,6 +95,14 @@ public void setProcessNonPersistent(boolean processNonPersistent) { this.processNonPersistent = processNonPersistent; } + @Override + public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; } + + @Override + public void setPreserveDeliveryMode(boolean preserveDeliveryMode) { + this.preserveDeliveryMode = preserveDeliveryMode; + } + public boolean isEnableAudit() { return enableAudit; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java index fdd11740db0..5e73d27c386 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java @@ -60,6 +60,16 @@ public interface DeadLetterStrategy { */ public void setProcessNonPersistent(boolean processNonPersistent); + /** + * @return the preserveDeliveryMode + */ + public boolean isPreserveDeliveryMode(); + + /** + * @param preserveDeliveryMode the preserveDeliveryMode to set + */ + public void setPreserveDeliveryMode(boolean preserveDeliveryMode); + /** * Allows for a Message that was already processed by a DLQ to be rolled back in case * of a move or a retry of that message, otherwise the Message would be considered a diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterPersistenceTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterPersistenceTest.java new file mode 100644 index 00000000000..d93793f6da2 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/DeadLetterPersistenceTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.policy; + +import jakarta.jms.*; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.*; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeadLetterPersistenceTest extends DeadLetterTest { + private static final Logger LOG = LoggerFactory.getLogger(DiscardingDeadLetterPolicyTest.class); + private static final String CLIENT_ID = "clientID"; + private static final String NON_PERSISTENT_DEST = "nonPersistentDest"; + private static final String PRESERVE_DELIVERY_DEST = "preserveDeliveryDest"; + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + PolicyEntry policy = new PolicyEntry(); + IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy(); + strategy.setProcessNonPersistent(true); + strategy.setDestinationPerDurableSubscriber(true); + policy.setDeadLetterStrategy(strategy); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + + SharedDeadLetterStrategy processNonPersistent = new SharedDeadLetterStrategy(); + processNonPersistent.setDeadLetterQueue(new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST)); + processNonPersistent.setProcessNonPersistent(true); + PolicyEntry processNonPersistentDlqPolicy = new PolicyEntry(); + processNonPersistentDlqPolicy.setDeadLetterStrategy(processNonPersistent); + + pMap.put(new ActiveMQQueue(NON_PERSISTENT_DEST), processNonPersistentDlqPolicy); + + SharedDeadLetterStrategy processPreserveDelivery = new SharedDeadLetterStrategy(); + processPreserveDelivery.setDeadLetterQueue(new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST)); + processPreserveDelivery.setProcessNonPersistent(true); + processPreserveDelivery.setPreserveDeliveryMode(true); + PolicyEntry processPreserveDeliveryDlqPolicy = new PolicyEntry(); + processPreserveDeliveryDlqPolicy.setDeadLetterStrategy(processPreserveDelivery); + + pMap.put(new ActiveMQQueue(PRESERVE_DELIVERY_DEST), processPreserveDeliveryDlqPolicy); + + broker.setDestinationPolicy(pMap); + + return broker; + } + + @Override + protected String createClientId() { + return CLIENT_ID; + } + + @Override + protected Destination createDlqDestination() { + String prefix = topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue."; + String destinationName = prefix + getClass().getName() + "." + getName(); + if (durableSubscriber) { + String subName = // connectionId:SubName + CLIENT_ID + ":" + getDestination().toString(); + destinationName += "." + subName ; + } + return new ActiveMQQueue(destinationName); + } + + @Override + protected void doTest() throws Exception { + validateMessagePersistentSetToTrueWhenProducerIsPersistent(); + validateMessagePersistentSetToTrueWhenProducerIsNonPeristent(); + validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue(); + } + + public void validateMessagePersistentSetToTrueWhenProducerIsPersistent() throws Exception { + messageCount = 1; + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + + makeConsumer(); + makeDlqConsumer(); + sendMessages(); + + for (int i = 0; i < messageCount; i++) { + consumeAndRollback(i); + } + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg; + assertTrue(commandMsg.isPersistent()); + } + + session.commit(); + } + + public void validateMessagePersistentSetToTrueWhenProducerIsNonPeristent() throws Exception { + messageCount = 1; + destination = new ActiveMQQueue(NON_PERSISTENT_DEST); + durableSubscriber = false; + deliveryMode = DeliveryMode.NON_PERSISTENT; + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + + makeConsumer(); + makeDlqConsumer(); + sendMessages(); + + for (int i = 0; i < messageCount; i++) { + consumeAndRollback(i); + } + + dlqDestination = new ActiveMQQueue("DLQ." + NON_PERSISTENT_DEST); + dlqConsumer = session.createConsumer(dlqDestination); + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + assertEquals("NON_PERSISTENT", msg.getStringProperty("originalDeliveryMode")); + org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg; + assertTrue(commandMsg.isPersistent()); + } + + session.commit(); + } + + public void validateMessagePersitentNotSetWhenPreserveDeliveryModeIsTrue() throws Exception { + messageCount = 1; + destination = new ActiveMQQueue(PRESERVE_DELIVERY_DEST); + durableSubscriber = false; + deliveryMode = DeliveryMode.NON_PERSISTENT; + connection.start(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + + makeConsumer(); + makeDlqConsumer(); + sendMessages(); + + for (int i = 0; i < messageCount; i++) { + consumeAndRollback(i); + } + + dlqDestination = new ActiveMQQueue("DLQ." + PRESERVE_DELIVERY_DEST); + dlqConsumer = session.createConsumer(dlqDestination); + + for (int i = 0; i < messageCount; i++) { + Message msg = dlqConsumer.receive(1000); + assertNotNull("Should be a DLQ message for loop: " + i, msg); + org.apache.activemq.command.Message commandMsg = (org.apache.activemq.command.Message ) msg; + assertFalse(commandMsg.isPersistent()); + } + + session.commit(); + } +} From 691b3f75d2473d9c65fe8c43c3348494c640d4b1 Mon Sep 17 00:00:00 2001 From: Patrick Deasy Date: Thu, 5 Dec 2024 13:53:11 -0800 Subject: [PATCH 2/3] AMQ-7397 Ensure format of isPreserveDeliveryMode is consistent. --- .../broker/region/policy/AbstractDeadLetterStrategy.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 6f684abd133..6a207ad862a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -96,7 +96,9 @@ public void setProcessNonPersistent(boolean processNonPersistent) { } @Override - public boolean isPreserveDeliveryMode() { return this.preserveDeliveryMode; } + public boolean isPreserveDeliveryMode() { + return this.preserveDeliveryMode; + } @Override public void setPreserveDeliveryMode(boolean preserveDeliveryMode) { From c75bef7942c1b17fdbe076938f2d82ce0e7ec441 Mon Sep 17 00:00:00 2001 From: Patrick Deasy Date: Fri, 6 Dec 2024 09:55:14 -0800 Subject: [PATCH 3/3] AMQ-7397 removed prepareMessageForDeadLetterQueue method. --- .../activemq/broker/region/RegionBroker.java | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 61e4da05332..d5f22087ac2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -781,7 +781,23 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference return false; } - message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause); + // message may be inflight to other subscriptions so do not modify + message = message.copy(); + long dlqExpiration = deadLetterStrategy.getExpiration(); + if (dlqExpiration > 0) { + dlqExpiration += System.currentTimeMillis(); + } else { + stampAsExpired(message); + } + message.setExpiration(dlqExpiration); + if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) { + message.setPersistent(true); + message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); + } + if (poisonCause != null) { + message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, + poisonCause.toString()); + } // The original destination and transaction id do // not get filled when the message is first sent, @@ -807,28 +823,6 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference return false; } - private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException { - // message may be inflight to other subscriptions so do not modify - message = message.copy(); - long dlqExpiration = deadLetterStrategy.getExpiration(); - if (dlqExpiration > 0) { - dlqExpiration += System.currentTimeMillis(); - } else { - stampAsExpired(message); - } - message.setExpiration(dlqExpiration); - if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) { - message.setPersistent(true); - message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); - } - if (poisonCause != null) { - message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, - poisonCause.toString()); - } - - return message; - } - @Override public Broker getRoot() { try {