-
Notifications
You must be signed in to change notification settings - Fork 5.9k
/
resource_pool.go
228 lines (198 loc) · 6.13 KB
/
resource_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package pools provides functionality to manage and reuse resources
// like connections.
package pools
import (
"fmt"
"time"
"github.com/ngaut/sync2"
)
var (
CLOSED_ERR = fmt.Errorf("ResourcePool is closed")
)
// Factory is a function that can be used to create a resource.
type Factory func() (Resource, error)
// Every resource needs to suport the Resource interface.
// Thread synchronization between Close() and IsClosed()
// is the responsibility the caller.
type Resource interface {
Close()
}
// ResourcePool allows you to use a pool of resources.
type ResourcePool struct {
resources chan resourceWrapper
factory Factory
capacity sync2.AtomicInt64
idleTimeout sync2.AtomicDuration
// stats
waitCount sync2.AtomicInt64
waitTime sync2.AtomicDuration
}
type resourceWrapper struct {
resource Resource
timeUsed time.Time
}
// NewResourcePool creates a new ResourcePool pool.
// capacity is the initial capacity of the pool.
// maxCap is the maximum capacity.
// If a resource is unused beyond idleTimeout, it's discarded.
// An idleTimeout of 0 means that there is no timeout.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(fmt.Errorf("Invalid/out of range capacity"))
}
rp := &ResourcePool{
resources: make(chan resourceWrapper, maxCap),
factory: factory,
capacity: sync2.AtomicInt64(capacity),
idleTimeout: sync2.AtomicDuration(idleTimeout),
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}
}
return rp
}
// Close empties the pool calling Close on all its resources.
// You can call Close while there are outstanding resources.
// It waits for all resources to be returned (Put).
// After a Close, Get and TryGet are not allowed.
func (rp *ResourcePool) Close() {
rp.SetCapacity(0)
}
func (rp *ResourcePool) IsClosed() (closed bool) {
return rp.capacity.Get() == 0
}
// Get will return the next available resource. If capacity
// has not been reached, it will create a new one using the factory. Otherwise,
// it will indefinitely wait till the next resource becomes available.
func (rp *ResourcePool) Get() (resource Resource, err error) {
return rp.get(true)
}
// TryGet will return the next available resource. If none is available, and capacity
// has not been reached, it will create a new one using the factory. Otherwise,
// it will return nil with no error.
func (rp *ResourcePool) TryGet() (resource Resource, err error) {
return rp.get(false)
}
func (rp *ResourcePool) get(wait bool) (resource Resource, err error) {
// Fetch
var wrapper resourceWrapper
var ok bool
select {
case wrapper, ok = <-rp.resources:
default:
if !wait {
return nil, nil
}
startTime := time.Now()
wrapper, ok = <-rp.resources
rp.recordWait(startTime)
}
if !ok {
return nil, CLOSED_ERR
}
// Unwrap
timeout := rp.idleTimeout.Get()
if wrapper.resource != nil && timeout > 0 && wrapper.timeUsed.Add(timeout).Sub(time.Now()) < 0 {
wrapper.resource.Close()
wrapper.resource = nil
}
if wrapper.resource == nil {
wrapper.resource, err = rp.factory()
if err != nil {
rp.resources <- resourceWrapper{}
}
}
return wrapper.resource, err
}
// Put will return a resource to the pool. For every successful Get,
// a corresponding Put is required. If you no longer need a resource,
// you will need to call Put(nil) instead of returning the closed resource.
// The will eventually cause a new resource to be created in its place.
func (rp *ResourcePool) Put(resource Resource) {
var wrapper resourceWrapper
if resource != nil {
wrapper = resourceWrapper{resource, time.Now()}
}
select {
case rp.resources <- wrapper:
default:
panic(fmt.Errorf("Attempt to Put into a full ResourcePool"))
}
}
// SetCapacity changes the capacity of the pool.
// You can use it to shrink or expand, but not beyond
// the max capacity. If the change requires the pool
// to be shrunk, SetCapacity waits till the necessary
// number of resources are returned to the pool.
// A SetCapacity of 0 is equivalent to closing the ResourcePool.
func (rp *ResourcePool) SetCapacity(capacity int) error {
if capacity < 0 || capacity > cap(rp.resources) {
return fmt.Errorf("capacity %d is out of range", capacity)
}
// Atomically swap new capacity with old, but only
// if old capacity is non-zero.
var oldcap int
for {
oldcap = int(rp.capacity.Get())
if oldcap == 0 {
return CLOSED_ERR
}
if oldcap == capacity {
return nil
}
if rp.capacity.CompareAndSwap(int64(oldcap), int64(capacity)) {
break
}
}
if capacity < oldcap {
for i := 0; i < oldcap-capacity; i++ {
wrapper := <-rp.resources
if wrapper.resource != nil {
wrapper.resource.Close()
}
}
} else {
for i := 0; i < capacity-oldcap; i++ {
rp.resources <- resourceWrapper{}
}
}
if capacity == 0 {
close(rp.resources)
}
return nil
}
func (rp *ResourcePool) recordWait(start time.Time) {
rp.waitCount.Add(1)
rp.waitTime.Add(time.Now().Sub(start))
}
func (rp *ResourcePool) SetIdleTimeout(idleTimeout time.Duration) {
rp.idleTimeout.Set(idleTimeout)
}
func (rp *ResourcePool) StatsJSON() string {
c, a, mx, wc, wt, it := rp.Stats()
return fmt.Sprintf(`{"Capacity": %v, "Available": %v, "MaxCapacity": %v, "WaitCount": %v, "WaitTime": %v, "IdleTimeout": %v}`, c, a, mx, wc, int64(wt), int64(it))
}
func (rp *ResourcePool) Stats() (capacity, available, maxCap, waitCount int64, waitTime, idleTimeout time.Duration) {
return rp.Capacity(), rp.Available(), rp.MaxCap(), rp.WaitCount(), rp.WaitTime(), rp.IdleTimeout()
}
func (rp *ResourcePool) Capacity() int64 {
return rp.capacity.Get()
}
func (rp *ResourcePool) Available() int64 {
return int64(len(rp.resources))
}
func (rp *ResourcePool) MaxCap() int64 {
return int64(cap(rp.resources))
}
func (rp *ResourcePool) WaitCount() int64 {
return rp.waitCount.Get()
}
func (rp *ResourcePool) WaitTime() time.Duration {
return rp.waitTime.Get()
}
func (rp *ResourcePool) IdleTimeout() time.Duration {
return rp.idleTimeout.Get()
}