From 2502aad8df9568bea97a3581811de2bdc6672821 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 10 Oct 2024 15:50:38 +1100 Subject: [PATCH] feat(events): compare-amt option for lotus-shed indexes inspect-events Instead of relying just on entry counts, compare the regenerated AMT root using just what we have in the db with the message receipt event root. This should tell us precisely that we have what we should or not. Ref: https://github.com/filecoin-project/lotus/issues/12570 --- CHANGELOG.md | 3 +- cmd/lotus-shed/indexes.go | 279 ++++++++++++++++++++++++++++++++------ 2 files changed, 241 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fa4de71bda..83347a961a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Reduce size of embedded genesis CAR files by removing WASM actor blocks and compressing with zstd. This reduces the `lotus` binary size by approximately 10 MiB. ([filecoin-project/lotus#12439](https://github.com/filecoin-project/lotus/pull/12439)) - Add ChainSafe operated Calibration archival node to the bootstrap list ([filecoin-project/lotus#12517](https://github.com/filecoin-project/lotus/pull/12517)) - Fix hotloop in F3 pariticpation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575)) +- `lotus-shed indexes inspect-indexes` now performs a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. Previously `inspect-indexes` simply compared event counts, comparing AMT roots confirms all the event data is byte-perfect. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570)) ## Bug Fixes - Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567)) @@ -14,8 +15,6 @@ ## Improvements -## Improvements - ## Deps # UNRELEASED Node v1.30.0 diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 4d8e76e1382..9b9181b4be6 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -10,16 +10,21 @@ import ( "strings" "time" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" lapi "github.com/filecoin-project/lotus/api" + bstore "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -34,6 +39,10 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` + + // these queries are used to extract just the information used to reconstruct the event AMT from the database + selectEventIdAndEmitter = `SELECT id, emitter_addr FROM event WHERE tipset_key_cid=? and message_cid=? ORDER BY event_index ASC` + selectEventEntries = `SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -483,46 +492,108 @@ var inspectEventsCmd = &cli.Command{ if err != nil { return err } + stmtSelectEventIdAndEmitter, err := db.Prepare(selectEventIdAndEmitter) + if err != nil { + return err + } + stmtSelectEventEntries, err := db.Prepare(selectEventEntries) + if err != nil { + return err + } - processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error { - tsKeyCid, err := ts.Key().Cid() + processHeight := func(ctx context.Context, messages []lapi.Message, receipts []*types.MessageReceipt) error { + tsKeyCid, err := currTs.Key().Cid() if err != nil { - return fmt.Errorf("failed to get tipset key cid: %w", err) + return xerrors.Errorf("failed to get tipset key cid: %w", err) } - var expectEvents int - var expectEntries int + var problems []string + var hasEvents bool - for _, receipt := range receipts { - if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { - continue + checkEventAndEntryCounts := func() error { + // compare by counting events, using ChainGetEvents to load the events from the chain + expectEvents, expectEntries, err := chainEventAndEntryCountsAt(ctx, currTs, receipts, api) + if err != nil { + return err } - events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if expectEvents > 0 { + hasEvents = true + } + + actualEvents, actualEntries, err := dbEventAndEntryCountsAt(currTs, stmtEventCount, stmtEntryCount) if err != nil { - return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) + return err } - expectEvents += len(events) - for _, event := range events { - expectEntries += len(event.Entries) + + if actualEvents != expectEvents { + problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) + } + if actualEntries != expectEntries { + problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) } + + return nil } - var problems []string + // Compare the AMT roots: we reconstruct the event AMT from the database data we have and + // compare it with the on-chain AMT root from the receipt. If it's the same CID then we have + // exactly the same event data. Any variation, in number of events, and even a single byte + // in event data, will be considered a mismatch. + + // cache for address -> actorID because it's typical for tipsets to generate many events for + // the same actors so we can try and avoid too many StateLookupID calls + addrIdCache := make(map[address.Address]abi.ActorID) + + eventIndex := 0 + for msgIndex, receipt := range receipts { + if receipt.EventsRoot == nil { + continue + } + + amtRoot, has, problem, err := amtRootForEvents( + ctx, + api, + tsKeyCid, + prevTs.Key(), + stmtSelectEventIdAndEmitter, + stmtSelectEventEntries, + messages[msgIndex], + addrIdCache, + ) + if err != nil { + return err + } + if has && !hasEvents { + hasEvents = true + } + + if problem != "" { + problems = append(problems, problem) + } else if amtRoot != *receipt.EventsRoot { + problems = append(problems, fmt.Sprintf("events root mismatch for event %d", eventIndex)) + // also provide more information about the mismatch + if err := checkEventAndEntryCounts(); err != nil { + return err + } + } + + eventIndex++ + } var seenHeight int var seenReverted int if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil { if err == sql.ErrNoRows { - if expectEvents > 0 { + if hasEvents { problems = append(problems, "not in events_seen table") } else { problems = append(problems, "zero-event epoch not in events_seen table") } } else { - return fmt.Errorf("failed to check if tipset is seen: %w", err) + return xerrors.Errorf("failed to check if tipset is seen: %w", err) } } else { - if seenHeight != int(ts.Height()) { + if seenHeight != int(currTs.Height()) { problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight)) } if seenReverted != 0 { @@ -530,40 +601,36 @@ var inspectEventsCmd = &cli.Command{ } } - var actualEvents int - if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { - return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) - } - var actualEntries int - if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { - return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) - } - - if actualEvents != expectEvents { - problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) - } - if actualEntries != expectEntries { - problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) - } - if len(problems) > 0 { - _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems) + _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", currTs.Height(), tsKeyCid, strings.Join(problems, ", ")) } else if logGood { - _, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid) + _, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", currTs.Height(), tsKeyCid) } return nil } for i := 0; ctx.Err() == nil && i < epochs; i++ { - // get receipts for the parent of the previous tipset (which will be currTs) - receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid()) + // get receipts and messages for the parent of the previous tipset (which will be currTs) + + blockCid := prevTs.Blocks()[0].Cid() + + messages, err := api.ChainGetParentMessages(ctx, blockCid) + if err != nil { + _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent messages for epoch %d (checked %d epochs)", prevTs.Height(), i) + break + } + receipts, err := api.ChainGetParentReceipts(ctx, blockCid) if err != nil { _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i) break } - err = processHeight(ctx, currTs, receipts) + if len(messages) != len(receipts) { + return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(messages), len(receipts)) + } + + err = processHeight(ctx, messages, receipts) if err != nil { return err } @@ -572,7 +639,7 @@ var inspectEventsCmd = &cli.Command{ prevTs = currTs currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) if err != nil { - return fmt.Errorf("failed to load tipset %s: %w", currTs, err) + return xerrors.Errorf("failed to load tipset %s: %w", currTs, err) } } @@ -580,6 +647,140 @@ var inspectEventsCmd = &cli.Command{ }, } +// amtRootForEvents generates the events AMT root CID for a given message's events, and returns +// whether the message has events, a string describing any non-fatal problem encountered, +// and a fatal error if one occurred. +func amtRootForEvents( + ctx context.Context, + api lapi.FullNode, + tsKeyCid cid.Cid, + prevTsKey types.TipSetKey, + stmtSelectEventIdAndEmitter, stmtSelectEventEntries *sql.Stmt, + message lapi.Message, + addrIdCache map[address.Address]abi.ActorID, +) (cid.Cid, bool, string, error) { + + events := make([]cbg.CBORMarshaler, 0) + + rows, err := stmtSelectEventIdAndEmitter.Query(tsKeyCid.Bytes(), message.Cid.Bytes()) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to query events: %w", err) + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var eventId int + var emitterAddr []byte + if err := rows.Scan(&eventId, &emitterAddr); err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err) + } + + addr, err := address.NewFromBytes(emitterAddr) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to parse address: %w", err) + } + var actorId abi.ActorID + if id, ok := addrIdCache[addr]; ok { + actorId = id + } else { + if addr.Protocol() != address.ID { + // use the previous tipset (height+1) to do an address lookup because the actor + // may have been created in the current tipset (i.e. deferred execution means the + // changed state isn't available until the next epoch) + idAddr, err := api.StateLookupID(ctx, addr, prevTsKey) + if err != nil { + // TODO: fix this? we should be able to resolve all addresses + return cid.Undef, false, fmt.Sprintf("failed to resolve address (%s), could not compare amt", addr.String()), nil + } + addr = idAddr + } + id, err := address.IDFromAddress(addr) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to get ID from address: %w", err) + } + actorId = abi.ActorID(id) + addrIdCache[addr] = actorId + } + + event := types.Event{ + Emitter: actorId, + Entries: make([]types.EventEntry, 0), + } + + rows2, err := stmtSelectEventEntries.Query(eventId) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to query event entries: %w", err) + } + defer func() { + _ = rows2.Close() + }() + + for rows2.Next() { + var flags []byte + var key string + var codec uint64 + var value []byte + if err := rows2.Scan(&flags, &key, &codec, &value); err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err) + } + entry := types.EventEntry{ + Flags: flags[0], + Key: key, + Codec: codec, + Value: value, + } + event.Entries = append(event.Entries, entry) + } + + events = append(events, &event) + } + + // construct the AMT from our slice to an in-memory IPLD store just so we can get the root, + // we don't need the blocks themselves + root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to create AMT: %w", err) + } + return root, len(events) > 0, "", nil +} + +func chainEventAndEntryCountsAt(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt, api lapi.FullNode) (int, int, error) { + var expectEvents int + var expectEntries int + for _, receipt := range receipts { + if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { + continue + } + events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + return 0, 0, xerrors.Errorf("failed to load events for tipset %s: %w", ts, err) + } + expectEvents += len(events) + for _, event := range events { + expectEntries += len(event.Entries) + } + } + return expectEvents, expectEntries, nil +} + +func dbEventAndEntryCountsAt(ts *types.TipSet, stmtEventCount, stmtEntryCount *sql.Stmt) (int, int, error) { + tsKeyCid, err := ts.Key().Cid() + if err != nil { + return 0, 0, xerrors.Errorf("failed to get tipset key cid: %w", err) + } + var actualEvents int + if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { + return 0, 0, xerrors.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) + } + var actualEntries int + if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { + return 0, 0, xerrors.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) + } + return actualEvents, actualEntries, nil +} + var backfillMsgIndexCmd = &cli.Command{ Name: "backfill-msgindex", Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",