-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
73 lines (54 loc) · 1.12 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package fan
import (
"sync"
"github.com/midgarco/fan/log"
"github.com/midgarco/fan/worker"
)
// poolWorker is a single worker in the pool. Its doWork loop receives job
// payloads, runs workFunc on each one, and publishes a *worker.Result per job.
type poolWorker struct {
// id identifies this worker; it is copied into every worker.Details and
// worker.Result the worker produces and is included in the debug log line.
id int64
// status is the worker's current state; doWork updates it as a job moves
// from processing to complete and back to idle.
status WorkerStatus
// workFunc is the job handler. doWork calls it once per payload with a
// *worker.Details and stores a non-nil return value in Result.Error.
workFunc workfn
// logger receives a debug message each time a job is started.
logger log.Interface
// readyChan gets a single `true` sent on it when doWork starts, before the
// first job is received.
readyChan chan bool
// jobChan points at the channel payloads are received from. The pointer is
// dereferenced on every loop iteration.
// NOTE(review): presumably shared with the owning pool — confirm.
jobChan *chan interface{}
// resultsChan receives one *worker.Result for every processed payload.
resultsChan chan *worker.Result
// doneChan tells the worker to stop: a successful receive on it makes
// doWork return.
doneChan chan bool
}
// WorkerStatus describes what a pool worker is currently doing.
type WorkerStatus int

// Worker states. They are declared with the WorkerStatus type (rather than
// as untyped integer constants) so the compiler ties them to the type they
// describe. Numbering starts at 1, so the zero value of WorkerStatus matches
// none of the named states.
const (
	// WorkerStatus_IDLE is set once a job's result has been published and
	// the worker is waiting for its next job.
	WorkerStatus_IDLE WorkerStatus = iota + 1
	// WorkerStatus_PROCESSING is set just before the work function is called.
	WorkerStatus_PROCESSING
	// WorkerStatus_COMPLETE is set right after the work function returns,
	// before the result is published.
	WorkerStatus_COMPLETE
)
// doWork is the worker's main loop. It sends one `true` on readyChan, then
// repeatedly receives a payload from *jobChan, runs workFunc on it, and sends
// a *worker.Result (carrying the worker id, the payload and any error from
// workFunc) on resultsChan. The loop ends when a receive on doneChan
// succeeds. wg.Done is called when doWork returns.
//
// Each job runs synchronously on the worker's own goroutine. The worker
// cannot take another job until the current one has finished, so starting a
// helper goroutine per job and immediately waiting for it would only add a
// goroutine and a channel allocation to every job.
//
// NOTE(review): the sends on readyChan and resultsChan block until a receiver
// (or buffer space) is available and do not watch doneChan; the receive from
// *jobChan does not check whether that channel has been closed. Confirm that
// the owning pool's shutdown sequence accounts for both.
func (w *poolWorker) doWork(wg *sync.WaitGroup) {
	defer wg.Done()

	w.readyChan <- true

	for {
		select {
		case payload := <-*w.jobChan:
			result := &worker.Result{
				Id:      w.id,
				Payload: payload,
			}

			w.status = WorkerStatus_PROCESSING
			w.logger.Debugf("[fan] worker: %d\n", w.id)

			err := w.workFunc(&worker.Details{
				Id:      w.id,
				Payload: payload,
			})
			if err != nil {
				result.Error = err
			}
			w.status = WorkerStatus_COMPLETE

			// Publish the outcome before reporting idle, so the status only
			// reads IDLE once the result has been handed off.
			w.resultsChan <- result
			w.status = WorkerStatus_IDLE

		case <-w.doneChan:
			return
		}
	}
}