Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wdpost skip / race issues #31

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions cmd/curio/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"github.com/filecoin-project/curio/cmd/curio/tasks"
"github.com/filecoin-project/curio/deps"
"github.com/yugabyte/pgx/v5"
"os"
"time"

Expand Down Expand Up @@ -110,7 +111,11 @@ var wdPostTaskCmd = &cli.Command{
return xerrors.Errorf("writing SQL transaction: %w", err)
}
fmt.Printf("Inserted task %v. Waiting for success ", taskId)

var result sql.NullString
var lastHistID *int64
prevFound := true

for {
time.Sleep(time.Second)
err = deps.DB.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, taskId).Scan(&result)
Expand All @@ -121,6 +126,40 @@ var wdPostTaskCmd = &cli.Command{
break
}
fmt.Print(".")
magik6k marked this conversation as resolved.
Show resolved Hide resolved

{
// look at history
var histID *int64
var errmsg sql.NullString
err = deps.DB.QueryRow(ctx, `SELECT id, result, err FROM harmony_task_history WHERE task_id=$1 ORDER BY work_end DESC LIMIT 1`, taskId).Scan(&histID, &result, &errmsg)
if err != nil && err != pgx.ErrNoRows {
return xerrors.Errorf("reading result from harmony_task_history: %w", err)
}

if err == nil && histID != nil && (lastHistID == nil || *histID != *lastHistID) {
fmt.Println()
var errstr string
if errmsg.Valid {
errstr = errmsg.String
}
fmt.Printf("History %d: %s\n", *histID, errstr)
lastHistID = histID
}
}

{
// look for fails
var found bool
err = deps.DB.QueryRow(ctx, `SELECT true FROM harmony_task WHERE id=$1`, taskId).Scan(&found)
if err != nil && err != pgx.ErrNoRows {
return xerrors.Errorf("reading result from harmony_task: %w", err)
}

if !found && !prevFound {
return xerrors.Errorf("task %d was not found in harmony_task, likely out of retries", taskId)
}
prevFound = found
}
}
fmt.Println()
log.Infof("Result: %s", result.String)
Expand Down
58 changes: 45 additions & 13 deletions tasks/window/compute_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"github.com/filecoin-project/curio/lib/ffiselect"
"github.com/samber/lo"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -72,7 +73,7 @@ func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr ad
Proofs: nil,
}

var partitions []miner2.PoStPartition
var postPartition miner2.PoStPartition
var xsinfos []proof7.ExtendedSectorInfo

{
Expand Down Expand Up @@ -131,10 +132,10 @@ func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr ad
}

xsinfos = append(xsinfos, ssi...)
partitions = append(partitions, miner2.PoStPartition{
postPartition = miner2.PoStPartition{
Index: partIdx,
Skipped: skipped,
})
}

log.Infow("running window post",
"chain-random", rand,
Expand All @@ -159,9 +160,9 @@ func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr ad
return nil, xerrors.Errorf("failed to get window post type: %w", err)
}

postOut, ps, err := t.generateWindowPoSt(ctx, ppt, abi.ActorID(mid), xsinfos, append(abi.PoStRandomness{}, rand...))
postOut, computeSkipped, err := t.generateWindowPoSt(ctx, ppt, abi.ActorID(mid), xsinfos, append(abi.PoStRandomness{}, rand...))
elapsed := time.Since(tsStart)
log.Infow("computing window post", "partition", partIdx, "elapsed", elapsed, "skip", len(ps), "err", err)
log.Infow("computing window post", "partition", partIdx, "elapsed", elapsed, "skip", len(computeSkipped), "err", err)
if err != nil {
log.Errorf("error generating window post: %s", err)
}
Expand Down Expand Up @@ -190,32 +191,61 @@ func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr ad
return nil, xerrors.Errorf("post generation randomness was different from random beacon")
}

