Skip to content

Commit

Permalink
Allow cross receiver message settlement (#316)
Browse files Browse the repository at this point in the history
* Allow cross receiver message settlement

It's not prevented at present, but can cause a memory leak due to
entries never being removed from the unsettledMessages map.
When a received message isn't settled, associate its receiver with the
message. The settlement APIs will direct to the associated receiver.

* mark message as settled when RSM is mode first

set rcv to nil when message has been settled

* add some additional comments

* improve comment
  • Loading branch information
jhendrixMSFT authored Jan 8, 2024
1 parent 1c1e489 commit 76124f7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 1.0.3 (2024-01-09)

### Bugs Fixed

* Fixed an issue that could cause a memory leak when settling messages across `Receiver` instances.

## 1.0.2 (2023-09-05)

### Bugs Fixed
Expand Down
10 changes: 8 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ type Message struct {
// encryption details).
Footer Annotations

deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
rcv *Receiver // used to settle message on the corresponding Receiver (nil if settled == true)
}

// NewMessage returns a *Message with data as the first payload in the Data field.
Expand Down Expand Up @@ -299,6 +300,11 @@ func (m *Message) Unmarshal(r *buffer.Buffer) error {
return nil
}

func (m *Message) onSettlement() {
m.settled = true
m.rcv = nil
}

/*
<type name="header" class="composite" source="list" provides="section">
<descriptor name="amqp:header:list" code="0x00000000:0x00000070"/>
Expand Down
19 changes: 14 additions & 5 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message,
// If the context's deadline expires or is cancelled before the operation
// completes, the message's disposition is in an unknown state.
func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error {
return r.messageDisposition(ctx, msg, &encoding.StateAccepted{})
return msg.rcv.messageDisposition(ctx, msg, &encoding.StateAccepted{})
}

// Reject notifies the server that the message is invalid.
Expand All @@ -155,7 +155,7 @@ func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error {
// If the context's deadline expires or is cancelled before the operation
// completes, the message's disposition is in an unknown state.
func (r *Receiver) RejectMessage(ctx context.Context, msg *Message, e *Error) error {
return r.messageDisposition(ctx, msg, &encoding.StateRejected{Error: e})
return msg.rcv.messageDisposition(ctx, msg, &encoding.StateRejected{Error: e})
}

// Release releases the message back to the server. The message may be redelivered to this or another consumer.
Expand All @@ -165,7 +165,7 @@ func (r *Receiver) RejectMessage(ctx context.Context, msg *Message, e *Error) er
// If the context's deadline expires or is cancelled before the operation
// completes, the message's disposition is in an unknown state.
func (r *Receiver) ReleaseMessage(ctx context.Context, msg *Message) error {
return r.messageDisposition(ctx, msg, &encoding.StateReleased{})
return msg.rcv.messageDisposition(ctx, msg, &encoding.StateReleased{})
}

// Modify notifies the server that the message was not acted upon and should be modifed.
Expand All @@ -179,7 +179,7 @@ func (r *Receiver) ModifyMessage(ctx context.Context, msg *Message, options *Mod
if options == nil {
options = &ModifyMessageOptions{}
}
return r.messageDisposition(ctx,
return msg.rcv.messageDisposition(ctx,
msg, &encoding.StateModified{
DeliveryFailed: options.DeliveryFailed,
UndeliverableHere: options.UndeliverableHere,
Expand Down Expand Up @@ -269,11 +269,18 @@ func (r *Receiver) sendDisposition(ctx context.Context, first uint32, last *uint
}
}

// messageDisposition is called via the *Receiver associated with a *Message.
// this allows messages to be settled across Receiver instances.
// note that only unsettled messsages will have their rcv field set.
func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {
// settling a message that's already settled (sender-settled or otherwise) will have a nil rcv.
// which means that r will be nil. you MUST NOT dereference r if msg.settled == true
if msg.settled {
return nil
}

debug.Assert(r != nil)

// NOTE: we MUST add to the in-flight map before sending the disposition. if not, it's possible
// to receive the ack'ing disposition frame *before* the in-flight map has been updated which
// will cause the below <-wait to never trigger.
Expand All @@ -290,6 +297,7 @@ func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state e

if wait == nil {
// mode first, there will be no settlement ack
msg.onSettlement()
r.deleteUnsettled(msg)
r.onSettlement(1)
return nil
Expand Down Expand Up @@ -703,7 +711,7 @@ func (r *Receiver) muxHandleFrame(fr frames.FrameBody) error {
// removal from the in-flight map will also remove the message from the unsettled map
count := r.inFlight.remove(fr.First, fr.Last, dispositionError, func(msg *Message) {
r.deleteUnsettled(msg)
msg.settled = true
msg.onSettlement()
})
r.onSettlement(count)

Expand Down Expand Up @@ -809,6 +817,7 @@ func (r *Receiver) muxReceive(fr frames.PerformTransfer) {
// send to receiver
if !r.msg.settled {
r.addUnsettled(&r.msg)
r.msg.rcv = r
debug.Log(3, "RX (Receiver %p): add unsettled delivery ID %d", r, r.msg.deliveryID)
}

Expand Down

0 comments on commit 76124f7

Please sign in to comment.