forked from vcabbage/amqp
-
Notifications
You must be signed in to change notification settings - Fork 59
/
Copy pathcreditor.go
117 lines (95 loc) · 2.54 KB
/
creditor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package amqp
import (
"context"
"errors"
"sync"
)
// creditor accumulates the drain flag and credit amount that should go into
// the next flow frame, and tracks whether a drain is currently in progress.
// All fields are read and written only while holding mu.
type creditor struct {
mu sync.Mutex
// future values for the next flow frame.
// pendingDrain is set by Drain and consumed (reset) by FlowBits.
pendingDrain bool
// creditsToAdd is increased by IssueCredit and consumed (reset) by FlowBits.
creditsToAdd uint32
// drained is set when a drain is active and we're waiting
// for the corresponding flow from the remote.
// It is created by Drain, and closed and reset to nil by EndDrain;
// a nil value means no drain is active.
drained chan struct{}
}
// Errors returned by creditor methods while a drain is in progress.
var (
// errLinkDraining is returned by IssueCredit when a drain is active.
errLinkDraining = errors.New("link is currently draining, no credits can be added")
// errAlreadyDraining is returned by Drain when a previous drain has not ended yet.
errAlreadyDraining = errors.New("drain already in process")
)
// EndDrain finishes the active drain, if there is one. Closing the drained
// channel wakes up a Drain call that is blocked waiting on it, and resetting
// the field to nil marks the creditor as no longer draining.
// Calling EndDrain when no drain is active does nothing.
func (mc *creditor) EndDrain() {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	if mc.drained == nil {
		// nothing to end
		return
	}

	close(mc.drained)
	mc.drained = nil
}
// FlowBits gets the proper values for the next flow frame
// and resets the internal state.
//
// currentCredits is added to the credits queued via IssueCredit to produce
// the returned credit value. If that sum would exceed the range of uint32 it
// is clamped to the maximum uint32 value instead of wrapping around (a
// wrapped sum could come out as 0, which would be indistinguishable from
// "no flow needed" and would silently discard the queued credits).
//
// Returns:
//
//	(drain: true, credits: 0) if a flow is needed (drain)
//	(drain: false, credits > 0) if a flow is needed (issue credit)
//	(drain: false, credits == 0) if no flow needed.
//
// Both the pending drain flag and the queued credits are cleared by every
// call, so any credits queued at the time a drain is returned are dropped.
func (mc *creditor) FlowBits(currentCredits uint32) (bool, uint32) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	drain := mc.pendingDrain
	toAdd := mc.creditsToAdd

	// The pending values are consumed by this call: only one drain request
	// is sent, and queued credits are handed out at most once.
	mc.pendingDrain = false
	mc.creditsToAdd = 0

	// either:
	// drain is true (ie, we're going to send a drain frame, and the credits for it should be 0)
	// toAdd == 0 (no flow frame needed, no new credits are being issued)
	if drain || toAdd == 0 {
		return drain, 0
	}

	credits := toAdd + currentCredits
	if credits < toAdd {
		// unsigned addition wrapped around; saturate instead.
		credits = ^uint32(0)
	}

	return false, credits
}
// Drain starts a drain and then blocks until one of the following happens:
//
//   - EndDrain is called: returns nil.
//   - the link's done channel (r.l.done) is closed: returns r.l.doneErr.
//   - ctx is cancelled or its deadline expires: returns ctx.Err(). In this
//     case the drain might not have happened.
//
// If a drain is already active, errAlreadyDraining is returned immediately.
//
// Note that leaving early because of ctx or link shutdown does not clear the
// drain state; only EndDrain does that. Until then further Drain calls
// return errAlreadyDraining and IssueCredit returns errLinkDraining.
func (mc *creditor) Drain(ctx context.Context, r *Receiver) error {
	drained, err := mc.beginDrain()
	if err != nil {
		return err
	}

	// Non-blocking nudge on receiverReady, intended to make mux() check our
	// flow conditions. If the send cannot proceed right away it is skipped.
	select {
	case r.receiverReady <- struct{}{}:
	default:
	}

	// The drain request has been queued; wait for the responding flow frame
	// (signalled through EndDrain), link shutdown, or the caller giving up.
	select {
	case <-drained:
		return nil
	case <-r.l.done:
		return r.l.doneErr
	case <-ctx.Done():
		return ctx.Err()
	}
}

// beginDrain marks a drain as active and pending, and returns the channel
// that EndDrain will close. The channel is returned (rather than re-read from
// the struct later) so the caller does not race with EndDrain resetting the
// field. Returns errAlreadyDraining if a drain is already active.
func (mc *creditor) beginDrain() (<-chan struct{}, error) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	if mc.drained != nil {
		return nil, errAlreadyDraining
	}

	ch := make(chan struct{})
	mc.drained = ch
	mc.pendingDrain = true

	return ch, nil
}
// IssueCredit queues up additional credits to be requested at the next
// call of FlowBits().
//
// It returns errLinkDraining, without queuing anything, if a drain is
// currently active. Otherwise credits is added to the amount already queued;
// if the total would exceed the range of uint32 it is clamped to the maximum
// uint32 value rather than wrapping around to a smaller number.
func (mc *creditor) IssueCredit(credits uint32) error {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	if mc.drained != nil {
		return errLinkDraining
	}

	total := mc.creditsToAdd + credits
	if total < credits {
		// unsigned addition wrapped around; saturate instead.
		total = ^uint32(0)
	}
	mc.creditsToAdd = total

	return nil
}