Skip to content

Commit

Permalink
get all the pubsub RawTracer message metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
cce committed Jul 31, 2024
1 parent 5977cca commit 686d9a2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
28 changes: 20 additions & 8 deletions network/p2p/pubsubTracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (

var _ = pubsub.RawTracer(pubsubTracer{})

var transactionMessagesDroppedFromBacklogP2P = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklogP2P)
var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg)
var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description)
var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage)
var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage)
var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage)
var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage)

// pubsubTracer is a tracer for pubsub events used to track metrics.
type pubsubTracer struct{}
Expand All @@ -51,22 +54,29 @@ func (t pubsubTracer) Graft(p peer.ID, topic string) {}
func (t pubsubTracer) Prune(p peer.ID, topic string) {}

// ValidateMessage is invoked when a message first enters the validation pipeline.
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {}
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {
transactionMessagesP2PValidateMessage.Inc(nil)
}

// DeliverMessage is invoked when a message is delivered
func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {}
func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {
transactionMessagesP2PDeliverMessage.Inc(nil)
}

// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) {
if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull {
transactionMessagesDroppedFromBacklogP2P.Inc(nil)
switch reason {
case pubsub.RejectValidationThrottled, pubsub.RejectValidationQueueFull, pubsub.RejectValidationFailed, pubsub.RejectValidationIgnored:
transactionMessagesP2PRejectMessage.Add(reason, 1)
default:
transactionMessagesP2PRejectMessage.Add("other", 1)
}
}

// DuplicateMessage is invoked when a duplicate message is dropped.
func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) {
transactionMessagesDupRawMsg.Inc(nil)
transactionMessagesP2PDuplicateMessage.Inc(nil)
}

// ThrottlePeer is invoked when a peer is throttled by the peer gater.
Expand All @@ -83,4 +93,6 @@ func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {}
func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {
transactionMessagesP2PUnderdeliverableMessage.Inc(nil)
}
13 changes: 11 additions & 2 deletions util/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ var (
TransactionMessagesHandled = MetricName{Name: "algod_transaction_messages_handled", Description: "Number of transaction messages handled"}
// TransactionMessagesDroppedFromBacklog "Number of transaction messages dropped from backlog"
TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"}
// TransactionMessagesDroppedFromBacklogP2P "Number of transaction messages throttled with p2p pubsub"
TransactionMessagesDroppedFromBacklogP2P = MetricName{Name: "algod_transaction_messages_dropped_backlog_p2p", Description: "Number of transaction messages throttled with p2p pubsub"}
// TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool"
TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"}
// TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages before placing into a backlog"
Expand Down Expand Up @@ -130,6 +128,17 @@ var (
// TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue"
TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"}

// TransactionMessagesP2PRejectMessage "Number of rejected p2p pubsub transaction messages"
TransactionMessagesP2PRejectMessage = MetricName{Name: "algod_transaction_messages_p2p_reject", Description: "Number of rejected p2p pubsub transaction messages"}
// TransactionMessagesP2PDuplicateMessage "Number of duplicate p2p pubsub transaction messages"}
TransactionMessagesP2PDuplicateMessage = MetricName{Name: "algod_transaction_messages_p2p_duplicate", Description: "Number of duplicate p2p pubsub transaction messages"}
// TransactionMessagesP2PDeliverMessage "Number of delivered p2p pubsub transaction messages"
TransactionMessagesP2PDeliverMessage = MetricName{Name: "algod_transaction_messages_p2p_delivered", Description: "Number of delivered p2p pubsub transaction messages"}
// TransactionMessagesP2PUndeliverableMessage "Number of undeliverable p2p pubsub transaction messages"
TransactionMessagesP2PUndeliverableMessage = MetricName{Name: "algod_transaction_messages_p2p_undeliverable", Description: "Number of undeliverable p2p pubsub transaction messages"}
// TransactionMessagesP2PValidateMessage "Number of p2p pubsub transaction messages received for validation"
TransactionMessagesP2PValidateMessage = MetricName{Name: "algod_transaction_messages_p2p_validate", Description: "Number of p2p pubsub transaction messages received for validation"}

// TransactionGroupTxSyncHandled "Number of transaction groups handled via txsync"
TransactionGroupTxSyncHandled = MetricName{Name: "algod_transaction_group_txsync_handled", Description: "Number of transaction groups handled via txsync"}
// TransactionGroupTxSyncRemember "Number of transaction groups remembered via txsync"
Expand Down

0 comments on commit 686d9a2

Please sign in to comment.