-
Notifications
You must be signed in to change notification settings - Fork 172
/
redlock.js
432 lines (344 loc) · 11 KB
/
redlock.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
'use strict';
const util = require('util');
const crypto = require('crypto');
const Promise = require('bluebird');
const EventEmitter = require('events');
// constants
// ---------
// Lua scripts sent to every redis server through `eval`. Each script receives the
// resource names as KEYS and the unique lock value as ARGV[1]; the lock and extend
// scripts additionally receive the ttl in milliseconds as ARGV[2].
//
// The scripts use the length operator (`#KEYS`) rather than `table.getn`, which
// Lua deprecated in 5.1 and which is absent from later Lua versions.

// Acquire: returns 0 without changing anything if any key already exists,
// otherwise sets every key to the lock value and returns the number of keys.
const lockScript = `
-- Return 0 if an entry already exists.
for i, key in ipairs(KEYS) do
if redis.call("exists", key) == 1 then
return 0
end
end
-- Create an entry for each provided key.
for i, key in ipairs(KEYS) do
redis.call("set", key, ARGV[1], "PX", ARGV[2])
end
-- Return the number of entries added.
return #KEYS
`;

// Release: deletes only the keys that still hold this lock value and returns
// how many were removed.
const unlockScript = `
local count = 0
for i, key in ipairs(KEYS) do
-- Only remove entries for *this* lock value.
if redis.call("get", key) == ARGV[1] then
redis.pcall("del", key)
count = count + 1
end
end
-- Return the number of entries removed.
return count
`;

// Extend: returns 0 without changing anything unless every key currently holds
// this lock value, otherwise re-sets every key with the new ttl and returns the
// number of keys.
const extendScript = `
-- Return 0 if an entry exists with a *different* lock value.
for i, key in ipairs(KEYS) do
if redis.call("get", key) ~= ARGV[1] then
return 0
end
end
-- Update the entry for each provided key.
for i, key in ipairs(KEYS) do
redis.call("set", key, ARGV[1], "PX", ARGV[2])
end
-- Return the number of entries updated.
return #KEYS
`;

// defaults
// --------
// Values the Redlock constructor falls back to for any numeric option that is
// not supplied as a number.
const defaults = {
  // multiplied by the ttl to get the clock drift allowance deducted from a lock's lifetime
  driftFactor: 0.01,
  // number of retries after a failed attempt (-1 retries without limit)
  retryCount: 10,
  // base delay between attempts, in ms
  retryDelay: 200,
  // largest random amount, in ms, added to or subtracted from the retry delay
  retryJitter: 100
};
// LockError
// ---------
// The error type used to report a failure to lock, extend or release a resource.
//
// - `message`:  optional description; a generic message is used when it is falsy
// - `attempts`: stored as-is on the error; callers in this file pass the number
//               of attempts made, where they pass anything
function LockError(message, attempts) {
  Error.call(this);

  // record the stack, omitting this constructor's own frame
  Error.captureStackTrace(this, LockError);

  const fallbackMessage = 'Failed to lock the resource.';

  this.name = 'LockError';
  this.message = message ? message : fallbackMessage;
  this.attempts = attempts;
}

// LockError instances must satisfy `instanceof Error`
util.inherits(LockError, Error);
// Lock
// ----
// Describes a lock held on one or more resources. Instances are handed back when a
// resource is successfully locked, and carry `unlock` and `extend` shortcuts that
// forward to the Redlock instance that created them.
//
// - `redlock`:    the Redlock instance the shortcuts delegate to
// - `resource`:   the locked resource(s)
// - `value`:      the value stored in redis that identifies this lock
// - `expiration`: timestamp in ms after which the lock is considered expired
// - `attempts`:   the number of attempts it took to obtain the lock
function Lock(redlock, resource, value, expiration, attempts) {
  Object.assign(this, { redlock, resource, value, expiration, attempts });
}

// Release this lock; forwards to `redlock.unlock`.
Lock.prototype.unlock = function unlock(callback) {
  const owner = this.redlock;
  return owner.unlock(this, callback);
};

