-
Notifications
You must be signed in to change notification settings - Fork 0
/
blocks.js
161 lines (149 loc) · 4.39 KB
/
blocks.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
'use strict'
function isEmpty (o) {
for(var k in o) return false
return true
}
const constants = require('./constants')
const B_HEADER = constants.block
const V_HEADER = constants.vector
const FREE = constants.free
module.exports = function (raf, block_size, magic_number) {
if(!magic_number)
magic_number = constants.magic
// if(!magic_number) throw new Error('non-zero magic number must be provided')
//todo: make blocks be a cache, not store all blocks in memory.
var blocks = []
var dirty = {}, _dirty, writing
var waiting_ready = [], waiting_drain = [], self
function ready () {
while(waiting_ready.length) waiting_ready.shift()()
}
;(function reload () {
raf.stat(function (err, stat) {
//get the last block. it should always be complete,
//but raf errors if you request a partial read.
var block_index = Math.ceil(stat.size/block_size) - 1
if(err || stat.size == 0) {
var block = blocks[0] = Buffer.alloc(block_size)
blocks[0].writeUInt32LE(magic_number, 0)
blocks[0].writeUInt32LE(self.free = B_HEADER, FREE)
ready()
}
else {
raf.read(block_index*block_size, block_size, function (err, _block) {
if(err) throw err
if(_block.length < block_size) {
block = Buffer.alloc(block_size)
_block.copy(block)
}
else block = _block
if(block.readUInt32LE(0) !== magic_number) {
//throw new Error('incorrect magic number')
return raf.del(0, stat.size, reload)
}
blocks[block_index] = block
self.free = block_index*block_size + block.readUInt32LE(FREE)
ready()
})
}
})
})()
var _dirty
function write () {
_dirty = dirty
dirty = {}
;(function next () {
for(var k in _dirty) {
var i = +k
delete dirty[i]
return raf.write(block_size*i, blocks[i], function (err) {
delete _dirty[i]
next()
})
}
//if we are here, we wrote everything.
if(!isEmpty(dirty)) write()
else {
writing = false
while(waiting_drain.length)
waiting_drain.shift()()
}
})()
}
function queue_write () {
if(writing) return
writing = true
write()
}
return self = {
block_size: block_size,
free: undefined,
get: function (i, cb) {
if(Buffer.isBuffer(blocks[i]))
cb(null, blocks[i])
else if(Array.isArray(blocks[i]))
blocks[i].push(cb)
//optimize case where there is only a single reader:
else if('function' === typeof blocks[i])
blocks[i] = [blocks[i], cb]
else {
// if(i >= blocks.length) throw new Error('read beyond length:'+i)
blocks[i] = cb
raf.read(block_size*i, block_size, function (err, block) {
if(err) throw err
if(block.readUInt32LE(0) !== magic_number) {
throw new Error('incorrect magic number')
return raf.del(0, stat.size, reload)
}
if('function' === typeof blocks[i]) {
var _cb = blocks[i]
blocks[i] = block
_cb(null, block)
}
else {
var _cbs = blocks[i]
blocks[i] = block
for(var j = 0; j < _cbs.length; j++)
_cbs[j](null, block)
}
})
}
},
dirty: function (i) {
dirty[i] = true
//queue write...
queue_write()
},
last: function () {
//if last block is non-empty, return that
//else allocate a new empty block.
self.free = (blocks.length-1) * block_size + blocks[blocks.length-1].readUInt32LE(FREE)
if(self.free % block_size)
return ~~(self.free / block_size)
else {
var i = blocks.length
blocks[i] = Buffer.alloc(block_size)
blocks[i].writeUInt32LE(magic_number, 0)
blocks[i].writeUInt32LE(B_HEADER, FREE)
self.dirty(i)
return i
}
},
clear: function () {
//clear everything not currently being read...
for(var i = 0; i < blocks.length - 1; i++) {
if(!Array.isArray(blocks[i]))
delete blocks[i]
}
},
blocks: blocks,
ready: function (cb) {
if(self.free) return cb()
else waiting_ready.push(cb)
},
drain: function (cb) {
if(!writing && isEmpty(dirty)) cb()
else waiting_drain.push(cb)
}
}
}