// computeSkipped is a list of sector IDs (miner + sector number) that were skipped during PoSt computation
for _, skippedNum := range computeSkipped {
magik6k marked this conversation as resolved.
Show resolved Hide resolved
// set postPartition.Skipped bitfield entries to also contain sectors skipped during PoSt computation
// (initially it contains a list of sectors skipped during pre-checks)
postPartition.Skipped.Set(uint64(skippedNum.Number))
}

// Compute SectorsInfo list for proof verification, matching the logic in the miner actor
sinfos := make([]proof7.SectorInfo, len(xsinfos))

// skipped sector infos need to be filled with the first non-skipped sector info so that the offsets
// of the non-skipped sectors are correct. This no longer matters for PoSt verification in proofs, but
// in any case we want this logic to match the miner actor's PoSt verification logic as closely as possible.
var firstStandIn proof7.SectorInfo // https://github.com/filecoin-project/builtin-actors/blob/ea7c45478751bd0fe12d0d374abc8fdc9341bfea/actors/miner/src/sectors.rs#L112

for i, xsi := range xsinfos {
sinfos[i] = proof7.SectorInfo{
if lo.Contains(computeSkipped, abi.SectorID{Miner: abi.ActorID(mid), Number: xsi.SectorNumber}) {
// a stand-in will be added in the next loop. We don't do that here because in the first few iterations
// we may not know which sector will be the first non-skipped one.
continue
}

si := proof7.SectorInfo{
SealProof: xsi.SealProof,
SectorNumber: xsi.SectorNumber,
SealedCID: xsi.SealedCID,
}
sinfos[i] = si
if firstStandIn.SealedCID == cid.Undef {
firstStandIn = si
}
}

// fill in skipped sector infos with the first non-skipped sector info
for i := range sinfos {
if sinfos[i].SealedCID == cid.Undef {
sinfos[i] = firstStandIn
}
}

// Verify the PoSt proof!
if correct, err := t.verifier.VerifyWindowPoSt(ctx, proof.WindowPoStVerifyInfo{
Randomness: abi.PoStRandomness(checkRand),
Proofs: postOut,
ChallengedSectors: sinfos,
Prover: abi.ActorID(mid),
}); err != nil { // revive:disable-line:empty-block
/*log.Errorw("window post verification failed", "post", postOut, "error", err)
time.Sleep(5 * time.Second)
continue todo retry loop */
return nil, xerrors.Errorf("failed to verify window post: %w", err)
} else if !correct {
_ = correct
/*log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue todo retry loop*/
return nil, xerrors.Errorf("window post verification failed: proof was invalid")
}

// Proof generation successful, stop retrying
//somethingToProve = true
params.Partitions = partitions
params.Partitions = []miner2.PoStPartition{postPartition}
params.Proofs = postOut
//break

Expand Down Expand Up @@ -463,6 +493,8 @@ func (t *WdPostTask) GenerateWindowPoStAdv(ctx context.Context, ppt abi.Register

go func(i int, s storiface.PostSectorChallenge) {
defer wg.Done()
ctx := ctx

if t.parallel != nil {
defer func() {
<-t.parallel
Expand Down
3 changes: 2 additions & 1 deletion tasks/window/compute_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
"partition": partIdx,
"submit_at_epoch": deadline.Open,
"submit_by_epoch": deadline.Close,
"post_out": postOut,
"proof_params": msgbuf.Bytes(),
}, "", " ")
if err != nil {
Expand Down Expand Up @@ -368,7 +369,7 @@ func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPost",
Max: t.max,
MaxFailures: 3,
MaxFailures: 5,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
Expand Down
2 changes: 1 addition & 1 deletion web/hapi/watch_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (a *app) updateActor(ctx context.Context) error {
api := a.workingApi
a.rpcInfoLk.Unlock()

stor := store.ActorStore(ctx, blockstore.NewReadCachedBlockstore(blockstore.NewAPIBlockstore(a.workingApi), ChainBlockCache))
stor := store.ActorStore(ctx, blockstore.NewReadCachedBlockstore(blockstore.NewAPIBlockstore(api), ChainBlockCache))

if api == nil {
if time.Since(startedAt) > time.Second*10 {
Expand Down