// Extend this lock by `ttl`; forwards to `redlock.extend`.
Lock.prototype.extend = function extend(ttl, callback) {
  const owner = this.redlock;
  return owner.extend(this, ttl, callback);
};
// Attach a reference to Lock, which allows the application to use `instanceof`
// to check that a value is a Lock. This statement may precede the Redlock
// constructor because `Redlock` is a hoisted function declaration.
Redlock.Lock = Lock;
// Redlock
// -------
// A redlock object is instantiated with an array of at least one redis client and an optional
// `options` object. Properties of the Redlock object should NOT be changed after it is first
// used, as doing so could have unintended consequences for live locks.
//
// Options (each falls back to its default when not of the expected type):
// - `driftFactor`, `retryCount`, `retryDelay`, `retryJitter`: numbers
// - `lockScript`, `unlockScript`, `extendScript`: functions that receive the
//   built-in script and return the script to use in its place
//
// Throws an Error when `clients` is missing or empty.
function Redlock(clients, options) {
  // set up the EventEmitter state on this instance
  EventEmitter.call(this);

  // reject a missing or empty client list with a descriptive error, rather than
  // failing on `.length` of undefined
  if(!clients || clients.length === 0)
    throw new Error('Redlock must be instantiated with at least one redis server.');

  // set default options
  options = options || {};

  // a numeric option is honoured only when it really is a number, so 0 is kept
  const numberOption = function(name) {
    return typeof options[name] === 'number' ? options[name] : defaults[name];
  };

  // a script option is a function that customises the built-in script
  const scriptOption = function(name, builtIn) {
    return typeof options[name] === 'function' ? options[name](builtIn) : builtIn;
  };

  this.driftFactor = numberOption('driftFactor');
  this.retryCount = numberOption('retryCount');
  this.retryDelay = numberOption('retryDelay');
  this.retryJitter = numberOption('retryJitter');

  this.lockScript = scriptOption('lockScript', lockScript);
  this.unlockScript = scriptOption('unlockScript', unlockScript);
  this.extendScript = scriptOption('extendScript', extendScript);

  // the redis clients that take part in every lock operation
  this.servers = clients;
}

// Inherit all the EventEmitter methods, like `on`, and `off`
util.inherits(Redlock, EventEmitter);
// Attach a reference to LockError per issue #7, which allows the application to use instanceof
// to distinguish between error types.
Redlock.LockError = LockError;
// quit
// ----
// Calls `.quit()` on every client connection. Returns a promise for the mapped
// results; the optional node-style `callback` is attached through `nodeify`.
Redlock.prototype.quit = function quit(callback) {
  const closeConnection = (client) => client.quit();

  return Promise
    .map(this.servers, closeConnection)
    .nodeify(callback);
};
// lock
// ----
// Locks a resource using the redlock algorithm. Also available as `acquire`.
//
// ```js
// redlock.lock(
//   'some-resource',       // the resource to lock
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
Redlock.prototype.lock = function lock(resource, ttl, callback) {
  // a null value tells `_lock` to create a new lock instead of extending one
  const noExistingValue = null;
  return this._lock(resource, noExistingValue, ttl, callback);
};

// `acquire` is an alias of `lock`
Redlock.prototype.acquire = Redlock.prototype.lock;
// disposer
// --------
// Locks a resource using the redlock algorithm and returns a bluebird disposer
// that unlocks it again. Errors raised while unlocking are passed to
// `errorHandler`; when none is given they are ignored.
//
// ```js
// using(
//   redlock.disposer(
//     'some-resource',  // the resource to lock
//     2000              // ttl in ms
//   ),
//   function(lock) {
//     ...
//   }
// );
// ```
Redlock.prototype.disposer = function disposer(resource, ttl, errorHandler) {
  // without a handler, unlock failures are deliberately discarded
  const onUnlockError = errorHandler ? errorHandler : function(err) {};

  const pendingLock = this._lock(resource, null, ttl);
  const releaseLock = (lock) => lock.unlock().catch(onUnlockError);

  return pendingLock.disposer(releaseLock);
};
// unlock
// ------
// Unlocks the provided lock on all servers still persisting it. Also available as
// `release`. The returned promise is rejected with a LockError if the lock could not
// be released on a quorum of nodes; no attempt is made to restore the lock on nodes
// that did release it. It is safe to re-attempt an unlock or to ignore the error, as
// the lock will automatically expire after its timeout.
Redlock.prototype.unlock = function unlock(lock, callback) {
  const self = this;

  // always work with an array of resource names
  const keys = Array.isArray(lock.resource) ? lock.resource : [lock.resource];

  // immediately invalidate the lock
  lock.expiration = 0;

  const released = new Promise(function(resolve, reject) {
    const serverCount = self.servers.length;

    // the number of votes needed for consensus
    const quorum = Math.floor(serverCount / 2) + 1;

    // the number of servers which have agreed to release this lock
    let votes = 0;

    // the number of redis calls that have not reported back yet
    let pending = serverCount;

    function onReply(err, response) {
      if(err) self.emit('clientError', err);

      // A server only votes for the release when it removed every resource. If the
      // response is less than the resource count, then one or more resources
      // failed to unlock:
      // - It may have been re-acquired by another process;
      // - It may have already been manually released;
      // - It may have expired;
      const expected = keys.length;
      if(response === expected || response === String(expected))
        votes += 1;

      // keep waiting until the last server has answered
      pending -= 1;
      if(pending > 0) return;

      // SUCCESS: there is consensus and the lock is released
      if(votes >= quorum)
        return resolve();

      // FAILURE: the lock could not be released
      return reject(new LockError('Unable to fully release the lock on resource "' + lock.resource + '".'));
    }

    // release the lock on each server
    self.servers.forEach(function(server) {
      server.eval([self.unlockScript, keys.length, ...keys, lock.value], onReply);
    });
  });

  // optionally run callback
  return released.nodeify(callback);
};

