Skip to content

Commit

Permalink
feat(shed): add commands for importing/exporting datastore snapshots (#…
Browse files Browse the repository at this point in the history
…12685)

This makes it possible to import/export arbitrary datastore snapshots.
  • Loading branch information
Stebalien authored Nov 8, 2024
1 parent 4eb45ca commit b89a298
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Implement new `lotus f3` CLI commands to list F3 participants, dump manifest, get/list finality certificates and check the F3 status. ([filecoin-project/lotus#12617](https://github.com/filecoin-project/lotus/pull/12617), [filecoin-project/lotus#12627](https://github.com/filecoin-project/lotus/pull/12627))
- Return a `"data"` field on the `"error"` returned from RPC when `eth_call` and `eth_estimateGas` APIs encounter `execution reverted` errors. ([filecoin-project/lotus#12553](https://github.com/filecoin-project/lotus/pull/12553))
- Implement `EthGetTransactionByBlockNumberAndIndex` (`eth_getTransactionByBlockNumberAndIndex`) and `EthGetTransactionByBlockHashAndIndex` (`eth_getTransactionByBlockHashAndIndex`) methods. ([filecoin-project/lotus#12618](https://github.com/filecoin-project/lotus/pull/12618))
- Add a set of `lotus-shed datastore` commands for importing, exporting, and clearing parts of the datastore ([filecoin-project/lotus#12685](https://github.com/filecoin-project/lotus/pull/12685)):

## Bug Fixes
- Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567))
Expand Down
277 changes: 277 additions & 0 deletions cmd/lotus-shed/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -17,10 +18,12 @@ import (
"github.com/mitchellh/go-homedir"
"github.com/polydawn/refmt/cbor"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/multierr"
"golang.org/x/xerrors"

lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/repo"
)
Expand All @@ -34,6 +37,9 @@ var datastoreCmd = &cli.Command{
datastoreGetCmd,
datastoreRewriteCmd,
datastoreVlog2CarCmd,
datastoreImportCmd,
datastoreExportCmd,
datastoreClearCmd,
},
}

Expand Down Expand Up @@ -106,6 +112,98 @@ var datastoreListCmd = &cli.Command{
},
}

var datastoreClearCmd = &cli.Command{
Name: "clear",
Description: "Clear a part or all of the given datastore.",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
&cli.StringFlag{
Name: "prefix",
Usage: "only delete key/values with the given prefix",
Value: "",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "must be specified for the action to take effect",
},
},
ArgsUsage: "[namespace]",
Action: func(cctx *cli.Context) (_err error) {
if cctx.NArg() != 2 {
return xerrors.Errorf("requires 2 arguments: the datastore prefix")
}
namespace := cctx.Args().Get(0)

r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck

ds, err := lr.Datastore(cctx.Context, namespace)
if err != nil {
return err
}
defer func() {
_err = multierr.Append(_err, ds.Close())
}()

dryRun := !cctx.Bool("really-do-it")

query, err := ds.Query(cctx.Context, dsq.Query{
Prefix: cctx.String("prefix"),
})
if err != nil {
return err
}
defer query.Close() //nolint:errcheck

batch, err := ds.Batch(cctx.Context)
if err != nil {
return xerrors.Errorf("failed to create a datastore batch: %w", err)
}

for res, ok := query.NextSync(); ok; res, ok = query.NextSync() {
if res.Error != nil {
return xerrors.Errorf("failed to read from datastore: %w", res.Error)
}
_, _ = fmt.Fprintf(cctx.App.Writer, "deleting: %q\n", res.Key)
if !dryRun {
if err := batch.Delete(cctx.Context, datastore.NewKey(res.Key)); err != nil {
return xerrors.Errorf("failed to delete %q: %w", res.Key, err)
}
}
}

if !dryRun {
if err := batch.Commit(cctx.Context); err != nil {
return xerrors.Errorf("failed to flush the batch: %w", err)
}
} else {
_, _ = fmt.Fprintln(cctx.App.Writer, "NOTE: dry run complete, re-run with --really-do-it to actually delete this state.")
}

return nil
},
}

var datastoreGetCmd = &cli.Command{
Name: "get",
Description: "list datastore keys",
Expand Down Expand Up @@ -158,6 +256,185 @@ var datastoreGetCmd = &cli.Command{
},
}

var datastoreExportCmd = &cli.Command{
Name: "export",
Description: "Export part or all of the specified datastore, appending to the specified datastore snapshot.",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
&cli.StringFlag{
Name: "prefix",
Usage: "export only keys with the given prefix",
Value: "",
},
},
ArgsUsage: "[namespace filename]",
Action: func(cctx *cli.Context) (_err error) {
if cctx.NArg() != 2 {
return xerrors.Errorf("requires 2 arguments: the datastore prefix and the filename to which the snapshot will be written")
}
namespace := cctx.Args().Get(0)
fname := cctx.Args().Get(1)

snapshot, err := os.OpenFile(fname, os.O_WRONLY|os.O_APPEND|os.O_CREATE, os.ModePerm)
if err != nil {
return xerrors.Errorf("failed to open snapshot: %w", err)
}
defer func() {
_err = multierr.Append(_err, snapshot.Close())
}()

r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck

ds, err := lr.Datastore(cctx.Context, namespace)
if err != nil {
return err
}
defer func() {
_err = multierr.Append(_err, ds.Close())
}()

query, err := ds.Query(cctx.Context, dsq.Query{
Prefix: cctx.String("prefix"),
})
if err != nil {
return err
}

bufWriter := bufio.NewWriter(snapshot)
snapshotWriter := cbg.NewCborWriter(bufWriter)
for res, ok := query.NextSync(); ok; res, ok = query.NextSync() {
if res.Error != nil {
return xerrors.Errorf("failed to read from datastore: %w", res.Error)
}

entry := shedgen.DatastoreEntry{
Key: []byte(res.Key),
Value: res.Value,
}

_, _ = fmt.Fprintf(cctx.App.Writer, "exporting: %q\n", res.Key)
if err := entry.MarshalCBOR(snapshotWriter); err != nil {
return xerrors.Errorf("failed to write %q to snapshot: %w", res.Key, err)
}
}
if err := bufWriter.Flush(); err != nil {
return xerrors.Errorf("failed to flush snapshot: %w", err)
}

return nil
},
}

var datastoreImportCmd = &cli.Command{
Name: "import",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
},
Description: "Import the specified datastore snapshot.",
ArgsUsage: "[namespace filename]",
Action: func(cctx *cli.Context) (_err error) {
if cctx.NArg() != 2 {
return xerrors.Errorf("requires 2 arguments: the datastore prefix and the filename of the snapshot to import")
}
namespace := cctx.Args().Get(0)
fname := cctx.Args().Get(1)

snapshot, err := os.Open(fname)
if err != nil {
return xerrors.Errorf("failed to open snapshot: %w", err)
}
defer snapshot.Close() //nolint:errcheck

r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck

ds, err := lr.Datastore(cctx.Context, namespace)
if err != nil {
return err
}
defer func() {
_err = multierr.Append(_err, ds.Close())
}()

batch, err := ds.Batch(cctx.Context)
if err != nil {
return err
}

dryRun := !cctx.Bool("really-do-it")

snapshotReader := cbg.NewCborReader(bufio.NewReader(snapshot))
for {
var entry shedgen.DatastoreEntry
if err := entry.UnmarshalCBOR(snapshotReader); err != nil {
if errors.Is(err, io.EOF) {
break
}
return xerrors.Errorf("failed to read entry from snapshot: %w", err)
}

_, _ = fmt.Fprintf(cctx.App.Writer, "importing: %q\n", string(entry.Key))

if !dryRun {
key := datastore.NewKey(string(entry.Key))
if err := batch.Put(cctx.Context, key, entry.Value); err != nil {
return xerrors.Errorf("failed to put %q: %w", key, err)
}
}
}

if !dryRun {
if err := batch.Commit(cctx.Context); err != nil {
return xerrors.Errorf("failed to commit batch: %w", err)
}
} else {
_, _ = fmt.Fprintln(cctx.App.Writer, "NOTE: dry run complete, re-run with --really-do-it to actually import the datastore snapshot, overwriting any conflicting state.")
}
return nil
},
}

var datastoreBackupCmd = &cli.Command{
Name: "backup",
Description: "manage datastore backups",
Expand Down
Loading

0 comments on commit b89a298

Please sign in to comment.