Ozero is a goroutine pool for Go, focused on simplicity.
When you create a new pool, you can set its size with NewPoolN(), or accept the default size, which is the CPU count, with NewPool(). The interface is designed to be easy to use: all you have to do is call .SetWorkerFunc() to define how the jobs in the pool are handled. From that moment on, you have a ready-to-use pool, and any job you .SendJob(data) to it will be processed by the first available goroutine.
You have two functions to send jobs to the pool:
- SendJob(data). It will send the job to the pool and return immediately, no matter how busy the pool is.
- SendJobSync(data). It will send the job to the pool, waiting until one goroutine gets the job and starts working on it.
Usually it makes little difference which method you use. However, there is one gotcha you need to know about.
In the following example, the pool behaves unpredictably, because some of the goroutines started by .SendJob may not have started yet when your main goroutine reaches .Close(). As a result, some of the jobs may never be processed, because they are sent to a pool that is already closed. Sending jobs to a closed pool does not cause a panic; they are simply ignored.
func main() {
	nThreads := 10
	taskPool := ozero.NewPoolN(nThreads).SetWorkerFunc(func(data interface{}) error {
		x := data.(int)
		log.Printf("Data: %d\n", x)
		time.Sleep(time.Second)
		return nil
	})
	before := time.Now()
	for i := 0; i < 20; i++ {
		taskPool.SendJob(i) // Here you should use .SendJobSync()
	}
	taskPool.Close()
	log.Printf("Elapsed %.2f seconds", time.Since(before).Seconds())
}
If the previous example used SendJobSync instead, the expected total time would be 2 seconds: the pool size is 10 and there are 20 jobs, each lasting 1 second, so they run in two batches.
If you are not going to close the pool, the recommended method to send jobs is SendJob, because it lets you send jobs without waiting, even if the pool is busy.
If your WorkerFunc crashes, a new goroutine is spawned, so you don't have to worry about the pool crashing. Everything is built to be thread-safe. If you want to catch these crashes, or the errors your WorkerFunc returns, just call .SetErrorFunc(), and you'll receive the data and the error that occurred.
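The general idea of surviving a crashing job can be sketched with the standard library alone. This is an illustration under stated assumptions, not Ozero's code: safeCall and process are hypothetical names, and the sketch recovers the panic inside the same goroutine instead of spawning a replacement goroutine as Ozero is described to do. The error callback receives the job data and the error, in the same spirit as the function passed to .SetErrorFunc().

```go
package main

import (
	"fmt"
	"sync"
)

// safeCall (hypothetical) runs fn(data) and converts a panic into an ordinary
// error, so a crashing job cannot take down the goroutine that runs it.
func safeCall(fn func(interface{}) error, data interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panicked: %v", r)
		}
	}()
	return fn(data)
}

// process (hypothetical) runs every item on a fixed number of workers and
// reports each failure, returned or panicked, to onError.
func process(workers int, items []int, fn func(interface{}) error, onError func(interface{}, error)) {
	queue := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex // serializes onError so the callback need not be thread-safe

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range queue {
				if err := safeCall(fn, item); err != nil {
					mu.Lock()
					onError(item, err)
					mu.Unlock()
				}
			}
		}()
	}
	for _, item := range items {
		queue <- item
	}
	close(queue)
	wg.Wait()
}

func main() {
	worker := func(data interface{}) error {
		if data.(int)%2 == 0 {
			panic("even numbers crash this worker")
		}
		return nil
	}
	// The order of the reported failures depends on goroutine scheduling.
	process(3, []int{1, 2, 3, 4, 5, 6}, worker, func(data interface{}, err error) {
		fmt.Printf("job %v failed: %v\n", data, err)
	})
}
```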
Often you want to retry a job if it fails. To do this, you have the following functions available:
- SetTries(n). Sets the maximum number of times that a job is retried if it crashes. Set to zero to retry indefinitely.
- SetRetryDelay(duration). Sets the time between retries.
- SetShouldRetryFunc(data, error, retry count). You can prevent a job from being retried for a specific error by implementing this function and returning false. This is useful when a job can fail permanently, as with an HTTP 404 error, or temporarily, as with an HTTP 500 error.
One important note: your error func is called only after all the retries have been exhausted, whereas your ShouldRetryFunc is called after every error or crash.
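The call ordering described above can be made concrete with a small standalone retry loop. This is a sketch, not Ozero's implementation: retryPolicy, run, and the two sentinel errors are hypothetical names. It assumes that the tries value counts total attempts and that zero means no limit, and it handles only returned errors, not panics.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical sentinel errors used only for this illustration.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// retryPolicy (hypothetical) mirrors the three settings described above.
type retryPolicy struct {
	tries       int           // maximum attempts; 0 means no limit (assumption)
	delay       time.Duration // pause between attempts
	shouldRetry func(data interface{}, err error, retry int) bool
}

// run calls fn until it succeeds or the policy gives up. shouldRetry is
// consulted after every failure; onError fires once, with the last error,
// only when the job is abandoned. It returns the number of attempts made.
func (p retryPolicy) run(data interface{}, fn func(interface{}) error, onError func(interface{}, error)) int {
	attempt := 0
	for {
		attempt++
		err := fn(data)
		if err == nil {
			return attempt
		}
		stop := p.shouldRetry != nil && !p.shouldRetry(data, err, attempt)
		if stop || (p.tries > 0 && attempt >= p.tries) {
			onError(data, err)
			return attempt
		}
		time.Sleep(p.delay)
	}
}

func main() {
	calls := 0
	flaky := func(interface{}) error {
		calls++
		if calls < 3 {
			return errTemporary // fails twice, then succeeds
		}
		return nil
	}
	p := retryPolicy{tries: 5, delay: 10 * time.Millisecond}
	attempts := p.run("job", flaky, func(data interface{}, err error) {
		fmt.Println("gave up:", err)
	})
	fmt.Println("attempts:", attempts) // prints "attempts: 3"
}
```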
A common way to use the WorkerFunc is to let it panic on error, letting the pool retry the job.
Finally, you can create as many pools as you want!
package main

import (
	"log"
	"time"

	"github.com/ANPez/Ozero"
)

func main() {
	nThreads := 10
	taskPool := ozero.NewPoolN(nThreads).SetWorkerFunc(func(data interface{}) error {
		url := data.(string)
		log.Printf("Downloading URL: %s.", url)
		downloadOrPanic(url) // application-defined helper, not shown here
		log.Printf("Job finished OK")
		return nil
	}).SetErrorFunc(func(data interface{}, err error) {
		log.Printf("Error while processing job in queue")
	}).SetShouldRetryFunc(func(data interface{}, err error, retry int) bool {
		// types.HTTPError is an application-defined error type, not shown here.
		switch err := err.(type) {
		case *types.HTTPError:
			// Do not retry 4xx responses; retry everything else.
			return (err.StatusCode < 400) || (err.StatusCode >= 500)
		}
		return true
	}).SetTries(3).SetRetryDelay(time.Second)
}
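The example above relies on an error type that exposes a StatusCode field. A minimal sketch of what such a type and the retry decision could look like follows. HTTPError here is a hypothetical stand-in for the types.HTTPError used above, shouldRetry is a hypothetical standalone version of the callback, and errors.As is used in place of the type switch so that wrapped errors are also matched.

```go
package main

import (
	"errors"
	"fmt"
)

// HTTPError is a hypothetical stand-in for the types.HTTPError used in the
// example above; only the StatusCode field is taken from that example.
type HTTPError struct {
	StatusCode int
}

func (e *HTTPError) Error() string {
	return fmt.Sprintf("unexpected HTTP status %d", e.StatusCode)
}

// errReset is a hypothetical non-HTTP error used for the demonstration.
var errReset = errors.New("connection reset")

// shouldRetry has the same shape as the callback passed to SetShouldRetryFunc
// above: 4xx responses are treated as permanent, everything else is retried.
func shouldRetry(data interface{}, err error, retry int) bool {
	var httpErr *HTTPError
	if errors.As(err, &httpErr) {
		return httpErr.StatusCode < 400 || httpErr.StatusCode >= 500
	}
	return true
}

func main() {
	fmt.Println(shouldRetry("url", &HTTPError{StatusCode: 404}, 1)) // prints "false"
	fmt.Println(shouldRetry("url", &HTTPError{StatusCode: 503}, 1)) // prints "true"
	fmt.Println(shouldRetry("url", errReset, 1))                    // prints "true"
}
```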
Copyright 2016 Antonio Nicolás Pina
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.