Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: F3 #292

Merged
merged 7 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/curio/lib/paths"
"github.com/filecoin-project/curio/lib/slotmgr"
"github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/tasks/f3"
"github.com/filecoin-project/curio/tasks/gc"
"github.com/filecoin-project/curio/tasks/message"
"github.com/filecoin-project/curio/tasks/metadata"
Expand Down Expand Up @@ -150,7 +151,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
store := dependencies.Stor
winPoStTask := winning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, store, verif, asyncParams(), full, maddrs)
inclCkTask := winning.NewInclusionCheckTask(db, full)
activeTasks = append(activeTasks, winPoStTask, inclCkTask)
f3Task := f3.NewF3Task(db, full, maddrs)
activeTasks = append(activeTasks, winPoStTask, inclCkTask, f3Task)

// Warn if also running a sealing task
if cfg.Subsystems.EnableSealSDR || cfg.Subsystems.EnableSealSDRTrees || cfg.Subsystems.EnableSendPrecommitMsg || cfg.Subsystems.EnablePoRepProof || cfg.Subsystems.EnableMoveStorage || cfg.Subsystems.EnableSendCommitMsg || cfg.Subsystems.EnableUpdateEncode || cfg.Subsystems.EnableUpdateProve || cfg.Subsystems.EnableUpdateSubmit {
Expand Down
3 changes: 2 additions & 1 deletion deps/apiinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/filecoin-project/curio/api"

lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
)
Expand Down Expand Up @@ -96,7 +97,7 @@ var RPCErrors = jsonrpc.NewErrors()
func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.Chain, jsonrpc.ClientCloser, error) {
var res api.ChainStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(RPCErrors)}, opts...)...)
api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(lapi.RPCErrors)}, opts...)...)

return &res, closer, err
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/filecoin-project/go-commp-utils v0.1.4
github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8
github.com/filecoin-project/go-commp-utils/v2 v2.1.0
github.com/filecoin-project/go-f3 v0.7.0
github.com/filecoin-project/go-fil-commcid v0.2.0
github.com/filecoin-project/go-fil-commp-hashhash v0.2.0
github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f
Expand Down Expand Up @@ -53,6 +54,7 @@ require (
github.com/ipfs/go-ipld-cbor v0.2.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.1.0
github.com/manifoldco/promptui v0.9.0
Expand Down Expand Up @@ -131,7 +133,6 @@ require (
github.com/filecoin-project/go-amt-ipld/v4 v4.4.0 // indirect
github.com/filecoin-project/go-clock v0.1.0 // indirect
github.com/filecoin-project/go-crypto v0.1.0 // indirect
github.com/filecoin-project/go-f3 v0.7.0 // indirect
github.com/filecoin-project/go-hamt-ipld v0.1.5 // indirect
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 // indirect
github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 // indirect
Expand Down Expand Up @@ -205,7 +206,6 @@ require (
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
Expand Down
7 changes: 7 additions & 0 deletions harmony/harmonydb/sql/20241021-f3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE f3_tasks (
sp_id BIGINT PRIMARY KEY,
task_id BIGINT UNIQUE,
previous_ticket BYTEA,

FOREIGN KEY (task_id) REFERENCES harmony_task (id) ON DELETE SET NULL
);
11 changes: 11 additions & 0 deletions harmony/taskhelp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,14 @@ func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) {
}
return slice[:ct], true
}

// BackgroundTask are tasks that:
// * Always run in the background
// * Never finish "successfully"
func BackgroundTask(name string) string {
return "bg:" + name
}

func IsBackgroundTask(name string) bool {
return len(name) > 3 && name[:3] == "bg:"
magik6k marked this conversation as resolved.
Show resolved Hide resolved
}
285 changes: 285 additions & 0 deletions tasks/f3/f3_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package f3

import (
"context"
"errors"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/jpillora/backoff"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/harmony/taskhelp"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

const (
// ParticipationCheckProgressMaxAttempts defines the maximum number of failed attempts
// before we abandon the current lease and restart the participation process.
//
// The default backoff takes 12 attempts to reach a maximum delay of 1 minute.
// Allowing for 13 failures results in approximately 2 minutes of backoff since
// the lease was granted. Given a lease validity of up to 5 instances, this means
// we would give up on checking the lease during its mid-validity period;
// typically when we would try to renew the participation ticket. Hence, the value
// to 13.
ParticipationCheckProgressMaxAttempts = 13

// ParticipationLeaseTerm is the number of instances the miner will attempt to lease from nodes.
ParticipationLeaseTerm = 5
)

var log = logging.Logger("cf3")

type F3ParticipationAPI interface {
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
}

type F3Task struct {
db *harmonydb.DB
api F3ParticipationAPI

leaseTerm uint64

actors map[dtypes.MinerAddress]bool
}

func NewF3Task(db *harmonydb.DB, api F3ParticipationAPI, actors map[dtypes.MinerAddress]bool) *F3Task {
return &F3Task{
db: db,
api: api,
leaseTerm: ParticipationLeaseTerm,

actors: actors,
}
}

func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()
magik6k marked this conversation as resolved.
Show resolved Hide resolved

var spID int64
err = f.db.QueryRow(ctx, "SELECT sp_id FROM f3_tasks WHERE task_id = $1", taskID).Scan(&spID)
if err != nil {
return false, xerrors.Errorf("failed to get sp_id: %w", err)
}

maddr, err := address.NewIDAddress(uint64(spID))
if err != nil {
return false, xerrors.Errorf("failed to parse miner address: %w", err)
}

for {
if !stillOwned() {
return false, nil
}
magik6k marked this conversation as resolved.
Show resolved Hide resolved

var previousTicket []byte
err = f.db.QueryRow(ctx, "SELECT previous_ticket FROM f3_tasks WHERE task_id = $1", taskID).Scan(&previousTicket)
if err != nil {
return false, xerrors.Errorf("failed to get previous ticket: %w", err)
}

// Ensure that calls are made on the same node (the first call will determine the node)
ctx := deps.OnSingleNode(ctx)
magik6k marked this conversation as resolved.
Show resolved Hide resolved

ticket, err := f.tryGetF3ParticipationTicket(ctx, stillOwned, maddr, previousTicket)
if err != nil {
return false, xerrors.Errorf("failed to get participation ticket: %w", err)
}

lease, participating, err := f.tryF3Participate(ctx, stillOwned, ticket)
if err != nil {
return false, xerrors.Errorf("failed to participate in F3: %w", err)
}
if !participating {
return false, xerrors.Errorf("failed to participate in F3: not participating")
}

_, err = f.db.Exec(ctx, "UPDATE f3_tasks SET previous_ticket = $1 WHERE task_id = $2", ticket, taskID)
if err != nil {
return false, xerrors.Errorf("failed to update previous ticket: %w", err)
}

bo := &backoff.Backoff{
magik6k marked this conversation as resolved.
Show resolved Hide resolved
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
}

err = f.awaitLeaseExpiry(stillOwned, ctx, lease, bo)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, xerrors.Errorf("failed to await lease expiry: %w", err)
}
}
}

