-
Notifications
You must be signed in to change notification settings - Fork 3
/
writer_queue.go
172 lines (154 loc) · 5.04 KB
/
writer_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/* This file is part of copyondemand.
*
* Copyright © 2020 Datto, Inc.
* Author: Bryan Ehrlich <[email protected]>
*
* Licensed under the Apache Software License, Version 2.0
* Fedora-License-Identifier: ASL 2.0
* SPDX-2.0-License-Identifier: Apache-2.0
* SPDX-3.0-License-Identifier: Apache-2.0
*
* copyondemand is free software.
* For more information on the license, see LICENSE.
* For more information on free software, see <https://www.gnu.org/philosophy/free-sw.en.html>.
*
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package copyondemand
import (
"sync"
"time"
)
const (
// dynamicQueueCapacity is the buffer size of the dynamic action channel, i.e. a
// hard cap on the number of dynamic actions that can be queued at once.
dynamicQueueCapacity = 500
// backgroundQueueCapacity is the buffer size of the background action channel,
// so at most one background action is queued at a time.
backgroundQueueCapacity = 1
)
// WriteActionType enumerates the kinds of action that can sit in the writer queue.
type WriteActionType int

const (
	// WriteData actions flush to disk data that a dynamic read action already read from source.
	WriteData WriteActionType = iota
	// FixDirtyBlock actions sync data from source in order to fully sync blocks that
	// were only partially written on the target.
	FixDirtyBlock
	// SyncBlock actions sync data from source and write it to the backing file.
	SyncBlock
)

// QueueType enumerates the queues an action can be requested from.
type QueueType int

const (
	// DynamicQueue serves actions from the user.
	DynamicQueue QueueType = iota
	// BackgroundQueue copies blocks in the background.
	BackgroundQueue
)
// QueuedWriteAction describes one unit of work for the writer: the kind of action,
// the block range it affects, and (optionally) the bytes to be written to disk.
type QueuedWriteAction struct {
	// startBlock and endBlock delimit the affected block range.
	// NOTE(review): whether endBlock is inclusive is not visible here; confirm against callers.
	startBlock, endBlock uint64
	// actionType selects the kind of write action; TryEnqueue uses it to pick a queue.
	actionType WriteActionType
	// data optionally holds the bytes to be written to disk.
	data []byte
}
// WriterQueue is a thin wrapper around two buffered channels of write actions (one
// for dynamic actions, one for background sync actions) plus a pool of reusable
// QueuedWriteAction structs. The channel buffer sizes limit how many actions can be
// queued at once.
type WriterQueue struct {
// dynamicActionQueue carries every action whose type is not SyncBlock.
dynamicActionQueue chan *QueuedWriteAction
// backgroundActionQueue carries SyncBlock actions.
backgroundActionQueue chan *QueuedWriteAction
// writeActionPool recycles the QueuedWriteAction structs handed out by MakeWriteAction.
writeActionPool *sync.Pool
}
// NewWriterQueue builds a ready-to-use WriterQueue. The dynamic and background
// channels are buffered to dynamicQueueCapacity and backgroundQueueCapacity
// respectively, and the pool allocates a zero-valued QueuedWriteAction whenever it
// has nothing to recycle.
func NewWriterQueue() *WriterQueue {
	return &WriterQueue{
		dynamicActionQueue:    make(chan *QueuedWriteAction, dynamicQueueCapacity),
		backgroundActionQueue: make(chan *QueuedWriteAction, backgroundQueueCapacity),
		writeActionPool: &sync.Pool{
			New: func() interface{} { return new(QueuedWriteAction) },
		},
	}
}
// TryEnqueue attempts to add a write action to the write queue. SyncBlock actions
// go to the background queue; every other action type goes to the dynamic queue.
//
// timeoutMilliseconds = 0 indicates that this function should block until the
// action has been queued. Otherwise the call waits at most timeoutMilliseconds for
// room in the selected queue.
//
// It returns true if the action was queued and false if the timeout elapsed first.
// wa must be non-nil.
func (wq *WriterQueue) TryEnqueue(wa *QueuedWriteAction, timeoutMilliseconds int) bool {
	selectedQueue := wq.dynamicActionQueue
	if wa.actionType == SyncBlock {
		selectedQueue = wq.backgroundActionQueue
	}

	if timeoutMilliseconds == 0 {
		selectedQueue <- wa
		return true
	}

	// An explicit timer (rather than time.After) can be stopped as soon as the send
	// succeeds, so it does not stay live for the rest of the timeout.
	timer := time.NewTimer(time.Duration(timeoutMilliseconds) * time.Millisecond)
	defer timer.Stop()

	select {
	case selectedQueue <- wa:
		return true
	case <-timer.C:
		return false
	}
}
// TryDequeue attempts to pull an item off the queue, waiting up to waitMilliseconds
// for one to arrive. If nothing arrives in that time it returns nil.
//
// If includeBackgroundItems is true you _may_ also get background block sync
// actions: when both queues have an item ready, select picks one of them at random,
// so dynamic actions are not prioritized over background ones.
func (wq *WriterQueue) TryDequeue(waitMilliseconds int, includeBackgroundItems bool) *QueuedWriteAction {
	// Receiving from a nil channel never becomes ready, so leaving this nil removes
	// the background case from the select below.
	var backgroundQueue chan *QueuedWriteAction
	if includeBackgroundItems {
		backgroundQueue = wq.backgroundActionQueue
	}

	// An explicit timer (rather than time.After) can be stopped as soon as an item is
	// received, so it does not stay live for the rest of the wait on every call.
	timer := time.NewTimer(time.Duration(waitMilliseconds) * time.Millisecond)
	defer timer.Stop()

	select {
	case wa := <-wq.dynamicActionQueue:
		return wa
	case wa := <-backgroundQueue:
		return wa
	case <-timer.C:
		return nil
	}
}
// Dequeue pulls an item off the queue without blocking. If no item is immediately
// available it returns nil. If includeBackgroundItems is true you _may_ also get
// background block sync actions.
func (wq *WriterQueue) Dequeue(includeBackgroundItems bool) *QueuedWriteAction {
	// Receiving from a nil channel never becomes ready, so leaving this nil removes
	// the background case from the select below.
	var backgroundQueue chan *QueuedWriteAction
	if includeBackgroundItems {
		backgroundQueue = wq.backgroundActionQueue
	}

	select {
	case action := <-wq.dynamicActionQueue:
		return action
	case action := <-backgroundQueue:
		return action
	default:
		return nil
	}
}
// MakeWriteAction hands out a QueuedWriteAction taken from the internal pool. The
// returned struct may be a recycled one, so callers should set every field they
// rely on before enqueueing it.
func (wq *WriterQueue) MakeWriteAction() *QueuedWriteAction {
	pooled := wq.writeActionPool.Get()
	return pooled.(*QueuedWriteAction)
}
// PutWriteAction adds a write action back to the pool so a later MakeWriteAction
// call can reuse it. The action is reset to its zero value first: this drops the
// reference to the data buffer and ensures no stale block range or action type is
// carried into the next use. Callers must not touch wa after calling this, and wa
// must be non-nil.
func (wq *WriterQueue) PutWriteAction(wa *QueuedWriteAction) {
	// Reset every field, not just data, so recycled actions match freshly allocated ones.
	*wa = QueuedWriteAction{}
	wq.writeActionPool.Put(wa)
}