Skip to content

Commit

Permalink
Merge branch 'development' into ed/author_submitAndWatchExtrinsic
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardmack committed Mar 2, 2021
2 parents a066327 + f9a40ff commit 276e61b
Show file tree
Hide file tree
Showing 36 changed files with 553 additions and 201 deletions.
4 changes: 4 additions & 0 deletions cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func createDotConfig(ctx *cli.Context) (cfg *dot.Config, err error) {
setDotNetworkConfig(ctx, tomlCfg.Network, &cfg.Network)
setDotRPCConfig(ctx, tomlCfg.RPC, &cfg.RPC)

if rewind := ctx.GlobalInt(RewindFlag.Name); rewind != 0 {
cfg.State.Rewind = rewind
}

// set system info
setSystemInfoConfig(ctx, cfg)

Expand Down
6 changes: 6 additions & 0 deletions cmd/gossamer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ var (
Name: "roles",
Usage: "Roles of the gossamer node",
}
// RewindFlag rewinds the head of the chain by the given number of blocks. Useful for development
RewindFlag = cli.IntFlag{
Name: "rewind",
Usage: "Rewind head of chain by given number of blocks",
}
)

// Global node configuration flags
Expand Down Expand Up @@ -232,6 +237,7 @@ var (
BasePathFlag,
CPUProfFlag,
MemProfFlag,
RewindFlag,
}

// StartupFlags are flags that are valid for use with the root command and the export subcommand
Expand Down
6 changes: 6 additions & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Config struct {
Network NetworkConfig
RPC RPCConfig
System types.SystemInfo
State StateConfig
}

// GlobalConfig is used for every node command
Expand Down Expand Up @@ -107,6 +108,11 @@ type RPCConfig struct {
WSExternal bool
}

// StateConfig is the config for the State service
type StateConfig struct {
Rewind int
}

// String will return the json representation for a Config
func (c *Config) String() string {
out, _ := json.MarshalIndent(c, "", "\t")
Expand Down
3 changes: 3 additions & 0 deletions dot/core/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ func (h *DigestHandler) handleNextEpochData(d *types.ConsensusDigest, header *ty
if err != nil {
return err
}

logger.Debug("setting epoch data", "blocknum", header.Number, "epoch", currEpoch+1, "data", data)
return h.epochState.SetEpochData(currEpoch+1, data)
}

Expand All @@ -467,6 +469,7 @@ func (h *DigestHandler) handleNextConfigData(d *types.ConsensusDigest, header *t
return err
}

logger.Debug("setting BABE config data", "blocknum", header.Number, "epoch", currEpoch+1, "data", od.ToConfigData())
// set EpochState config data for upcoming epoch
return h.epochState.SetConfigData(currEpoch+1, od.ToConfigData())
}
13 changes: 7 additions & 6 deletions dot/core/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
// adds valid transactions to the transaction queue of the BABE session
func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) error {
logger.Debug("received TransactionMessage")
if !s.isBlockProducer {
return nil
}

// get transactions from message extrinsics
txs := msg.Extrinsics
Expand All @@ -36,17 +39,15 @@ func (s *Service) HandleTransactionMessage(msg *network.TransactionMessage) erro
val, err := s.rt.ValidateTransaction(tx)
if err != nil {
logger.Error("failed to validate transaction", "err", err)
return err // exit
return err
}

// create new valid transaction
vtx := transaction.NewValidTransaction(tx, val)

if s.isBlockProducer {
// push to the transaction queue of BABE session
hash := s.transactionState.AddToPool(vtx)
logger.Trace("Added transaction to queue", "hash", hash)
}
// push to the transaction queue of BABE session
hash := s.transactionState.AddToPool(vtx)
logger.Trace("Added transaction to queue", "hash", hash)
}

