Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconnect full node to sequencer with configurable time check #556

Merged
merged 8 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func TestInitialState(t *testing.T) {

// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", 50, logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestChain", logger)
assert.NoError(err)
assert.NotNil(p2pClient)

Expand Down
4 changes: 3 additions & 1 deletion block/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI,

// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", 50, logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, p2pKey, "TestChain", logger)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type NodeConfig struct {
SettlementLayer string `mapstructure:"settlement_layer"`
SettlementConfig settlement.Config `mapstructure:",squash"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
BootstrapTime time.Duration `mapstructure:"bootstrap_time"`
}

// BlockManagerConfig consists of all parameters required by BlockManagerConfig
Expand Down
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func DefaultConfig(home, chainId string) *NodeConfig {
Prometheus: false,
PrometheusListenAddr: ":2112",
},
BootstrapTime: 30 * time.Second,
}

if home == "" {
Expand Down
8 changes: 6 additions & 2 deletions config/p2p.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package config

import "time"

// P2PConfig stores configuration related to peer-to-peer networking.
type P2PConfig struct {
ListenAddress string // Address to listen for incoming connections
Seeds string // Comma separated list of seed nodes to connect to
ListenAddress string // Address to listen for incoming connections
Seeds string // Comma separated list of seed nodes to connect to
GossipCacheSize int
BoostrapTime time.Duration
}
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}
# max number of cached messages by gossipsub protocol
gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}

# time interval to check if no p2p nodes are connected to bootstrap again
bootstrap_time = "{{ .BootstrapTime }}"

#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26658\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"token\":\"TOKEN\"}"
# Avail config example:
Expand Down
5 changes: 4 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ func NewNode(ctx context.Context, conf config.NodeConfig, p2pKey crypto.PrivKey,

// Set p2p client and it's validators
p2pValidator := p2p.NewValidator(logger.With("module", "p2p_validator"), pubsubServer)
p2pClient, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, conf.GossipedBlocksCacheSize, logger.With("module", "p2p"))

conf.P2P.GossipCacheSize = conf.BlockManagerConfig.GossipedBlocksCacheSize
conf.P2P.BoostrapTime = conf.BootstrapTime
p2pClient, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, logger.With("module", "p2p"))
if err != nil {
return nil, err
}
Expand Down
43 changes: 32 additions & 11 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,24 @@
cancel context.CancelFunc

logger log.Logger

gossipCacheSize int
}

// NewClient creates new Client object.
//
// Basic checks on parameters are done, and default parameters are provided for unset-configuration
// TODO(tzdybal): consider passing entire config, not just P2P config, to reduce number of arguments
func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, gossipCacheSize int, logger log.Logger) (*Client, error) {
func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, logger log.Logger) (*Client, error) {
if privKey == nil {
return nil, errNoPrivKey
}
if conf.ListenAddress == "" {
conf.ListenAddress = config.DefaultListenAddress
}
return &Client{
conf: conf,
privKey: privKey,
chainID: chainID,
logger: logger,
gossipCacheSize: gossipCacheSize,
conf: conf,
privKey: privKey,
chainID: chainID,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -266,6 +263,10 @@
return fmt.Errorf("failed to bootstrap DHT: %w", err)
}

if len(seedNodes) > 0 {
go c.bootstrapLoop(ctx)
}

c.host = routedhost.Wrap(c.host, c.dht)

return nil
Expand Down Expand Up @@ -332,9 +333,9 @@

func (c *Client) setupGossiping(ctx context.Context) error {

pubsub.GossipSubHistoryGossip = c.gossipCacheSize
pubsub.GossipSubHistoryLength = c.gossipCacheSize
pubsub.GossipSubMaxIHaveMessages = c.gossipCacheSize
pubsub.GossipSubHistoryGossip = c.conf.GossipCacheSize
pubsub.GossipSubHistoryLength = c.conf.GossipCacheSize
pubsub.GossipSubMaxIHaveMessages = c.conf.GossipCacheSize

ps, err := pubsub.NewGossipSub(ctx, c.host)
if err != nil {
Expand Down Expand Up @@ -413,3 +414,23 @@
return true
}
}

func (c *Client) bootstrapLoop(ctx context.Context) {
ticker := time.NewTicker(c.conf.BoostrapTime)
defer ticker.Stop()
for {
select {
//Context canceled
case <-ctx.Done():
return
case <-ticker.C:
if len(c.Peers()) == 0 {
err := c.dht.Bootstrap(ctx)
if err != nil {
c.logger.Error("failed to re-bootstrap DHT: %w", err)
}

Check warning on line 431 in p2p/client.go

View check run for this annotation

Codecov / codecov/patch

p2p/client.go#L426-L431

Added lines #L426 - L431 were not covered by tests
}

}
}
}
8 changes: 6 additions & 2 deletions p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

func TestClientStartup(t *testing.T) {
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
client, err := NewClient(config.P2PConfig{}, privKey, "TestChain", 50, log.TestingLogger())
client, err := NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestChain", log.TestingLogger())
assert := assert.New(t)
assert.NoError(err)
assert.NotNil(client)
Expand Down Expand Up @@ -165,7 +167,9 @@ func TestSeedStringParsing(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
logger := &test.MockLogger{}
client, err := NewClient(config.P2PConfig{}, privKey, "TestNetwork", 50, logger)
client, err := NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestNetwork", logger)
require.NoError(err)
require.NotNil(client)
actual := client.getSeedAddrInfo(c.input)
Expand Down
6 changes: 4 additions & 2 deletions p2p/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"strings"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -101,10 +102,11 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos
clients := make([]*Client, n)
for i := 0; i < n; i++ {
client, err := NewClient(config.P2PConfig{
Seeds: seeds[i]},
Seeds: seeds[i],
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second},
mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()),
conf[i].chainID,
50,
logger)
require.NoError(err)
require.NotNil(client)
Expand Down
6 changes: 6 additions & 0 deletions rpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestGenesisChunked(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -441,6 +442,7 @@ func TestTx(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(pubKeybytes)},
},
key, signingKey, proxy.NewLocalClientCreator(mockApp),
Expand Down Expand Up @@ -709,6 +711,7 @@ func TestValidatorSetHandling(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)},
}

Expand Down Expand Up @@ -828,6 +831,7 @@ func getRPC(t *testing.T) (*mocks.Application, *Client) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -915,6 +919,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
P2P: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9001",
},
Expand All @@ -933,6 +938,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
P2P: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
Seeds: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(),
Expand Down
Loading