Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

f3: Update participate loop to always participate #302

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ require (
github.com/ipfs/go-ipld-cbor v0.2.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jpillora/backoff v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.1.0
github.com/manifoldco/promptui v0.9.0
Expand Down Expand Up @@ -206,6 +205,7 @@ require (
github.com/jessevdk/go-flags v1.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
Expand Down
119 changes: 34 additions & 85 deletions tasks/f3/f3_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/jpillora/backoff"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/curio/deps"
Expand Down Expand Up @@ -44,7 +42,6 @@ var log = logging.Logger("cf3")
type F3ParticipationAPI interface {
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
}

Expand Down Expand Up @@ -94,31 +91,26 @@ func (f *F3Task) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done boo
return false, xerrors.Errorf("failed to get participation ticket: %w", err)
}

lease, participating, err := f.tryF3Participate(ctx, stillOwned, ticket)
if err != nil {
return false, xerrors.Errorf("failed to participate in F3: %w", err)
}
if !participating {
return false, xerrors.Errorf("failed to participate in F3: not participating")
}

// Store the ticket in the database
_, err = f.db.Exec(ctx, "UPDATE f3_tasks SET previous_ticket = $1 WHERE task_id = $2", ticket, taskID)
if err != nil {
return false, xerrors.Errorf("failed to update previous ticket: %w", err)
}

err = f.awaitLeaseExpiry(ctx, stillOwned, lease)
// Start participation loop
err = f.participateLoop(ctx, stillOwned, ticket)
if err != nil {
return false, xerrors.Errorf("failed to await lease expiry: %w", err)
return false, xerrors.Errorf("failed during participation loop: %w", err)
}
// When participateLoop returns, we go back to get a new ticket
}

return false, xerrors.Errorf("f3 task is background task")
}

func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned func() bool, participant address.Address, previousTicket []byte) (api.F3ParticipationTicket, error) {
for stillOwned() {
switch ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, ParticipationLeaseTerm); {
switch ticket, err := f.api.F3GetOrRenewParticipationTicket(ctx, participant, previousTicket, f.leaseTerm); {
case ctx.Err() != nil:
return api.F3ParticipationTicket{}, ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
Expand All @@ -136,99 +128,56 @@ func (f *F3Task) tryGetF3ParticipationTicket(ctx context.Context, stillOwned fun
return api.F3ParticipationTicket{}, ctx.Err()
}

func (f *F3Task) tryF3Participate(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
func (f *F3Task) participateLoop(ctx context.Context, stillOwned func() bool, ticket api.F3ParticipationTicket) error {
renewLeaseWithin := f.leaseTerm / 2
var (
haveLease bool
)
for stillOwned() {
switch lease, err := f.api.F3Participate(ctx, ticket); {
lease, err := f.api.F3Participate(ctx, ticket)
switch {
case ctx.Err() != nil:
return api.F3ParticipationLease{}, false, ctx.Err()
return ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "err", err)
return api.F3ParticipationLease{}, false, nil
return nil // Return to get a new ticket
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "err", err)
return api.F3ParticipationLease{}, false, nil
return nil // Return to get a new ticket
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "err", err)
time.Sleep(1 * time.Second)
return api.F3ParticipationLease{}, false, nil
return nil // Return to get a new ticket
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "err", err)
log.Warnw("Node is not the issuer of F3 participation ticket. Miner may be load-balancing or node has changed. Retrying F3 participation.", "err", err)
time.Sleep(1 * time.Second)
continue
case errors.Is(err, api.ErrF3NotReady):
log.Warnw("F3 is not ready. Retrying F3 participation after backoff.", "err", err)
log.Warnw("F3 is not ready. Retrying F3 participation.", "err", err)
time.Sleep(30 * time.Second)
continue
case err != nil:
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
case lease.ValidityTerm <= renewLeaseWithin:
return nil // Return to get a new ticket
default:
log.Infow("Successfully acquired F3 participation lease.",
"issuer", lease.Issuer,
"not-before", lease.FromInstance,
"not-after", lease.ToInstance(),
)
return lease, true, nil
}
}
return api.F3ParticipationLease{}, false, ctx.Err()
}

func (f *F3Task) awaitLeaseExpiry(ctx context.Context, stillOwned func() bool, lease api.F3ParticipationLease) error {
backoff := &backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
}

renewLeaseWithin := f.leaseTerm / 2
for stillOwned() {
manifest, err := f.api.F3GetManifest(ctx)
switch {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
return nil
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
time.Sleep(backoff.Duration())
continue
case manifest == nil || manifest.NetworkName != lease.Network:
// If we got an unexpected manifest, or no manifest, go back to the
// beginning and try to get another ticket. Switching from having a manifest
// to having no manifest can theoretically happen if the lotus node reboots
// and has no static manifest.
return nil
}
switch progress, err := f.api.F3GetProgress(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if backoff.Attempt() > ParticipationCheckProgressMaxAttempts {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", backoff.Attempt(), "err", err)
return nil
// Successfully participated
if !haveLease {
log.Infow("Successfully acquired F3 participation lease.",
"issuer", lease.Issuer,
"not-before", lease.FromInstance,
"not-after", lease.ToInstance(),
)
haveLease = true
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", backoff.Attempt(), "backoff", backoff.Duration(), "err", err)
time.Sleep(backoff.Duration())
case progress.ID+renewLeaseWithin >= lease.ToInstance():
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
return nil
default:
remainingInstanceLease := lease.ToInstance() - progress.ID
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
time.Sleep(waitTime)
}

log.Debugf("F3 participation lease is valid for further %d instances.", lease.ValidityTerm)
time.Sleep(time.Second * 5)
}
return ctx.Err()
magik6k marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down