// `release` is an alias of `unlock`
Redlock.prototype.release = Redlock.prototype.unlock;
// extend
// ------
// Extends a valid lock by the provided `ttl`. On success the original lock object
// is updated in place and returned. A lock whose expiration has already passed is
// rejected with a LockError without contacting any server.
Redlock.prototype.extend = function extend(lock, ttl, callback) {
  const stillValid = !(lock.expiration < Date.now());
  let outcome;

  if(stillValid) {
    // extend the lock, then modify and return the original lock object
    outcome = this._lock(lock.resource, lock.value, ttl).then(function(extension) {
      lock.value = extension.value;
      lock.expiration = extension.expiration;
      return lock;
    });
  }
  else {
    // the lock has expired
    outcome = Promise.reject(new LockError('Cannot extend lock on resource "' + lock.resource + '" because the lock has already expired.', 0));
  }

  // optionally run callback
  return outcome.nodeify(callback);
};
// _lock
// -----
// Locks a resource using the redlock algorithm. Passing `null` as the value creates
// a new lock with a freshly generated value; passing an existing value extends the
// lock that holds it. The promise resolves with a Lock, or is rejected with a
// LockError once the allowed attempts are used up.
//
// ###Creating New Locks:
//
// ```js
// redlock._lock(
//   'some-resource',       // the resource to lock
//   null,                  // no original lock value
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
//
// ###Extending Existing Locks:
//
// ```js
// redlock._lock(
//   'some-resource',       // the resource to lock
//   'dkkk18g4gy39dx6r',    // the value of the original lock
//   2000,                  // ttl in ms
//   function(err, lock) {  // callback function (optional)
//     ...
//   }
// )
// ```
Redlock.prototype._lock = function _lock(resource, value, ttl, callback) {
  const self = this;

  // always work with an array of resource names
  const keys = Array.isArray(resource) ? resource : [resource];

  const outcome = new Promise(function(resolve, reject) {
    // a null value means "create a new lock"; anything else extends an existing one
    const creating = value === null;
    if(creating) value = self._random();

    // which script property to read for each request
    const scriptProperty = creating ? 'lockScript' : 'extendScript';

    // the number of times we have attempted this lock
    let attempts = 0;

    // run the lock/extend script on a single server
    function request(server, done) {
      return server.eval([self[scriptProperty], keys.length, ...keys, value, ttl], done);
    }

    function attempt() {
      attempts += 1;

      // the time when this attempt started
      const start = Date.now();

      const serverCount = self.servers.length;

      // the number of votes needed for consensus
      const quorum = Math.floor(serverCount / 2) + 1;

      // the number of servers which have agreed to this lock
      let votes = 0;

      // the number of redis calls that have not reported back yet
      let pending = serverCount;

      function onReply(err, response) {
        if(err) self.emit('clientError', err);

        // a server votes for the lock only when it handled every resource
        if(response === keys.length || response === String(keys.length)) votes += 1;

        // keep waiting until the last server has answered
        pending -= 1;
        if(pending > 0) return;

        // Add 2 milliseconds to the drift to account for Redis expires precision, which is 1 ms,
        // plus the configured allowable drift factor
        const drift = Math.round(self.driftFactor * ttl) + 2;
        const expiration = start + ttl - drift;
        const lock = new Lock(self, keys, value, expiration, attempts);

        // SUCCESS: there is consensus and the lock is not expired
        if(votes >= quorum && lock.expiration > Date.now())
          return resolve(lock);

        // remove this lock from servers that voted for it
        return lock.unlock(function() {
          const mayRetry = self.retryCount === -1 || attempts <= self.retryCount;

          // FAILED
          if(!mayRetry)
            return reject(new LockError('Exceeded ' + self.retryCount + ' attempts to lock the resource "' + keys + '".', attempts));

          // RETRY after the configured delay, randomly shifted by up to the jitter
          const jitter = Math.floor((Math.random() * 2 - 1) * self.retryJitter);
          return setTimeout(attempt, Math.max(0, self.retryDelay + jitter));
        });
      }

      return self.servers.forEach(function(server) {
        return request(server, onReply);
      });
    }

    return attempt();
  });

  // optionally run callback
  return outcome.nodeify(callback);
};
// _random
// -------
// Generates a lock value: 16 random bytes from `crypto.randomBytes`, hex-encoded.
Redlock.prototype._random = function _random(){
  const bytes = crypto.randomBytes(16);
  return bytes.toString('hex');
};
module.exports = Redlock;