Skip to content

Commit

Permalink
AMQ-7397 Move Message preparation back to RegionBroker.
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Deasy committed Dec 3, 2024
1 parent 1d686f0 commit 862acda
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
* Routes Broker operations to the correct messaging regions for processing.
*/
public class RegionBroker extends EmptyBroker {
public static final String ORIGINAL_EXPIRATION = "originalExpiration";
private static final Logger LOG = LoggerFactory.getLogger(RegionBroker.class);
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();

Expand Down Expand Up @@ -748,6 +749,16 @@ public boolean isExpired(MessageReference messageReference) {
return messageReference.canProcessAsExpired();
}

private boolean stampAsExpired(Message message) throws IOException {
boolean stamped = false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
long expiration = message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION, expiration);
stamped = true;
}
return stamped;
}

@Override
public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
LOG.debug("Message expired {}", node);
Expand All @@ -770,7 +781,7 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
return false;
}

message = deadLetterStrategy.prepareMessageForDeadLetterQueue(message, poisonCause);
message = prepareMessageForDeadLetterQueue(message, deadLetterStrategy, poisonCause);

// The original destination and transaction id do
// not get filled when the message is first sent,
Expand All @@ -796,6 +807,28 @@ public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference
return false;
}

private Message prepareMessageForDeadLetterQueue(Message message, DeadLetterStrategy deadLetterStrategy, Throwable poisonCause) throws IOException {
// message may be inflight to other subscriptions so do not modify
message = message.copy();
long dlqExpiration = deadLetterStrategy.getExpiration();
if (dlqExpiration > 0) {
dlqExpiration += System.currentTimeMillis();
} else {
stampAsExpired(message);
}
message.setExpiration(dlqExpiration);
if (!message.isPersistent() && !deadLetterStrategy.isPreserveDeliveryMode()) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
if (poisonCause != null) {
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
}

return message;
}

@Override
public Broker getRoot() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
*
*/
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
public static final String ORIGINAL_EXPIRATION = "originalExpiration";
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
private boolean processNonPersistent = false;
private boolean processExpired = true;
Expand Down Expand Up @@ -64,39 +63,6 @@ public boolean isSendToDeadLetterQueue(Message message) {
return result;
}

@Override
public Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException {
// message may be inflight to other subscriptions so do not modify
message = message.copy();
long dlqExpiration = expiration;
if (dlqExpiration > 0) {
dlqExpiration += System.currentTimeMillis();
} else {
stampAsExpired(message);
}
message.setExpiration(dlqExpiration);
if (!message.isPersistent() && !preserveDeliveryMode) {
message.setPersistent(true);
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
}
if (poisonCause != null) {
message.setProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY,
poisonCause.toString());
}

return message;
}

private boolean stampAsExpired(Message message) throws IOException {
boolean stamped = false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
long expiration = message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION, expiration);
stamped = true;
}
return stamped;
}

/**
* @return the processExpired
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,6 @@ public interface DeadLetterStrategy {
*/
boolean isSendToDeadLetterQueue(Message message);

/**
* Allow pluggable strategy for preparing a message for a dead letter queue
* for example, you might not want update the deliveryMode of messages
* @param message
* @param poisonCause
* @return prepared message to be sent to a dead letter queue
*/
Message prepareMessageForDeadLetterQueue(Message message, Throwable poisonCause) throws IOException;

/**
* Returns the dead letter queue for the given message and subscription.
*/
Expand Down

0 comments on commit 862acda

Please sign in to comment.