Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lmrpc: Decode DDO in SectorsStatus #102

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/snadrus/must v0.0.0-20240605044437-98cedd57f8eb
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.25.5
github.com/whyrusleeping/cbor-gen v0.1.1
github.com/yugabyte/pgx/v5 v5.5.3-yb-2
go.opencensus.io v0.24.0
go.uber.org/multierr v1.11.0
Expand Down Expand Up @@ -285,7 +286,6 @@ require (
github.com/valyala/fasttemplate v1.0.1 // indirect
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/whyrusleeping/cbor-gen v0.1.1 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
Expand Down
46 changes: 44 additions & 2 deletions market/fakelm/lmimpl.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package fakelm

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"net/http"
"net/url"

"github.com/gbrlsnchs/jwt/v3"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
typegen "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand All @@ -23,12 +27,15 @@ import (
"github.com/filecoin-project/curio/market"

lapi "github.com/filecoin-project/lotus/api"
market2 "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
lpiece "github.com/filecoin-project/lotus/storage/pipeline/piece"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("lmrpc")

type LMRPCProvider struct {
si paths.SectorIndex
full api.Chain
Expand Down Expand Up @@ -71,6 +78,7 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber,
var ssip []struct {
PieceCID *string `db:"piece_cid"`
DealID *int64 `db:"f05_deal_id"`
DDOPAM *string `db:"ddo_pam"`
Complete bool `db:"after_commit_msg_success"`
Failed bool `db:"failed"`
SDR bool `db:"after_sdr"`
Expand Down Expand Up @@ -101,6 +109,7 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber,
SELECT
mp.piece_cid,
mp.f05_deal_id,
mp.ddo_pam as ddo_pam,
cc.after_commit_msg_success,
cc.failed,
cc.after_sdr,
Expand All @@ -118,6 +127,7 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber,
SELECT
ip.piece_cid,
ip.f05_deal_id,
ip.direct_piece_activation_manifest as ddo_pam,
cc.after_commit_msg_success,
cc.failed,
cc.after_sdr,
Expand All @@ -135,6 +145,7 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber,
SELECT
ip.piece_cid,
NULL::bigint as f05_deal_id,
ip.direct_piece_activation_manifest as ddo_pam,
FALSE as after_commit_msg_success,
FALSE as failed,
FALSE AS after_sdr,
Expand All @@ -146,12 +157,13 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber,
INNER JOIN
CheckCommit cc ON ip.sp_id = cc.sp_id AND ip.sector_number = cc.sector_number
WHERE
cc.after_commit_msg IS FALSE
cc.after_commit_msg IS TRUE
),
FallbackPieces AS (
SELECT
op.piece_cid,
op.f05_deal_id,
op.direct_piece_activation_manifest as ddo_pam,
FALSE as after_commit_msg_success,
FALSE as failed,
FALSE as after_sdr,
Expand All @@ -176,10 +188,40 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber,
}

var deals []abi.DealID
var seenDealIDs = make(map[abi.DealID]struct{})

if len(ssip) > 0 {
for _, d := range ssip {
var dealID abi.DealID

if d.DealID != nil {
deals = append(deals, abi.DealID(*d.DealID))
dealID = abi.DealID(*d.DealID)
} else if d.DDOPAM != nil {
var pam miner.PieceActivationManifest
err := json.Unmarshal([]byte(*d.DDOPAM), &pam)
if err != nil {
return lapi.SectorInfo{}, err
}
if len(pam.Notify) != 1 {
continue
}
if pam.Notify[0].Address != market2.Address {
continue
}
maj, val, err := typegen.CborReadHeaderBuf(bytes.NewReader(pam.Notify[0].Payload), make([]byte, 9))
if err != nil {
return lapi.SectorInfo{}, err
}
if maj != typegen.MajUnsignedInt {
log.Errorw("deal id not an unsigned int", "maj", maj)
continue
}
dealID = abi.DealID(val)
}

if _, ok := seenDealIDs[dealID]; !ok {
deals = append(deals, dealID)
seenDealIDs[dealID] = struct{}{}
}
}
}
Expand Down