func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) {
for stillOwned() {
switch ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, ParticipationLeaseTerm); {
case ctx.Err() != nil:
return api.F3ParticipationTicket{}, ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
return api.F3ParticipationTicket{}, xerrors.Errorf("acquiring F3 participation ticket: %w", err)
case err != nil:
log.Errorw("Failed to acquire F3 participation ticket; retrying", "err", err)
time.Sleep(1 * time.Second)
continue
default:
log.Debug("Successfully acquired F3 participation ticket")
return ticket, nil
}
}
return api.F3ParticipationTicket{}, ctx.Err()
}

func (f *F3Task) tryF3Participate(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
for stillOwned() {
switch lease, err := f.api.F3Participate(ctx, ticket); {
case ctx.Err() != nil:
return api.F3ParticipationLease{}, false, ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "err", err)
return api.F3ParticipationLease{}, false, nil
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "err", err)
return api.F3ParticipationLease{}, false, nil
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "err", err)
time.Sleep(1 * time.Second)
return api.F3ParticipationLease{}, false, nil
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "err", err)
time.Sleep(1 * time.Second)
continue
case errors.Is(err, api.ErrF3NotReady):
log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "err", err)
time.Sleep(30 * time.Second)
continue
case err != nil:
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
default:
log.Infow("Successfully acquired F3 participation lease.",
"issuer", lease.Issuer,
"not-before", lease.FromInstance,
"not-after", lease.ToInstance(),
)
return lease, true, nil
}
}
return api.F3ParticipationLease{}, false, ctx.Err()
}

func (f *F3Task) awaitLeaseExpiry(stillOwned func() bool, ctx context.Context, lease api.F3ParticipationLease, backoff *backoff.Backoff) error {
renewLeaseWithin := f.leaseTerm / 2
for stillOwned() {
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
manifest, err := f.api.F3GetManifest(ctx)
switch {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
return nil
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
time.Sleep(backoff.Duration())
case manifest == nil || manifest.NetworkName != lease.Network:
// If we got an unexpected manifest, or no manifest, go back to the
// beginning and try to get another ticket. Switching from having a manifest
// to having no manifest can theoretically happen if the lotus node reboots
// and has no static manifest.
return nil
}
switch progress, err := f.api.F3GetProgress(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
return nil
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
time.Sleep(backoff.Duration())
case progress.ID+renewLeaseWithin >= lease.ToInstance():
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
return nil
default:
remainingInstanceLease := lease.ToInstance() - progress.ID
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
time.Sleep(waitTime)
}
}
return ctx.Err()
}

func (f *F3Task) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
id := ids[0]
return &id, nil
}

func (f *F3Task) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: taskhelp.BackgroundTask("F3Participate"),
Cost: resources.Resources{
Cpu: 0,
Gpu: 0,
Ram: 10 << 20,
},
MaxFailures: 1,
}
}

func (f *F3Task) Adder(taskFunc harmonytask.AddTaskFunc) {
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
for minerAddress := range f.actors {
mid, err := address.IDFromAddress(address.Address(minerAddress))
if err != nil {
log.Errorw("failed to parse miner address", "miner", minerAddress, "error", err)
continue
}

taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec("INSERT INTO f3_tasks (sp_id, task_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", mid, id)
if err != nil {
return false, err
}

return n > 0, nil
})
}
}

func (f *F3Task) GetSpid(db *harmonydb.DB, taskID int64) string {
var spId string
err := db.QueryRow(context.Background(), `SELECT sp_id FROM f3_tasks WHERE task_id = $1`, taskID).Scan(&spId)
if err != nil {
return ""
}
return spId
}

var _ = harmonytask.Reg(&F3Task{})
var _ harmonytask.TaskInterface = &F3Task{}