Skip to content

Commit

Permalink
add pubsub raw tracer for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Jul 18, 2024
1 parent 614ba2d commit cd340f1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
1 change: 1 addition & 0 deletions network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// pubsub.WithValidateThrottle(cfg.TxBacklogSize),
pubsub.WithValidateWorkers(incomingThreads),
pubsub.WithRawTracer(pubsubTracer{}),
}

return pubsub.NewGossipSub(ctx, host, options...)
Expand Down
86 changes: 86 additions & 0 deletions network/p2p/pubsubTracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package p2p

import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/algorand/go-algorand/util/metrics"
)

var _ = pubsub.RawTracer(pubsubTracer{})

var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog)
var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg)

// pubsubTracer is a tracer for pubsub events used to track metrics.
type pubsubTracer struct{}

// AddPeer is invoked when a new peer is added.
func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {}

Check warning on line 36 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L36

Added line #L36 was not covered by tests

// RemovePeer is invoked when a peer is removed.
func (t pubsubTracer) RemovePeer(p peer.ID) {}

Check warning on line 39 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L39

Added line #L39 was not covered by tests

// Join is invoked when a new topic is joined
func (t pubsubTracer) Join(topic string) {}

Check warning on line 42 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L42

Added line #L42 was not covered by tests

// Leave is invoked when a topic is abandoned
func (t pubsubTracer) Leave(topic string) {}

Check warning on line 45 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L45

Added line #L45 was not covered by tests

// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
func (t pubsubTracer) Graft(p peer.ID, topic string) {}

Check warning on line 48 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L48

Added line #L48 was not covered by tests

// Prune is invoked when a peer is pruned from the message (gossipsub)
func (t pubsubTracer) Prune(p peer.ID, topic string) {}

Check warning on line 51 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L51

Added line #L51 was not covered by tests

// ValidateMessage is invoked when a message first enters the validation pipeline.
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {}

Check warning on line 54 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L54

Added line #L54 was not covered by tests

// DeliverMessage is invoked when a message is delivered
func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {}

Check warning on line 57 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L57

Added line #L57 was not covered by tests

// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) {
if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull {
transactionMessagesDroppedFromBacklog.Inc(nil)

Check warning on line 63 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}
}

// DuplicateMessage is invoked when a duplicate message is dropped.
func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) {
transactionMessagesDupRawMsg.Inc(nil)

Check warning on line 69 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}

// ThrottlePeer is invoked when a peer is throttled by the peer gater.
func (t pubsubTracer) ThrottlePeer(p peer.ID) {}

Check warning on line 73 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L73

Added line #L73 was not covered by tests

// RecvRPC is invoked when an incoming RPC is received.
func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {}

Check warning on line 76 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L76

Added line #L76 was not covered by tests

// SendRPC is invoked when a RPC is sent.
func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 79 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L79

Added line #L79 was not covered by tests

// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 82 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L82

Added line #L82 was not covered by tests

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {}

Check warning on line 86 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L86

Added line #L86 was not covered by tests
2 changes: 1 addition & 1 deletion network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "
var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"})

var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"})
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})
var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"})
var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"})
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
Expand All @@ -135,7 +136,6 @@ var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "alg

var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"})
var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"})
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})

var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."})
var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."})
Expand Down

0 comments on commit cd340f1

Please sign in to comment.