-
Notifications
You must be signed in to change notification settings - Fork 56
/
policy_utilities.js
115 lines (101 loc) · 3.93 KB
/
policy_utilities.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
'use strict';
var debug = require('debug')('amqp10:policy:utils'),
constants = require('../constants'),
errors = require('../errors'),
u = require('../utilities');
var WindowPolicies = {
RefreshAtHalf: function(session) {
if (session._sessionParams.incomingWindow < (session.policy.windowQuantum / 2)) {
debug('Refreshing session window by ' + session.policy.windowQuantum + ': ' + session._sessionParams.incomingWindow + ' remaining.');
session.addWindow(session.policy.windowQuantum);
}
},
RefreshAtEmpty: function(session) {
if (session._sessionParams.incomingWindow <= 0) {
debug('Refreshing session window by ' + session.policy.windowQuantum + ': ' + session._sessionParams.incomingWindow + ' remaining.');
session.addWindow(session.policy.windowQuantum);
}
},
DoNotRefresh: function(session) {
// Do Nothing
}
};
module.exports.WindowPolicies = WindowPolicies;
var CreditPolicies = {
RefreshAtHalf: function(link) {
if (link.linkCredit < (link.policy.creditQuantum / 2)) {
debug('Refreshing link ' + link.name + ' credit by ' + link.policy.creditQuantum + ': ' + link.linkCredit + ' remaining.');
link.addCredits(link.policy.creditQuantum);
}
},
RefreshAtEmpty: function(link) {
if (link.linkCredit <= 0) {
debug('Refreshing link ' + link.name + ' credit by ' + link.policy.creditQuantum + ': ' + link.linkCredit + ' remaining.');
link.addCredits(link.policy.creditQuantum);
}
},
RefreshSettled: function (threshold) {
return function (link, options) {
if (link.policy.rcvSettleMode === constants.receiverSettleMode.autoSettle) {
throw new errors.InvalidStateError('Cannot specify RefreshSettled as link refresh policy when auto-settling messages.');
}
var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
if (creditQuantum > 0 && link.linkCredit < threshold) {
debug('Refreshing link ' + link.name + ' credit by ' + creditQuantum + ': ' + link.linkCredit + ' remaining.');
link.addCredits(creditQuantum);
}
};
},
DoNotRefresh: function(link) {
// Do Nothing
}
};
module.exports.CreditPolicies = CreditPolicies;
/**
* Defines the behavior of the return value of `SenderLink.send`
* @enum
*/
var SenderCallbackPolicy = {
/** Callback immediately after sending, no promise is created */
None: 'none',
/** Only callback when settled Disposition received from recipient */
OnSettle: 'settled',
/** Callback as soon as sent, will not call-back again if future disposition
* results in error. */
OnSent: 'sent',
};
module.exports.SenderCallbackPolicies = SenderCallbackPolicy; // deprecated
module.exports.SenderCallbackPolicy = SenderCallbackPolicy;
function fixDeprecatedLinkOptions(policy) {
if (policy && policy.attach) {
if (policy.attach.hasOwnProperty('senderSettleMode')) {
policy.attach.sndSettleMode = policy.attach.senderSettleMode;
delete policy.attach.senderSettleMode;
}
if (policy.attach.hasOwnProperty('receiverSettleMode')) {
policy.attach.rcvSettleMode = policy.attach.receiverSettleMode;
delete policy.attach.receiverSettleMode;
}
}
}
module.exports.fixDeprecatedLinkOptions = fixDeprecatedLinkOptions;
function merge(newPolicy, base) {
var policy = u.deepMerge(newPolicy, base);
fixDeprecatedLinkOptions(policy.senderLink);
fixDeprecatedLinkOptions(policy.receiverLink);
return policy;
}
module.exports.Merge = merge;
// Receiver links process messages N at a time, only renewing credits on ack.
module.exports.RenewOnSettle = function(initialCredit, threshold, basePolicy) {
basePolicy = basePolicy || {};
return merge({
receiverLink: {
credit: CreditPolicies.RefreshSettled(threshold),
creditQuantum: initialCredit,
attach: {
rcvSettleMode: constants.receiverSettleMode.settleOnDisposition
}
}
}, basePolicy);
};