From 862acda6db93bd2c43bc5da23048681dafbc473a Mon Sep 17 00:00:00 2001 From: Patrick Deasy Date: Tue, 3 Dec 2024 15:40:11 -0800 Subject: [PATCH] AMQ-7397 Move Message preparation back to RegionBroker. --- .../activemq/broker/region/RegionBroker.java | 35 ++++++++++++++++++- .../policy/AbstractDeadLetterStrategy.java | 34 ------------------ .../region/policy/DeadLetterStrategy.java | 9 ----- 3 files changed, 34 insertions(+), 44 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 31d4091c5f..61e4da0533 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -81,6 +81,7 @@ * Routes Broker operations to the correct messaging regions for processing. */ public class RegionBroker extends EmptyBroker { + public static final String ORIGINAL_EXPIRATION = "originalExpiration"; private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class); private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); @@ -748,6 +749,16 @@ public boolean isExpired(MessageReference messageReference) { return messageReference.canProcessAsExpired(); } + private boolean stampAsExpired(Message message) throws IOException { + boolean stamped = false; + if (message.getProperty(ORIGINAL_EXPIRATION) == null) { + long expiration = message.getExpiration(); + message.setProperty(ORIGINAL_EXPIRATION, expiration); + stamped = true; + } + return stamped; + } + @Override public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) { LOG.debug("Message expired {}", node); @@ -770,7 +781,7 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference return false; } - message = deadLetterStrategy.prepareMessageForDeadLetterQueue(message, poisonCause); + message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause); // The original destination and transaction id do // not get filled when the message is first sent, @@ -796,6 +807,28 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference return false; } + private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException { + // message may be inflight to other subscriptions so do not modify + message = message.copy(); + long dlqExpiration = deadLetterStrategy.getExpiration(); + if (dlqExpiration > 0) { + dlqExpiration += System.currentTimeMillis(); + } else { + stampAsExpired(message); + } + message.setExpiration(dlqExpiration); + if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) { + message.setPersistent(true); + message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); + } + if (poisonCause != null) { + message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, + poisonCause.toString()); + } + + return message; + } + @Override public Broker getRoot() { try { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java index 9d416696a2..6f684abd13 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java @@ -30,7 +30,6 @@ * */ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy { - public static final String ORIGINAL_EXPIRATION = "originalExpiration"; private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class); private boolean processNonPersistent = false; private boolean processExpired = true; @@ -64,39 +63,6 @@ public boolean isSendToDeadLetterQueue(Message message) { return result; } - @Override - public Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException { - // message may be inflight to other subscriptions so do not modify - message = message.copy(); - long dlqExpiration = expiration; - if (dlqExpiration > 0) { - dlqExpiration += System.currentTimeMillis(); - } else { - stampAsExpired(message); - } - message.setExpiration(dlqExpiration); - if (!message.isPersistent() && !preserveDeliveryMode) { - message.setPersistent(true); - message.setProperty("originalDeliveryMode", "NON_PERSISTENT"); - } - if (poisonCause != null) { - message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, - poisonCause.toString()); - } - - return message; - } - - private boolean stampAsExpired(Message message) throws IOException { - boolean stamped = false; - if (message.getProperty(ORIGINAL_EXPIRATION) == null) { - long expiration = message.getExpiration(); - message.setProperty(ORIGINAL_EXPIRATION, expiration); - stamped = true; - } - return stamped; - } - /** * @return the processExpired */ diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java index 0fa03d2da5..67b753bf0c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java @@ -37,15 +37,6 @@ public interface DeadLetterStrategy { */ boolean isSendToDeadLetterQueue(Message message); - /** - * Allow pluggable strategy for preparing a message for a dead letter queue - * for example, you might not want update the deliveryMode of messages - * @param message - * @param poisonCause - * @return prepared message to be sent to a dead letter queue - */ - Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException; - /** * Returns the dead letter queue for the given message and subscription. */