Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: harmony: Correctly separate this counters when sharing Max #246

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type.
// Nil (default)/Zero or less means unrestricted.
// Counters can either be independent when created with Max, or shared between tasks with SharedMax.Make()
Max Limiter
Max taskhelp.Limiter

// Name is the task name to be added to the task list.
Name string
Expand Down Expand Up @@ -105,23 +105,6 @@ type TaskInterface interface {
Adder(AddTaskFunc)
}

type Limiter interface {
// Active returns the number of tasks of this type that are currently running
// in this limiter / limiter group.
Active() int

// ActiveThis returns the number of tasks of this type that are currently running
// in this limiter (e.g. per-task-type count).
ActiveThis() int

// AtMax returns whether this limiter permits more tasks to run.
AtMax() bool

// Add increments / decrements the active task counters by delta. This call
// is atomic
Add(delta int)
}

// AddTaskFunc is responsible for adding a task's details "extra info" to the DB.
// It should return true if the task should be added, false if it was already there.
// This is typically accomplished with a "unique" index on your detals table that
Expand Down Expand Up @@ -188,6 +171,7 @@ func New(
if h.Max == nil {
h.Max = taskhelp.Max(0)
}
h.Max = h.Max.Instance()

if Registry[h.TaskTypeDetails.Name] == nil {
return nil, fmt.Errorf("task %s not registered: var _ = harmonytask.Reg(t TaskInterface)", h.TaskTypeDetails.Name)
Expand Down
25 changes: 25 additions & 0 deletions harmony/taskhelp/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,27 @@ import (
"sync/atomic"
)

type Limiter interface {
// Active returns the number of tasks of this type that are currently running
// in this limiter / limiter group.
Active() int

// ActiveThis returns the number of tasks of this type that are currently running
// in this limiter (e.g. per-task-type count).
ActiveThis() int

// AtMax returns whether this limiter permits more tasks to run.
AtMax() bool

// Add increments / decrements the active task counters by delta. This call
// is atomic
Add(delta int)

// Instance spawns a sub-instance of this limiter. This is called by harmonytask on startup for each task
// using this limiter. Each sub-instance has it's own individual "This" counter, but can share a common counter.
Instance() Limiter
}

type MaxCounter struct {
// maximum number of tasks of this type that can be run
N int
Expand Down Expand Up @@ -37,6 +58,10 @@ func (m *MaxCounter) Add(n int) {
m.currentThis.Add(int32(n))
}

func (m *MaxCounter) Instance() Limiter {
return &MaxCounter{N: m.N, current: m.current, currentThis: new(atomic.Int32)}
}

func Max(n int) *MaxCounter {
return &MaxCounter{N: n, current: new(atomic.Int32), currentThis: new(atomic.Int32)}
}
5 changes: 3 additions & 2 deletions tasks/seal/task_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/dealdata"
ffi2 "github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/paths"
Expand Down Expand Up @@ -45,11 +46,11 @@ type SDRTask struct {

sc *ffi2.SealCalls

max harmonytask.Limiter
max taskhelp.Limiter
min int
}

func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR harmonytask.Limiter, minSDR int) *SDRTask {
func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR taskhelp.Limiter, minSDR int) *SDRTask {
return &SDRTask{
api: api,
db: db,
Expand Down
5 changes: 3 additions & 2 deletions tasks/unseal/task_unseal_sdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/passcall"
"github.com/filecoin-project/curio/lib/paths"
Expand All @@ -30,14 +31,14 @@ type UnsealSDRApi interface {
}

type TaskUnsealSdr struct {
max harmonytask.Limiter
max taskhelp.Limiter

sc *ffi.SealCalls
db *harmonydb.DB
api UnsealSDRApi
}

func NewTaskUnsealSDR(sc *ffi.SealCalls, db *harmonydb.DB, max harmonytask.Limiter, api UnsealSDRApi) *TaskUnsealSdr {
func NewTaskUnsealSDR(sc *ffi.SealCalls, db *harmonydb.DB, max taskhelp.Limiter, api UnsealSDRApi) *TaskUnsealSdr {
return &TaskUnsealSdr{
max: max,
sc: sc,
Expand Down