Skip to content

Commit

Permalink
Add additional metrics
Browse files Browse the repository at this point in the history
Continues addressing cosmos/cosmos-sdk#2169.
  • Loading branch information
mslipper committed Sep 27, 2018
1 parent 4c4a95c commit 2a4ff4e
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 19 deletions.
10 changes: 10 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Metrics struct {
CommittedHeight metrics.Gauge
// Whether or not a node is fast syncing. 1 if yes, 0 if no.
FastSyncing metrics.Gauge

// Number of blockparts transmitted by peer.
BlockParts metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -136,6 +139,12 @@ func PrometheusMetrics(namespace string) *Metrics {
Name: "fast_syncing",
Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
}, []string{}),
BlockParts: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_parts",
Help: "Number of blockparts transmitted by peer.",
}, []string{"peer_id"}),
}
}

Expand All @@ -160,5 +169,6 @@ func NopMetrics() *Metrics {
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
BlockParts: discard.NewCounter(),
}
}
6 changes: 3 additions & 3 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)

conR.metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand Down Expand Up @@ -827,12 +827,12 @@ func (conR *ConsensusReactor) peerStatsRoutine() {
case *VoteMessage:
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
conR.Switch.MarkPeerAsGood(peer)
}
}
case *BlockPartMessage:
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
conR.Switch.MarkPeerAsGood(peer)
}
}
}
case <-conR.conS.Quit():
return

Expand Down
4 changes: 3 additions & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
if mem.recheckCursor == nil {
mem.resCbNormal(req, res)
} else {
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(req, res)
}
mem.metrics.Size.Set(float64(mem.Size()))
Expand All @@ -346,11 +347,12 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
}
mem.txs.PushBack(memTx)
mem.logger.Info("Added good transaction", "tx", TxID(tx), "res", r, "total", mem.Size())
mem.metrics.TxSizeBytes.Observe(float64(len(tx)))
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Info("Rejected bad transaction", "tx", TxID(tx), "res", r)

mem.metrics.FailedTxs.Add(1)
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}
Expand Down
36 changes: 33 additions & 3 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,62 @@ package mempool
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const MetricsSubsytem = "mempool"

// Metrics contains metrics exposed by this package.
// see MetricsProvider for descriptions.
type Metrics struct {
// Size of the mempool.
Size metrics.Gauge
// Histogram of transaction sizes, in bytes.
TxSizeBytes metrics.Histogram
// Number of failed transactions.
FailedTxs metrics.Gauge
// Number of times transactions are rechecked in the mempool.
RecheckTimes metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
func PrometheusMetrics(namespace string) *Metrics {
return &Metrics{
Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "mempool",
Subsystem: MetricsSubsytem,
Name: "size",
Help: "Size of the mempool (number of uncommitted transactions).",
}, []string{}),
TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsytem,
Name: "tx_size_bytes",
Help: "Transaction sizes in bytes.",
Buckets: stdprometheus.ExponentialBuckets(1, 2, 10),
}, []string{}),
FailedTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsytem,
Name: "failed_txs",
Help: "Number of failed transactions.",
}, []string{}),
RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsytem,
Name: "recheck_times",
Help: "Number of times transactions are rechecked in the mempool.",
}, []string{}),
}
}

// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
Size: discard.NewGauge(),
Size: discard.NewGauge(),
TxSizeBytes: discard.NewHistogram(),
FailedTxs: discard.NewGauge(),
RecheckTimes: discard.NewCounter(),
}
}
13 changes: 7 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,17 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
}

// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics)
type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) {
return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace), mempl.PrometheusMetrics(config.Namespace)
return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace),
mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics()
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
}
}

Expand Down Expand Up @@ -245,7 +246,7 @@ func NewNode(config *cfg.Config,
consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
}

csMetrics, p2pMetrics, memplMetrics := metricsProvider()
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider()

// Make MempoolReactor
mempool := mempl.NewMempool(
Expand Down Expand Up @@ -289,7 +290,7 @@ func NewNode(config *cfg.Config,

blockExecLogger := logger.With("module", "state")
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool)
blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool, sm.BlockExecutorWithMetrics(smMetrics))

// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
Expand Down
12 changes: 11 additions & 1 deletion p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package p2p
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -19,6 +19,8 @@ type Metrics struct {
PeerSendBytesTotal metrics.Counter
// Pending bytes to be sent to a given peer.
PeerPendingSendBytes metrics.Gauge
// Number of transactions submitted by each peer.
NumTxs metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -48,6 +50,13 @@ func PrometheusMetrics(namespace string) *Metrics {
Name: "peer_pending_send_bytes",
Help: "Number of pending bytes to be sent to a given peer.",
}, []string{"peer_id"}),
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_txs",
Help: "Number of transactions submitted by each peer.",
}, []string{"peer_id"}),

}
}

Expand All @@ -58,5 +67,6 @@ func NopMetrics() *Metrics {
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
}
}
30 changes: 25 additions & 5 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"fmt"
"time"

fail "github.com/ebuchman/fail-test"
abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -33,20 +34,36 @@ type BlockExecutor struct {
evpool EvidencePool

logger log.Logger

metrics *Metrics
}

type BlockExecutorOption func(executor *BlockExecutor)

func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
return func(blockExec *BlockExecutor) {
blockExec.metrics = metrics
}
}

// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool Mempool, evpool EvidencePool) *BlockExecutor {
return &BlockExecutor{
mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
res := &BlockExecutor{
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
}

for _, option := range options {
option(res)
}

return res
}

// SetEventBus - sets the event bus for publishing block related events.
Expand Down Expand Up @@ -74,7 +91,10 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
return state, ErrInvalidBlock(err)
}

startTime := time.Now().UnixNano()
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
endTime := time.Now().UnixNano()
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime - startTime) / 1000000)
if err != nil {
return state, ErrProxyAppConn(err)
}
Expand Down Expand Up @@ -177,7 +197,7 @@ func (blockExec *BlockExecutor) Commit(
// Executes block's transactions on proxyAppConn.
// Returns a list of transaction results and updates to the validator set
func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus,
block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (*ABCIResponses, error) {
block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (*ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0

txIndex := 0
Expand Down Expand Up @@ -334,7 +354,7 @@ func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.Validat

// updateState returns a new State updated according to the header and responses.
func updateState(state State, blockID types.BlockID, header *types.Header,
abciResponses *ABCIResponses) (State, error) {
abciResponses *ABCIResponses) (State, error) {

// Copy the valset so we can apply changes from EndBlock
// and update s.LastValidators and s.Validators.
Expand Down Expand Up @@ -418,7 +438,7 @@ func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *ty
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit).
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block,
logger log.Logger, lastValSet *types.ValidatorSet, stateDB dbm.DB) ([]byte, error) {
logger log.Logger, lastValSet *types.ValidatorSet, stateDB dbm.DB) ([]byte, error) {
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
if err != nil {
logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
Expand Down
33 changes: 33 additions & 0 deletions state/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package state

import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/go-kit/kit/metrics/discard"
)

const MetricsSubsystem = "state"

type Metrics struct {
// Time between BeginBlock and EndBlock.
BlockProcessingTime metrics.Histogram
}

func PrometheusMetrics(namespace string) *Metrics {
return &Metrics{
BlockProcessingTime: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_processing_time",
Help: "Time between BeginBlock and EndBlock.",
Buckets: stdprometheus.LinearBuckets(1, 10, 10),
}, []string{}),
}
}

func NopMetrics() *Metrics {
return &Metrics{
BlockProcessingTime: discard.NewHistogram(),
}
}

0 comments on commit 2a4ff4e

Please sign in to comment.