diff --git a/harmony/harmonytask/harmonytask.go b/harmony/harmonytask/harmonytask.go index e054d3cf1..e59b79ca3 100644 --- a/harmony/harmonytask/harmonytask.go +++ b/harmony/harmonytask/harmonytask.go @@ -26,7 +26,7 @@ type TaskTypeDetails struct { // Max returns how many tasks this machine can run of this type. // Nil (default)/Zero or less means unrestricted. // Counters can either be independent when created with Max, or shared between tasks with SharedMax.Make() - Max Limiter + Max taskhelp.Limiter // Name is the task name to be added to the task list. Name string @@ -105,23 +105,6 @@ type TaskInterface interface { Adder(AddTaskFunc) } -type Limiter interface { - // Active returns the number of tasks of this type that are currently running - // in this limiter / limiter group. - Active() int - - // ActiveThis returns the number of tasks of this type that are currently running - // in this limiter (e.g. per-task-type count). - ActiveThis() int - - // AtMax returns whether this limiter permits more tasks to run. - AtMax() bool - - // Add increments / decrements the active task counters by delta. This call - // is atomic - Add(delta int) -} - // AddTaskFunc is responsible for adding a task's details "extra info" to the DB. // It should return true if the task should be added, false if it was already there. // This is typically accomplished with a "unique" index on your detals table that @@ -188,6 +171,7 @@ func New( if h.Max == nil { h.Max = taskhelp.Max(0) } + h.Max = h.Max.Instance() if Registry[h.TaskTypeDetails.Name] == nil { return nil, fmt.Errorf("task %s not registered: var _ = harmonytask.Reg(t TaskInterface)", h.TaskTypeDetails.Name) diff --git a/harmony/taskhelp/max.go b/harmony/taskhelp/max.go index 05e89c404..46ed7b512 100644 --- a/harmony/taskhelp/max.go +++ b/harmony/taskhelp/max.go @@ -4,6 +4,27 @@ import ( "sync/atomic" ) +type Limiter interface { + // Active returns the number of tasks of this type that are currently running + // in this limiter / limiter group. + Active() int + + // ActiveThis returns the number of tasks of this type that are currently running + // in this limiter (e.g. per-task-type count). + ActiveThis() int + + // AtMax returns whether this limiter permits more tasks to run. + AtMax() bool + + // Add increments / decrements the active task counters by delta. This call + // is atomic + Add(delta int) + + // Instance spawns a sub-instance of this limiter. This is called by harmonytask on startup for each task + // using this limiter. Each sub-instance has it's own individual "This" counter, but can share a common counter. + Instance() Limiter +} + type MaxCounter struct { // maximum number of tasks of this type that can be run N int @@ -37,6 +58,10 @@ func (m *MaxCounter) Add(n int) { m.currentThis.Add(int32(n)) } +func (m *MaxCounter) Instance() Limiter { + return &MaxCounter{N: m.N, current: m.current, currentThis: new(atomic.Int32)} +} + func Max(n int) *MaxCounter { return &MaxCounter{N: n, current: new(atomic.Int32), currentThis: new(atomic.Int32)} } diff --git a/tasks/seal/task_sdr.go b/tasks/seal/task_sdr.go index 2e989821b..1622e2550 100644 --- a/tasks/seal/task_sdr.go +++ b/tasks/seal/task_sdr.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp" "github.com/filecoin-project/curio/lib/dealdata" ffi2 "github.com/filecoin-project/curio/lib/ffi" "github.com/filecoin-project/curio/lib/paths" @@ -45,11 +46,11 @@ type SDRTask struct { sc *ffi2.SealCalls - max harmonytask.Limiter + max taskhelp.Limiter min int } -func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR harmonytask.Limiter, minSDR int) *SDRTask { +func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR taskhelp.Limiter, minSDR int) *SDRTask { return &SDRTask{ api: api, db: db, diff --git a/tasks/unseal/task_unseal_sdr.go b/tasks/unseal/task_unseal_sdr.go index d8cf3880f..57d09027b 100644 --- a/tasks/unseal/task_unseal_sdr.go +++ b/tasks/unseal/task_unseal_sdr.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" + "github.com/filecoin-project/curio/harmony/taskhelp" "github.com/filecoin-project/curio/lib/ffi" "github.com/filecoin-project/curio/lib/passcall" "github.com/filecoin-project/curio/lib/paths" @@ -30,14 +31,14 @@ type UnsealSDRApi interface { } type TaskUnsealSdr struct { - max harmonytask.Limiter + max taskhelp.Limiter sc *ffi.SealCalls db *harmonydb.DB api UnsealSDRApi } -func NewTaskUnsealSDR(sc *ffi.SealCalls, db *harmonydb.DB, max harmonytask.Limiter, api UnsealSDRApi) *TaskUnsealSdr { +func NewTaskUnsealSDR(sc *ffi.SealCalls, db *harmonydb.DB, max taskhelp.Limiter, api UnsealSDRApi) *TaskUnsealSdr { return &TaskUnsealSdr{ max: max, sc: sc,