From 8fb47b368cd408f6a07df253b1e2afbb4ed9aea7 Mon Sep 17 00:00:00 2001 From: Hieu Vu <72878483+hieuvubk@users.noreply.github.com> Date: Tue, 6 Aug 2024 22:28:30 +0700 Subject: [PATCH] feat(server/v2): Add snapshots commands (#21065) --- server/v2/store/commands.go | 16 +- server/v2/store/flags.go | 5 +- server/v2/store/server.go | 26 +- server/v2/store/snapshot.go | 408 +++++++++++++++++++++++++++++++ simapp/v2/simdv2/cmd/commands.go | 4 +- simapp/v2/simdv2/cmd/testnet.go | 4 +- store/v2/root/factory.go | 4 +- 7 files changed, 432 insertions(+), 35 deletions(-) create mode 100644 server/v2/store/snapshot.go diff --git a/server/v2/store/commands.go b/server/v2/store/commands.go index dc44892f3b89..5de696a8d307 100644 --- a/server/v2/store/commands.go +++ b/server/v2/store/commands.go @@ -44,12 +44,8 @@ Supported app-db-backend types include 'goleveldb', 'rocksdb', 'pebbledb'.`, } logger := log.NewLogger(cmd.OutOrStdout()) - home, err := cmd.Flags().GetString(serverv2.FlagHome) - if err != nil { - return err - } - rootStore, keepRecent, err := createRootStore(cmd, home, vp, logger) + rootStore, keepRecent, err := createRootStore(cmd, vp, logger) if err != nil { return fmt.Errorf("can not create root store %w", err) } @@ -78,14 +74,14 @@ Supported app-db-backend types include 'goleveldb', 'rocksdb', 'pebbledb'.`, } cmd.Flags().String(FlagAppDBBackend, "", "The type of database for application and snapshots databases") - cmd.Flags().Uint64(FlagPruningKeepRecent, 0, "Number of recent heights to keep on disk (ignored if pruning is not 'custom')") + cmd.Flags().Uint64(FlagKeepRecent, 0, "Number of recent heights to keep on disk (ignored if pruning is not 'custom')") return cmd } -func createRootStore(cmd *cobra.Command, rootDir string, v *viper.Viper, logger log.Logger) (storev2.RootStore, uint64, error) { +func createRootStore(cmd *cobra.Command, v *viper.Viper, logger log.Logger) (storev2.RootStore, uint64, error) { tempViper := v - + rootDir := v.GetString(serverv2.FlagHome) // handle FlagAppDBBackend var dbType db.DBType if cmd.Flags().Changed(FlagAppDBBackend) { @@ -103,8 +99,8 @@ func createRootStore(cmd *cobra.Command, rootDir string, v *viper.Viper, logger } // handle KeepRecent & Interval flags - if cmd.Flags().Changed(FlagPruningKeepRecent) { - keepRecent, err := cmd.Flags().GetUint64(FlagPruningKeepRecent) + if cmd.Flags().Changed(FlagKeepRecent) { + keepRecent, err := cmd.Flags().GetUint64(FlagKeepRecent) if err != nil { return nil, 0, err } diff --git a/server/v2/store/flags.go b/server/v2/store/flags.go index 9bf1af5282f4..19b917a9d9ee 100644 --- a/server/v2/store/flags.go +++ b/server/v2/store/flags.go @@ -1,6 +1,7 @@ package store const ( - FlagAppDBBackend = "app-db-backend" - FlagPruningKeepRecent = "keep-recent" + FlagAppDBBackend = "app-db-backend" + FlagKeepRecent = "keep-recent" + FlagInterval = "interval" ) diff --git a/server/v2/store/server.go b/server/v2/store/server.go index 78c4cd759cd9..1177c6a0c414 100644 --- a/server/v2/store/server.go +++ b/server/v2/store/server.go @@ -16,10 +16,12 @@ import ( // and contains prune & snapshot commands type StoreComponent[T transaction.Tx] struct { config *Config + // saving appCreator for only RestoreSnapshotCmd + appCreator serverv2.AppCreator[T] } -func New[T transaction.Tx]() *StoreComponent[T] { - return &StoreComponent[T]{} +func New[T transaction.Tx](appCreator serverv2.AppCreator[T]) *StoreComponent[T] { + return &StoreComponent[T]{appCreator: appCreator} } func (s *StoreComponent[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error { @@ -45,24 +47,16 @@ func (s *StoreComponent[T]) Stop(ctx context.Context) error { return nil } -func (s *StoreComponent[T]) GetCommands() []*cobra.Command { - return []*cobra.Command{ - s.PrunesCmd(), - } -} - -func (s *StoreComponent[T]) GetTxs() []*cobra.Command { - return nil -} - -func (s *StoreComponent[T]) GetQueries() []*cobra.Command { - return nil -} - func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig { return serverv2.CLIConfig{ Commands: []*cobra.Command{ s.PrunesCmd(), + s.ExportSnapshotCmd(), + s.DeleteSnapshotCmd(), + s.ListSnapshotsCmd(), + s.DumpArchiveCmd(), + s.LoadArchiveCmd(), + s.RestoreSnapshotCmd(s.appCreator), }, } } diff --git a/server/v2/store/snapshot.go b/server/v2/store/snapshot.go new file mode 100644 index 000000000000..3289d94f5e7d --- /dev/null +++ b/server/v2/store/snapshot.go @@ -0,0 +1,408 @@ +package store + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "reflect" + "strconv" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + + "cosmossdk.io/log" + serverv2 "cosmossdk.io/server/v2" + storev2 "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/snapshots/types" +) + +const SnapshotFileName = "_snapshot" + +// QueryBlockResultsCmd implements the default command for a BlockResults query. +func (s *StoreComponent[T]) ExportSnapshotCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "export", + Short: "Export app state to snapshot store", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + v := serverv2.GetViperFromCmd(cmd) + + height, err := cmd.Flags().GetInt64("height") + if err != nil { + return err + } + + logger := log.NewLogger(cmd.OutOrStdout()) + // app := appCreator(logger, db, nil, viper) + rootStore, _, err := createRootStore(cmd, v, logger) + if err != nil { + return err + } + if height == 0 { + lastCommitId, err := rootStore.LastCommitID() + if err != nil { + return err + } + height = int64(lastCommitId.Version) + } + + cmd.Printf("Exporting snapshot for height %d\n", height) + + sm, err := createSnapshotsManager(cmd, v, logger, rootStore) + if err != nil { + return err + } + + snapshot, err := sm.Create(uint64(height)) + if err != nil { + return err + } + + cmd.Printf("Snapshot created at height %d, format %d, chunks %d\n", snapshot.Height, snapshot.Format, snapshot.Chunks) + return nil + }, + } + + addSnapshotFlagsToCmd(cmd) + cmd.Flags().Int64("height", 0, "Height to export, default to latest state height") + + return cmd +} + +// RestoreSnapshotCmd returns a command to restore a snapshot +func (s *StoreComponent[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *cobra.Command { + cmd := &cobra.Command{ + Use: "restore ", + Short: "Restore app state from local snapshot", + Long: "Restore app state from local snapshot", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + v := serverv2.GetViperFromCmd(cmd) + + height, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return err + } + format, err := strconv.ParseUint(args[1], 10, 32) + if err != nil { + return err + } + + logger := log.NewLogger(cmd.OutOrStdout()) + app := newApp(logger, v) + rootStore := app.GetStore().(storev2.RootStore) + + sm, err := createSnapshotsManager(cmd, v, logger, rootStore) + if err != nil { + return err + } + + return sm.RestoreLocalSnapshot(height, uint32(format)) + }, + } + + addSnapshotFlagsToCmd(cmd) + + return cmd +} + +// ListSnapshotsCmd returns the command to list local snapshots +func (s *StoreComponent[T]) ListSnapshotsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List local snapshots", + RunE: func(cmd *cobra.Command, args []string) error { + v := serverv2.GetViperFromCmd(cmd) + snapshotStore, err := snapshots.NewStore(filepath.Join(v.GetString(serverv2.FlagHome), "data", "snapshots")) + if err != nil { + return err + } + snapshots, err := snapshotStore.List() + if err != nil { + return fmt.Errorf("failed to list snapshots: %w", err) + } + for _, snapshot := range snapshots { + cmd.Println("height:", snapshot.Height, "format:", snapshot.Format, "chunks:", snapshot.Chunks) + } + + return nil + }, + } + + return cmd +} + +// DeleteSnapshotCmd returns the command to delete a local snapshot +func (s *StoreComponent[T]) DeleteSnapshotCmd() *cobra.Command { + return &cobra.Command{ + Use: "delete ", + Short: "Delete a local snapshot", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + v := serverv2.GetViperFromCmd(cmd) + + height, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return err + } + format, err := strconv.ParseUint(args[1], 10, 32) + if err != nil { + return err + } + + snapshotStore, err := snapshots.NewStore(filepath.Join(v.GetString(serverv2.FlagHome), "data", "snapshots")) + if err != nil { + return err + } + + return snapshotStore.Delete(height, uint32(format)) + }, + } +} + +// DumpArchiveCmd returns a command to dump the snapshot as portable archive format +func (s *StoreComponent[T]) DumpArchiveCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "dump ", + Short: "Dump the snapshot as portable archive format", + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) error { + v := serverv2.GetViperFromCmd(cmd) + snapshotStore, err := snapshots.NewStore(filepath.Join(v.GetString(serverv2.FlagHome), "data", "snapshots")) + if err != nil { + return err + } + + output, err := cmd.Flags().GetString("output") + if err != nil { + return err + } + + height, err := strconv.ParseUint(args[0], 10, 64) + if err != nil { + return err + } + format, err := strconv.ParseUint(args[1], 10, 32) + if err != nil { + return err + } + + if output == "" { + output = fmt.Sprintf("%d-%d.tar.gz", height, format) + } + + snapshot, err := snapshotStore.Get(height, uint32(format)) + if err != nil { + return err + } + + if snapshot == nil { + return errors.New("snapshot doesn't exist") + } + + bz, err := snapshot.Marshal() + if err != nil { + return err + } + + fp, err := os.Create(output) + if err != nil { + return err + } + defer fp.Close() + + // since the chunk files are already compressed, we just use fastest compression here + gzipWriter, err := gzip.NewWriterLevel(fp, gzip.BestSpeed) + if err != nil { + return err + } + tarWriter := tar.NewWriter(gzipWriter) + if err := tarWriter.WriteHeader(&tar.Header{ + Name: SnapshotFileName, + Mode: 0o644, + Size: int64(len(bz)), + }); err != nil { + return fmt.Errorf("failed to write snapshot header to tar: %w", err) + } + if _, err := tarWriter.Write(bz); err != nil { + return fmt.Errorf("failed to write snapshot to tar: %w", err) + } + + for i := uint32(0); i < snapshot.Chunks; i++ { + path := snapshotStore.PathChunk(height, uint32(format), i) + tarName := strconv.FormatUint(uint64(i), 10) + if err := processChunk(tarWriter, path, tarName); err != nil { + return err + } + } + + if err := tarWriter.Close(); err != nil { + return fmt.Errorf("failed to close tar writer: %w", err) + } + + if err := gzipWriter.Close(); err != nil { + return fmt.Errorf("failed to close gzip writer: %w", err) + } + + return fp.Close() + }, + } + + cmd.Flags().StringP("output", "o", "", "output file") + + return cmd +} + +// LoadArchiveCmd load a portable archive format snapshot into snapshot store +func (s *StoreComponent[T]) LoadArchiveCmd() *cobra.Command { + return &cobra.Command{ + Use: "load ", + Short: "Load a snapshot archive file (.tar.gz) into snapshot store", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + v := serverv2.GetViperFromCmd(cmd) + snapshotStore, err := snapshots.NewStore(filepath.Join(v.GetString(serverv2.FlagHome), "data", "snapshots")) + if err != nil { + return err + } + + path := args[0] + fp, err := os.Open(path) + if err != nil { + return fmt.Errorf("failed to open archive file: %w", err) + } + reader, err := gzip.NewReader(fp) + if err != nil { + return fmt.Errorf("failed to create gzip reader: %w", err) + } + + var snapshot types.Snapshot + tr := tar.NewReader(reader) + + hdr, err := tr.Next() + if err != nil { + return fmt.Errorf("failed to read snapshot file header: %w", err) + } + if hdr.Name != SnapshotFileName { + return fmt.Errorf("invalid archive, expect file: snapshot, got: %s", hdr.Name) + } + bz, err := io.ReadAll(tr) + if err != nil { + return fmt.Errorf("failed to read snapshot file: %w", err) + } + if err := snapshot.Unmarshal(bz); err != nil { + return fmt.Errorf("failed to unmarshal snapshot: %w", err) + } + + // make sure the channel is unbuffered, because the tar reader can't do concurrency + chunks := make(chan io.ReadCloser) + quitChan := make(chan *types.Snapshot) + go func() { + defer close(quitChan) + + savedSnapshot, err := snapshotStore.Save(snapshot.Height, snapshot.Format, chunks) + if err != nil { + cmd.Println("failed to save snapshot", err) + return + } + quitChan <- savedSnapshot + }() + + for i := uint32(0); i < snapshot.Chunks; i++ { + hdr, err = tr.Next() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + + if hdr.Name != strconv.FormatInt(int64(i), 10) { + return fmt.Errorf("invalid archive, expect file: %d, got: %s", i, hdr.Name) + } + + bz, err := io.ReadAll(tr) + if err != nil { + return fmt.Errorf("failed to read chunk file: %w", err) + } + chunks <- io.NopCloser(bytes.NewReader(bz)) + } + close(chunks) + + savedSnapshot := <-quitChan + if savedSnapshot == nil { + return fmt.Errorf("failed to save snapshot") + } + + if !reflect.DeepEqual(&snapshot, savedSnapshot) { + _ = snapshotStore.Delete(snapshot.Height, snapshot.Format) + return fmt.Errorf("invalid archive, the saved snapshot is not equal to the original one") + } + + return nil + }, + } +} + +func createSnapshotsManager(cmd *cobra.Command, v *viper.Viper, logger log.Logger, store storev2.RootStore) (*snapshots.Manager, error) { + home := v.GetString(serverv2.FlagHome) + snapshotStore, err := snapshots.NewStore(filepath.Join(home, "data", "snapshots")) + if err != nil { + return nil, err + } + var interval, keepRecent uint64 + // if flag was not passed, use as 0. + if cmd.Flags().Changed(FlagKeepRecent) { + keepRecent, err = cmd.Flags().GetUint64(FlagKeepRecent) + if err != nil { + return nil, err + } + } + if cmd.Flags().Changed(FlagInterval) { + interval, err = cmd.Flags().GetUint64(FlagInterval) + if err != nil { + return nil, err + } + } + + sm := snapshots.NewManager(snapshotStore, snapshots.NewSnapshotOptions(interval, uint32(keepRecent)), store.GetStateCommitment().(snapshots.CommitSnapshotter), store.GetStateStorage().(snapshots.StorageSnapshotter), nil, logger) + return sm, nil +} + +func addSnapshotFlagsToCmd(cmd *cobra.Command) { + cmd.Flags().Uint64(FlagKeepRecent, 0, "KeepRecent defines how many snapshots to keep in heights") + cmd.Flags().Uint64(FlagInterval, 0, "Interval defines at which heights the snapshot is taken") +} + +func processChunk(tarWriter *tar.Writer, path, tarName string) error { + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("failed to open chunk file %s: %w", path, err) + } + defer file.Close() + + st, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat chunk file %s: %w", path, err) + } + + if err := tarWriter.WriteHeader(&tar.Header{ + Name: tarName, + Mode: 0o644, + Size: st.Size(), + }); err != nil { + return fmt.Errorf("failed to write chunk header to tar: %w", err) + } + + if _, err := io.Copy(tarWriter, file); err != nil { + return fmt.Errorf("failed to write chunk to tar: %w", err) + } + + return nil +} diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index 8d1e8bd40a63..ec9998f5c8f2 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -52,8 +52,6 @@ func initRootCmd[T transaction.Tx]( debug.Cmd(), confixcmd.ConfigCommand(), NewTestnetCmd(moduleManager), - // pruning.Cmd(newApp), // TODO add to comet server - // snapshot.Cmd(newApp), // TODO add to comet server ) logger, err := serverv2.NewLogger(viper.New(), rootCmd.OutOrStdout()) @@ -77,7 +75,7 @@ func initRootCmd[T transaction.Tx]( logger, cometbft.New(&genericTxDecoder[T]{txConfig}, cometbft.DefaultServerOptions[T]()), grpc.New[T](), - store.New[T](), + store.New[T](newApp), ); err != nil { panic(err) } diff --git a/simapp/v2/simdv2/cmd/testnet.go b/simapp/v2/simdv2/cmd/testnet.go index 2d4c3b348db5..ea56e3a8fe2f 100644 --- a/simapp/v2/simdv2/cmd/testnet.go +++ b/simapp/v2/simdv2/cmd/testnet.go @@ -343,8 +343,8 @@ func initTestnetFiles[T transaction.Tx]( cometbft.ServerOptions[T]{}, cometbft.OverwriteDefaultConfigTomlConfig(nodeConfig), ) + storeServer := store.New[T](newApp) grpcServer := grpc.New[T](grpc.OverwriteDefaultConfig(grpcConfig)) - storeServer := store.New[T]() server := serverv2.NewServer(log.NewNopLogger(), cometServer, grpcServer, storeServer) err = server.WriteConfig(filepath.Join(nodeDir, "config")) if err != nil { @@ -366,7 +366,7 @@ func initTestnetFiles[T transaction.Tx]( } // Update viper root since root dir become rootdir/node/simd - client.GetViperFromCmd(cmd).Set(flags.FlagHome, nodeConfig.RootDir) + serverv2.GetViperFromCmd(cmd).Set(flags.FlagHome, nodeConfig.RootDir) cmd.PrintErrf("Successfully initialized %d node directories\n", args.numValidators) return nil diff --git a/store/v2/root/factory.go b/store/v2/root/factory.go index 886eafb70bc2..17e2d39e87eb 100644 --- a/store/v2/root/factory.go +++ b/store/v2/root/factory.go @@ -55,11 +55,11 @@ func DefaultStoreOptions() Options { SCType: 0, SCPruningOption: &store.PruningOption{ KeepRecent: 2, - Interval: 1, + Interval: 100, }, SSPruningOption: &store.PruningOption{ KeepRecent: 2, - Interval: 1, + Interval: 100, }, IavlConfig: &iavl.Config{ CacheSize: 100_000,