return nil
Expand Down
15 changes: 9 additions & 6 deletions dot/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *Service) StorageRoot() (common.Hash, error) {

func (s *Service) handleBlocks(ctx context.Context) {
for {
prev := s.blockState.BestBlockHash()
//prev := s.blockState.BestBlockHash()

select {
case block := <-s.blockAddCh:
Expand All @@ -234,9 +234,10 @@ func (s *Service) handleBlocks(ctx context.Context) {
logger.Warn("failed to handle epoch for block", "block", block.Header.Hash(), "error", err)
}

if err := s.handleChainReorg(prev, block.Header.Hash()); err != nil {
logger.Warn("failed to re-add transactions to chain upon re-org", "error", err)
}
// TODO: add inherent check
// if err := s.handleChainReorg(prev, block.Header.Hash()); err != nil {
// logger.Warn("failed to re-add transactions to chain upon re-org", "error", err)
// }

if err := s.maintainTransactionPool(block); err != nil {
logger.Warn("failed to maintain transaction pool", "error", err)
Expand Down Expand Up @@ -350,12 +351,14 @@ func (s *Service) handleChainReorg(prev, curr common.Hash) error {
continue
}

// TODO: decode extrinsic and make sure it's not an inherent.
// currently we are attempting to re-add inherents, causing lots of "'Bad input data provided to validate_transaction" errors.
for _, ext := range exts {
logger.Trace("validating transaction on re-org chain", "extrinsic", ext)
logger.Debug("validating transaction on re-org chain", "extrinsic", ext)

txv, err := s.rt.ValidateTransaction(ext)
if err != nil {
logger.Trace("failed to validate transaction", "extrinsic", ext)
logger.Debug("failed to validate transaction", "extrinsic", ext)
continue
}

Expand Down
10 changes: 0 additions & 10 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,6 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
}

go func() {
if s.notificationsProtocols[BlockAnnounceMsgType] == nil {
logger.Error("s.notificationsProtocols[BlockAnnounceMsgType] is nil")
return
}

if s.notificationsProtocols[BlockAnnounceMsgType].handshakeData[peer] == nil {
logger.Error("peer handshake data is nil")
return
}

s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer)
}()

Expand Down
2 changes: 1 addition & 1 deletion dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
}

// create runtime
rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), networkSrvc)
rt, err := createRuntime(cfg, stateSrvc, ks, networkSrvc)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions dot/rpc/modules/author.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ fmt.Printf("SubExt ext %v\n", extBytes)
// validate the transaction
txv, err := cm.runtimeAPI.ValidateTransaction(ext)
if err != nil {
cm.logger.Warn("failed to validate transaction", "ext", ext)
return err
}

Expand Down
2 changes: 2 additions & 0 deletions dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func newNetworkService(t *testing.T) *network.Service {
// Test RPC's System.Health() response
func TestSystemModule_Health(t *testing.T) {
net := newNetworkService(t)
net.Stop()
sys := NewSystemModule(net, nil, nil, nil, nil)

res := &SystemHealthResponse{}
Expand Down Expand Up @@ -145,6 +146,7 @@ func TestSystemModule_NetworkState(t *testing.T) {
// Test RPC's System.Peers() response
func TestSystemModule_Peers(t *testing.T) {
net := newNetworkService(t)
net.Stop()
sys := NewSystemModule(net, nil, nil, nil, nil)

res := &SystemPeersResponse{}
Expand Down
9 changes: 8 additions & 1 deletion dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func createStateService(cfg *Config) (*state.Service, error) {
return nil, fmt.Errorf("failed to start state service: %s", err)
}

if cfg.State.Rewind != 0 {
err = stateSrvc.Rewind(cfg.State.Rewind)
if err != nil {
return nil, fmt.Errorf("failed to rewind state: %w", err)
}
}

// load most recent state from database
latestState, err := state.LoadLatestStorageHash(stateSrvc.DB())
if err != nil {
Expand All @@ -78,7 +85,7 @@ func createStateService(cfg *Config) (*state.Service, error) {
return stateSrvc, nil
}

func createRuntime(cfg *Config, st *state.Service, ks *keystore.GenericKeystore, net *network.Service) (runtime.Instance, error) {
func createRuntime(cfg *Config, st *state.Service, ks *keystore.GlobalKeystore, net *network.Service) (runtime.Instance, error) {
logger.Info(
"creating runtime...",
"interpreter", cfg.Core.WasmInterpreter,
Expand Down
12 changes: 6 additions & 6 deletions dot/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestCreateCoreService(t *testing.T) {

networkSrvc := &network.Service{}

rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), networkSrvc)
rt, err := createRuntime(cfg, stateSrvc, ks, networkSrvc)
require.NoError(t, err)

dh, err := createDigestHandler(stateSrvc, nil, nil)
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestCreateSyncService(t *testing.T) {

ks := keystore.NewGlobalKeystore()
require.NotNil(t, ks)
rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), &network.Service{})
rt, err := createRuntime(cfg, stateSrvc, ks, &network.Service{})
require.NoError(t, err)

cfg.Core.BabeThresholdNumerator = 0
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestCreateRPCService(t *testing.T) {
ed25519Keyring, _ := keystore.NewEd25519Keyring()
ks.Gran.Insert(ed25519Keyring.Alice())

rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), networkSrvc)
rt, err := createRuntime(cfg, stateSrvc, ks, networkSrvc)
require.NoError(t, err)

dh, err := createDigestHandler(stateSrvc, nil, nil)
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestCreateBABEService(t *testing.T) {
require.Nil(t, err)
ks.Babe.Insert(kr.Alice())

rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), &network.Service{})
rt, err := createRuntime(cfg, stateSrvc, ks, &network.Service{})
require.NoError(t, err)

bs, err := createBABEService(cfg, rt, stateSrvc, ks.Babe)
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestCreateGrandpaService(t *testing.T) {
require.NoError(t, err)
ks.Gran.Insert(kr.Alice())

rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), &network.Service{})
rt, err := createRuntime(cfg, stateSrvc, ks, &network.Service{})
require.NoError(t, err)

dh, err := createDigestHandler(stateSrvc, nil, nil)
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestNewWebSocketServer(t *testing.T) {
ks := keystore.NewGlobalKeystore()
ed25519Keyring, _ := keystore.NewEd25519Keyring()
ks.Gran.Insert(ed25519Keyring.Alice())
rt, err := createRuntime(cfg, stateSrvc, ks.Acco.(*keystore.GenericKeystore), networkSrvc)
rt, err := createRuntime(cfg, stateSrvc, ks, networkSrvc)
require.NoError(t, err)

dh, err := createDigestHandler(stateSrvc, nil, nil)
Expand Down
25 changes: 25 additions & 0 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,31 @@ func (s *Service) Start() error {
return nil
}

// Rewind rewinds the chain by the given number of blocks.
// If the given number of blocks is greater than the chain height, it will rewind to genesis.
func (s *Service) Rewind(numBlocks int) error {
num, _ := s.Block.BestBlockNumber()

logger.Info("rewinding state...", "current height", num, "to rewind", numBlocks)
s.Block.bt.Rewind(numBlocks)
newHead := s.Block.BestBlockHash()

header, _ := s.Block.BestBlockHeader()
logger.Info("rewinding state...", "new height", header.Number)

epoch, err := s.Epoch.GetEpochForBlock(header)
if err != nil {
return err
}

err = s.Epoch.SetCurrentEpoch(epoch)
if err != nil {
return err
}

return StoreBestBlockHash(s.db, newHead)
}

// Stop closes each state database
func (s *Service) Stop() error {
head, err := s.Block.BestBlockStateRoot()
Expand Down
23 changes: 23 additions & 0 deletions dot/state/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,26 @@ func TestService_PruneStorage(t *testing.T) {
require.Equal(t, false, ok)
}
}

func TestService_Rewind(t *testing.T) {
testDir := utils.NewTestDir(t)
defer utils.RemoveTestDir(t)

serv := NewService(testDir, log.LvlTrace)
serv.UseMemDB()

genData, genTrie, genesisHeader := newTestGenesisWithTrieAndHeader(t)
err := serv.Initialize(genData, genesisHeader, genTrie)
require.NoError(t, err)

err = serv.Start()
require.NoError(t, err)

AddBlocksToState(t, serv.Block, 12)
err = serv.Rewind(6)
require.NoError(t, err)

num, err := serv.Block.BestBlockNumber()
require.NoError(t, err)
require.Equal(t, big.NewInt(6), num)
}
2 changes: 2 additions & 0 deletions dot/state/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ func AddBlocksToState(t *testing.T, blockState *BlockState, depth int) ([]*types
// create base tree
startNum := int(head.Number.Int64())
for i := startNum + 1; i <= depth; i++ {
d := types.NewBabePrimaryPreDigest(0, uint64(i), [32]byte{}, [64]byte{})
block := &types.Block{
Header: &types.Header{
ParentHash: previousHash,
Number: big.NewInt(int64(i)),
StateRoot: trie.EmptyHash,
Digest: types.Digest{d.ToPreRuntimeDigest()},
},
Body: &types.Body{},
}
Expand Down
18 changes: 6 additions & 12 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,7 @@ func (s *Service) handleBlock(block *types.Block) error {

// handle consensus digest for authority changes
if s.digestHandler != nil {
go func() {
err = s.handleDigests(block.Header)
if err != nil {
s.logger.Error("failed to handle block digest", "error", err)
}
}()
s.handleDigests(block.Header)
}

return s.handleRuntimeChanges(ts)
Expand Down Expand Up @@ -365,22 +360,21 @@ func (s *Service) handleRuntimeChanges(newState *rtstorage.TrieState) error {
return nil
}

func (s *Service) handleDigests(header *types.Header) error {
for _, d := range header.Digest {
func (s *Service) handleDigests(header *types.Header) {
for i, d := range header.Digest {
if d.Type() == types.ConsensusDigestType {
cd, ok := d.(*types.ConsensusDigest)
if !ok {
return errors.New("cannot cast invalid consensus digest item")
s.logger.Error("handleDigests", "index", i, "error", "cannot cast invalid consensus digest item")
continue
}

err := s.digestHandler.HandleConsensusDigest(cd, header)
if err != nil {
return err
s.logger.Error("handleDigests", "index", i, "digest", cd, "error", err)
}
}
}

return nil
}

// IsSynced exposes the synced state
Expand Down
5 changes: 5 additions & 0 deletions dot/types/babe_digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func NewBabePrimaryPreDigest(authorityIndex uint32, slotNumber uint64, vrfOutput
}
}

// ToPreRuntimeDigest returns the BabePrimaryPreDigest as a PreRuntimeDigest
func (d *BabePrimaryPreDigest) ToPreRuntimeDigest() *PreRuntimeDigest {
return NewBABEPreRuntimeDigest(d.Encode())
}

// Type returns BabePrimaryPreDigestType
func (d *BabePrimaryPreDigest) Type() byte {
return BabePrimaryPreDigestType
Expand Down
Loading

0 comments on commit 276e61b

Please sign in to comment.