diff --git a/.gitignore b/.gitignore index f4adfcaa168..23146d9b352 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ scripts/cutWALUntil/cutWALUntil .vscode/ libs/pubsub/query/fuzz_test/output +libs/db/test* shunit2 .tendermint-lite @@ -42,4 +43,4 @@ terraform.tfstate terraform.tfstate.backup terraform.tfstate.d -.vscode \ No newline at end of file +.vscode diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index c4b776246b5..efb0fac4ae2 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -23,5 +23,7 @@ IMPROVEMENTS: - [types] add Address to GenesisValidator [\#1714](https://github.com/tendermint/tendermint/issues/1714) - [metrics] `consensus.block_interval_metrics` is now gauge, not histogram (you will be able to see spikes, if any) +- Added additional metrics to p2p and consensus + BUG FIXES: - [node] \#2294 Delay starting node until Genesis time diff --git a/config/metrics.go b/config/metrics.go new file mode 100644 index 00000000000..35839cb1a43 --- /dev/null +++ b/config/metrics.go @@ -0,0 +1,3 @@ +package config + +const MetricsNamespace = "tendermint" diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 3903e6b9e86..7ea6616a50f 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -72,7 +72,7 @@ func TestByzantine(t *testing.T) { err := eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i]) require.NoError(t, err) - conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states + conR := NewConsensusReactor(css[i], true, NopMetrics()) // so we dont start the consensus states conR.SetLogger(logger.With("validator", i)) conR.SetEventBus(eventBus) diff --git a/consensus/metrics.go b/consensus/metrics.go index 91ae738d585..5b0973058d6 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -4,10 +4,13 @@ import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" - prometheus "github.com/go-kit/kit/metrics/prometheus" + "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" + tmcfg "github.com/tendermint/tendermint/config" ) +const MetricsSubsystem = "consensus" + // Metrics contains metrics exposed by this package. type Metrics struct { // Height of the chain. @@ -38,75 +41,102 @@ type Metrics struct { BlockSizeBytes metrics.Gauge // Total number of transactions. TotalTxs metrics.Gauge + // The latest block height. + LatestBlockHeight metrics.Gauge + // Whether or not a node is synced. 0 if no, 1 if yes. + CatchingUp metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. func PrometheusMetrics() *Metrics { return &Metrics{ Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "height", Help: "Height of the chain.", }, []string{}), Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "rounds", Help: "Number of rounds.", }, []string{}), Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "validators", Help: "Number of validators.", }, []string{}), ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "validators_power", Help: "Total power of all validators.", }, []string{}), MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "missing_validators", Help: "Number of validators who did not sign.", }, []string{}), MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "missing_validators_power", Help: "Total power of the missing validators.", }, []string{}), ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "byzantine_validators", Help: "Number of validators who tried to double sign.", }, []string{}), ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "byzantine_validators_power", Help: "Total power of the byzantine validators.", }, []string{}), - - BlockIntervalSeconds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + BlockIntervalSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "block_interval_seconds", Help: "Time between this and the last block.", }, []string{}), NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "num_txs", Help: "Number of transactions.", }, []string{}), BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "block_size_bytes", Help: "Size of the block.", }, []string{}), TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "total_txs", Help: "Total number of transactions.", }, []string{}), + LatestBlockHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, + Name: "latest_block_height", + Help: "The latest block height.", + }, []string{}), + CatchingUp: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: tmcfg.MetricsNamespace, + Subsystem: MetricsSubsystem, + Name: "catching_up", + Help: "Whether or not a node is synced. 0 if syncing, 1 if synced.", + }, []string{}), } } @@ -126,8 +156,10 @@ func NopMetrics() *Metrics { BlockIntervalSeconds: discard.NewGauge(), - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewGauge(), - TotalTxs: discard.NewGauge(), + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewGauge(), + TotalTxs: discard.NewGauge(), + LatestBlockHeight: discard.NewGauge(), + CatchingUp: discard.NewGauge(), } } diff --git a/consensus/reactor.go b/consensus/reactor.go index 6ba8172641c..60a1ccd3c19 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" - amino "github.com/tendermint/go-amino" + "github.com/tendermint/go-amino" cstypes "github.com/tendermint/tendermint/consensus/types" cmn "github.com/tendermint/tendermint/libs/common" @@ -42,15 +42,19 @@ type ConsensusReactor struct { mtx sync.RWMutex fastSync bool eventBus *types.EventBus + + metrics *Metrics } // NewConsensusReactor returns a new ConsensusReactor with the given // consensusState. -func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, csMetrics *Metrics) *ConsensusReactor { conR := &ConsensusReactor{ conS: consensusState, fastSync: fastSync, + metrics: csMetrics, } + conR.setCatchingUp() conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) return conR } @@ -94,6 +98,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int conR.mtx.Lock() conR.fastSync = false conR.mtx.Unlock() + conR.metrics.CatchingUp.Set(0) if blocksSynced > 0 { // dont bother with the WAL if we fast synced @@ -814,6 +819,16 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { return s } +func (conR *ConsensusReactor) setCatchingUp() { + var catchingUp float64 + if conR.fastSync { + catchingUp = 1 + } else { + catchingUp = 0 + } + conR.metrics.CatchingUp.Set(catchingUp) +} + //----------------------------------------------------------------------------- var ( diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 98b058b8d61..16350076c9c 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -45,7 +45,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus for i := 0; i < N; i++ { /*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info") if err != nil { t.Fatal(err)}*/ - reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states + reactors[i] = NewConsensusReactor(css[i], true, NopMetrics()) // so we dont start the consensus states reactors[i].SetLogger(css[i].Logger) // eventBus is already started with the cs @@ -255,7 +255,7 @@ func TestReactorRecordsBlockParts(t *testing.T) { // create reactor css := randConsensusNet(1, "consensus_reactor_records_block_parts_test", newMockTickerFunc(true), newPersistentKVStore) - reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states + reactor := NewConsensusReactor(css[0], false, NopMetrics()) // so we dont start the consensus states reactor.SetEventBus(css[0].eventBus) reactor.SetLogger(log.TestingLogger()) sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) @@ -306,7 +306,7 @@ func TestReactorRecordsVotes(t *testing.T) { // Create reactor. css := randConsensusNet(1, "consensus_reactor_records_votes_test", newMockTickerFunc(true), newPersistentKVStore) - reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states + reactor := NewConsensusReactor(css[0], false, NopMetrics()) // so we dont start the consensus states reactor.SetEventBus(css[0].eventBus) reactor.SetLogger(log.TestingLogger()) sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) diff --git a/consensus/state.go b/consensus/state.go index bee0f893ee4..1ab7c6e6cdb 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1382,6 +1382,8 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) { cs.metrics.NumTxs.Set(float64(block.NumTxs)) cs.metrics.BlockSizeBytes.Set(float64(block.Size())) cs.metrics.TotalTxs.Set(float64(block.TotalTxs)) + cs.metrics.LatestBlockHeight.Set(float64(block.Height)) + } //----------------------------------------------------------------------------- diff --git a/mempool/metrics.go b/mempool/metrics.go index f381678cb31..473b7cb5b1d 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -4,8 +4,9 @@ import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" - prometheus "github.com/go-kit/kit/metrics/prometheus" + "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/tendermint/tendermint/config" ) // Metrics contains metrics exposed by this package. @@ -19,6 +20,7 @@ type Metrics struct { func PrometheusMetrics() *Metrics { return &Metrics{ Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: config.MetricsNamespace, Subsystem: "mempool", Name: "size", Help: "Size of the mempool (number of uncommitted transactions).", diff --git a/node/node.go b/node/node.go index 995d1c88991..429cf7f9476 100644 --- a/node/node.go +++ b/node/node.go @@ -287,6 +287,8 @@ func NewNode(config *cfg.Config, bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) + csm := cs.WithMetrics(csMetrics) + // Make ConsensusReactor consensusState := cs.NewConsensusState( config.Consensus, @@ -295,13 +297,13 @@ func NewNode(config *cfg.Config, blockStore, mempool, evidencePool, - cs.WithMetrics(csMetrics), + csm, ) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewConsensusReactor(consensusState, fastSync) + consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, csMetrics) consensusReactor.SetLogger(consensusLogger) eventBus := types.NewEventBus() diff --git a/p2p/metrics.go b/p2p/metrics.go index ab876ee7c62..ca3082ccf1d 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -4,24 +4,52 @@ import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" - prometheus "github.com/go-kit/kit/metrics/prometheus" + "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/tendermint/tendermint/config" ) +const MetricsSubsystem = "p2p" + // Metrics contains metrics exposed by this package. type Metrics struct { // Number of peers. Peers metrics.Gauge + // Number of bytes received from a given peer. + PeerReceiveBytesTotal metrics.Counter + // Number of bytes sent to a given peer. + PeerSendBytesTotal metrics.Counter + // Pending bytes to be sent to a given peer. + PeerPendingSendBytes metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. func PrometheusMetrics() *Metrics { return &Metrics{ Peers: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "p2p", + Namespace: config.MetricsNamespace, + Subsystem: MetricsSubsystem, Name: "peers", Help: "Number of peers.", }, []string{}), + PeerReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: config.MetricsNamespace, + Subsystem: MetricsSubsystem, + Name: "peer_receive_bytes_total", + Help: "Number of bytes received from a given peer.", + }, []string{"peer_id"}), + PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: config.MetricsNamespace, + Subsystem: MetricsSubsystem, + Name: "peer_send_bytes_total", + Help: "Number of bytes sent to a given peer.", + }, []string{"peer_id"}), + PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: config.MetricsNamespace, + Subsystem: MetricsSubsystem, + Name: "peer_pending_send_bytes", + Help: "Number of pending bytes to be sent to a given peer.", + }, []string{"peer_id"}), } } @@ -29,5 +57,8 @@ func PrometheusMetrics() *Metrics { func NopMetrics() *Metrics { return &Metrics{ Peers: discard.NewGauge(), + PeerReceiveBytesTotal: discard.NewCounter(), + PeerSendBytesTotal: discard.NewCounter(), + PeerPendingSendBytes: discard.NewGauge(), } } diff --git a/p2p/peer.go b/p2p/peer.go index 5dbc582c07d..5efcdce88ff 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -13,6 +13,8 @@ import ( tmconn "github.com/tendermint/tendermint/p2p/conn" ) +const MetricsTickerDuration = 10 * time.Second + var testIPSuffix uint32 // Peer is an interface representing a peer connected on a reactor. @@ -99,6 +101,11 @@ type peer struct { // User data Data *cmn.CMap + + metrics *Metrics + + metricsTicker *time.Ticker + quitChan chan bool } func newPeer( @@ -108,12 +115,16 @@ func newPeer( reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), + metrics *Metrics, ) *peer { p := &peer{ - peerConn: pc, - nodeInfo: nodeInfo, - channels: nodeInfo.Channels, - Data: cmn.NewCMap(), + peerConn: pc, + nodeInfo: nodeInfo, + channels: nodeInfo.Channels, + Data: cmn.NewCMap(), + metrics: metrics, + metricsTicker: time.NewTicker(MetricsTickerDuration), + quitChan: make(chan bool), } p.mconn = createMConnection( @@ -143,12 +154,36 @@ func (p *peer) OnStart() error { if err := p.BaseService.OnStart(); err != nil { return err } + err := p.mconn.Start() + if err != nil { return err } + go func() { + for { + select { + case <-p.metricsTicker.C: + status := p.mconn.Status() + var sendQueueSize float64 + for _, chStatus := range status.Channels { + sendQueueSize += float64(chStatus.SendQueueSize) + } + + p.metrics.PeerPendingSendBytes.With("peer-id", string(p.ID())).Set(sendQueueSize) + case <-p.quitChan: + return + } + } + }() + + return nil +} + // OnStop implements BaseService. func (p *peer) OnStop() { + p.metricsTicker.Stop() + p.quitChan <- true p.BaseService.OnStop() p.mconn.Stop() // stop everything and close the conn } @@ -200,7 +235,11 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - return p.mconn.Send(chID, msgBytes) + res := p.mconn.Send(chID, msgBytes) + if res { + p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes))) + } + return res } // TrySend msg bytes to the channel identified by chID byte. Immediately returns @@ -211,7 +250,11 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - return p.mconn.TrySend(chID, msgBytes) + res := p.mconn.TrySend(chID, msgBytes) + if res { + p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes))) + } + return res } // Get the data for a given key. @@ -333,6 +376,7 @@ func createMConnection( // which does onPeerError. panic(fmt.Sprintf("Unknown channel %X", chID)) } + p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes))) reactor.Receive(chID, p, msgBytes) } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index a2a2946a13a..cd6b5fe021a 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -92,7 +92,7 @@ func createOutboundPeerAndPerformHandshake( return nil, err } - p := newPeer(pc, mConfig, nodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}) + p := newPeer(pc, mConfig, nodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {}, NopMetrics()) p.SetLogger(log.TestingLogger().With("peer", addr)) return p, nil }