Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: handle txns in pubsub validator #6070

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 39 additions & 19 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
appLimiter *appRateLimiter
appLimiterBacklogThreshold int

// batchVerifier provides synchronous verification of transaction groups
// batchVerifier provides synchronous verification of transaction groups, used only by pubsub validation in validateIncomingTxMessage.
batchVerifier verify.TxnGroupBatchSigVerifier
}

Expand Down Expand Up @@ -216,7 +216,7 @@
var err0 error
handler.batchVerifier, err0 = verify.MakeSigVerifier(handler.ledger, handler.ledger.VerifiedTransactionCache())
if err0 != nil {
return nil, err0

Check warning on line 219 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L219

Added line #L219 was not covered by tests
}

// prepare the transaction stream verifier
Expand Down Expand Up @@ -256,7 +256,6 @@
})

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
// TODO: rename to validators
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
{
Tag: protocol.TxnTag,
Expand Down Expand Up @@ -559,7 +558,7 @@

// dedupCanonical checks if the transaction group has been seen before after reencoding to canonical representation.
// returns a key used for insertion if the group was not found.
func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, isDup bool) {
func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, reencoded []byte, isDup bool) {
// consider situations where someone want to censor transactions A
// 1. Txn A is not part of a group => txn A with a valid signature is OK
// Censorship attempts are:
Expand All @@ -576,14 +575,16 @@
// - using individual txn from a group: {A, Z} could be poisoned by {A, B}, where B is invalid

var d crypto.Digest
var reencodedBuf []byte
ntx := len(unverifiedTxGroup)
if ntx == 1 {
// a single transaction => cache/dedup canonical txn with its signature
enc := unverifiedTxGroup[0].MarshalMsg(nil)
d = crypto.Hash(enc)
if handler.txCanonicalCache.CheckAndPut(&d) {
return nil, true
return nil, nil, true
}
reencodedBuf = enc
} else {
// a transaction group => cache/dedup the entire group canonical group
encodeBuf := make([]byte, 0, unverifiedTxGroup[0].Msgsize()*ntx)
Expand All @@ -594,14 +595,15 @@
// reallocated, some assumption on size was wrong
// log and skip
logging.Base().Warnf("Decoded size %d does not match to encoded %d", consumed, len(encodeBuf))
return nil, false
return nil, nil, false

Check warning on line 598 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L598

Added line #L598 was not covered by tests
}
d = crypto.Hash(encodeBuf)
if handler.txCanonicalCache.CheckAndPut(&d) {
return nil, true
return nil, nil, true
}
reencodedBuf = encodeBuf
}
return &d, false
return &d, reencodedBuf, false
}

// incomingMsgDupCheck runs the duplicate check on a raw incoming message.
Expand Down Expand Up @@ -696,28 +698,32 @@
return unverifiedTxGroup, consumed, false
}

// incomingTxGroupDupRateLimit checks
// - if the incoming transaction group has been seen before after reencoding to canonical representation, and
// - if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupDupRateLimit(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int, sender network.DisconnectablePeer) (*crypto.Digest, bool) {
// incomingTxGroupDupRateLimit checks if the incoming transaction group has been seen before after reencoding to canonical representation.
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// It also return canonical representation of the transaction group allowing the caller to compare it with the input.
func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int) (*crypto.Digest, []byte, bool) {
var canonicalKey *crypto.Digest
var reencoded []byte
if handler.txCanonicalCache != nil {
var isDup bool
if canonicalKey, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup {
if canonicalKey, reencoded, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup {
transactionMessagesDupCanonical.Inc(nil)
return canonicalKey, true
return nil, nil, true
}
}
return canonicalKey, reencoded, false
}

// incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool {
// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return canonicalKey, true
return true

Check warning on line 723 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L723

Added line #L723 was not covered by tests
}
}
return canonicalKey, false
return false
}

// processIncomingTxn decodes a transaction group from incoming message and enqueues into the back log for processing.
Expand Down Expand Up @@ -753,13 +759,17 @@
return network.OutgoingMessage{Action: network.Disconnect}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
canonicalKey, _, drop := handler.incomingTxGroupCanonicalDedup(unverifiedTxGroup, consumed)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.OutgoingMessage{Action: network.Ignore}
}

if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) {
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 770 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L770

Added line #L770 was not covered by tests
}

select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &rawmsg,
Expand All @@ -785,7 +795,7 @@
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage {
msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
if isDup {
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 798 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L798

Added line #L798 was not covered by tests
}

unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
Expand All @@ -794,11 +804,21 @@
return network.OutgoingMessage{Action: network.Disconnect}
}

canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
canonicalKey, reencoded, drop := handler.incomingTxGroupCanonicalDedup(unverifiedTxGroup, consumed)
if drop {
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
// or it was rate-limited by the per-app rate limiter
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 809 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L809

Added line #L809 was not covered by tests
}
if reencoded == nil {
reencoded = reencode(unverifiedTxGroup)
}

if !bytes.Equal(rawmsg.Data, reencoded) {
// ignore non-canonically encoded messages
return network.OutgoingMessage{Action: network.Ignore}
cce marked this conversation as resolved.
Show resolved Hide resolved
}

if handler.incomingTxGroupAppRateLimit(unverifiedTxGroup, rawmsg.Sender) {
return network.OutgoingMessage{Action: network.Ignore}

Check warning on line 821 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L821

Added line #L821 was not covered by tests
}

// apply backlog worker logic
Expand All @@ -812,9 +832,9 @@
}

if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
return network.OutgoingMessage{
Action: network.Ignore,

Check warning on line 837 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L835-L837

Added lines #L835 - L837 were not covered by tests
}
}

Expand All @@ -831,10 +851,10 @@
// save the transaction, if it has high enough fee and not already in the cache
err = handler.txPool.Remember(verifiedTxGroup)
if err != nil {
handler.rememberReportErrors(err)
logging.Base().Debugf("could not remember tx: %v", err)
return network.OutgoingMessage{
Action: network.Ignore,

Check warning on line 857 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L854-L857

Added lines #L854 - L857 were not covered by tests
}
}

Expand All @@ -843,7 +863,7 @@
// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)

Check warning on line 866 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L866

Added line #L866 was not covered by tests
}
return network.OutgoingMessage{
cce marked this conversation as resolved.
Show resolved Hide resolved
Action: network.Accept,
Expand Down
97 changes: 62 additions & 35 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,42 +646,42 @@ func TestTxHandlerProcessIncomingGroup(t *testing.T) {
}
}

func craftNonCanonical(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte {
// make non-canonical encoding and ensure it is not accepted
stxnNonCanTxn := transactions.SignedTxn{Txn: stxn.Txn}
blobTxn := protocol.Encode(&stxnNonCanTxn)
stxnNonCanAuthAddr := transactions.SignedTxn{AuthAddr: stxn.AuthAddr}
blobAuthAddr := protocol.Encode(&stxnNonCanAuthAddr)
stxnNonCanAuthSig := transactions.SignedTxn{Sig: stxn.Sig}
blobSig := protocol.Encode(&stxnNonCanAuthSig)

if blobStxn == nil {
blobStxn = protocol.Encode(stxn)
}

// double check our skills for transactions.SignedTxn creation by creating a new canonical encoding and comparing to the original
blobValidation := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobValidation = append(blobValidation[:], blobAuthAddr...)
blobValidation = append(blobValidation[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobValidation = append(blobValidation[:], blobTxn[1:]...) // cut transactions.SignedTxn's field count
blobValidation[0] += 2 // increase field count
require.Equal(t, blobStxn, blobValidation)

// craft non-canonical
blobNonCan := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobNonCan = append(blobNonCan[:], blobTxn...)
blobNonCan = append(blobNonCan[:], blobAuthAddr[1:]...) // cut transactions.SignedTxn's field count
blobNonCan = append(blobNonCan[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobNonCan[0] += 2 // increase field count
require.Len(t, blobNonCan, len(blobStxn))
require.NotEqual(t, blobStxn, blobNonCan)
return blobNonCan
}

func TestTxHandlerProcessIncomingCensoring(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

craftNonCanonical := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) []byte {
// make non-canonical encoding and ensure it is not accepted
stxnNonCanTxn := transactions.SignedTxn{Txn: stxn.Txn}
blobTxn := protocol.Encode(&stxnNonCanTxn)
stxnNonCanAuthAddr := transactions.SignedTxn{AuthAddr: stxn.AuthAddr}
blobAuthAddr := protocol.Encode(&stxnNonCanAuthAddr)
stxnNonCanAuthSig := transactions.SignedTxn{Sig: stxn.Sig}
blobSig := protocol.Encode(&stxnNonCanAuthSig)

if blobStxn == nil {
blobStxn = protocol.Encode(stxn)
}

// double check our skills for transactions.SignedTxn creation by creating a new canonical encoding and comparing to the original
blobValidation := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobValidation = append(blobValidation[:], blobAuthAddr...)
blobValidation = append(blobValidation[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobValidation = append(blobValidation[:], blobTxn[1:]...) // cut transactions.SignedTxn's field count
blobValidation[0] += 2 // increase field count
require.Equal(t, blobStxn, blobValidation)

// craft non-canonical
blobNonCan := make([]byte, 0, len(blobTxn)+len(blobAuthAddr)+len(blobSig))
blobNonCan = append(blobNonCan[:], blobTxn...)
blobNonCan = append(blobNonCan[:], blobAuthAddr[1:]...) // cut transactions.SignedTxn's field count
blobNonCan = append(blobNonCan[:], blobSig[1:]...) // cut transactions.SignedTxn's field count
blobNonCan[0] += 2 // increase field count
require.Len(t, blobNonCan, len(blobStxn))
require.NotEqual(t, blobStxn, blobNonCan)
return blobNonCan
}

forgeSig := func(t *testing.T, stxn *transactions.SignedTxn, blobStxn []byte) (transactions.SignedTxn, []byte) {
stxnForged := *stxn
crypto.RandBytes(stxnForged.Sig[:])
Expand Down Expand Up @@ -2750,8 +2750,6 @@ func TestTxHandlerCapGuard(t *testing.T) {
}

func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
partitiontest.PartitionTest(t)

partitiontest.PartitionTest(t)
t.Parallel()

Expand All @@ -2777,8 +2775,16 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob})
require.Equal(t, outmsg.Action, network.Accept)

// non-canonical message
// for some reason craftNonCanonical cannot handle makeTxns output so make a simpler random txn
stxns, blob := makeRandomTransactions(1)
stxn := stxns[0]
blobNonCan := craftNonCanonical(t, &stxn, blob)
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan})
require.Equal(t, outmsg.Action, network.Ignore)

// invalid signature
stxns, _ := makeTxns(addresses, secrets, 1, 2, genesisHash)
stxns, _ = makeTxns(addresses, secrets, 1, 2, genesisHash)
stxns[0].Sig[0] = stxns[0].Sig[0] + 1
blob2 := protocol.Encode(&stxns[0])
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob2})
Expand All @@ -2789,4 +2795,25 @@ func TestTxHandlerValidateIncomingTxMessage(t *testing.T) {
blob[0] = blob[0] + 1
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob})
require.Equal(t, outmsg.Action, network.Disconnect)

t.Run("with-canonical", func(t *testing.T) {
// make sure the reencoding from the canonical dedup checker's reencoding buf is correctly reused
cfg.TxIncomingFilteringFlags = 2
require.True(t, cfg.TxFilterCanonicalEnabled())
handler, err := makeTestTxHandler(ledger, cfg)
require.NoError(t, err)

// valid message
_, blob := makeTxns(addresses, secrets, 1, 2, genesisHash)
outmsg := handler.validateIncomingTxMessage(network.IncomingMessage{Data: blob})
require.Equal(t, outmsg.Action, network.Accept)

// non-canonical message
// for some reason craftNonCanonical cannot handle makeTxns output so make a simpler random txn
stxns, blob := makeRandomTransactions(1)
stxn := stxns[0]
blobNonCan := craftNonCanonical(t, &stxn, blob)
outmsg = handler.validateIncomingTxMessage(network.IncomingMessage{Data: blobNonCan})
require.Equal(t, outmsg.Action, network.Ignore)
})
}
28 changes: 20 additions & 8 deletions network/p2p/pubsubTracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,61 +26,73 @@

var _ = pubsub.RawTracer(pubsubTracer{})
jasonpaulos marked this conversation as resolved.
Show resolved Hide resolved

var transactionMessagesDroppedFromBacklogP2P = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklogP2P)
var transactionMessagesDupRawMsg = metrics.MakeCounter(metrics.TransactionMessagesDupRawMsg)
var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description)
var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage)
var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage)
var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage)
var transactionMessagesP2PValidateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PValidateMessage)

// pubsubTracer is a tracer for pubsub events used to track metrics.
type pubsubTracer struct{}

// AddPeer is invoked when a new peer is added.
func (t pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {}

Check warning on line 39 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L39

Added line #L39 was not covered by tests

// RemovePeer is invoked when a peer is removed.
func (t pubsubTracer) RemovePeer(p peer.ID) {}

Check warning on line 42 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L42

Added line #L42 was not covered by tests

// Join is invoked when a new topic is joined
func (t pubsubTracer) Join(topic string) {}

Check warning on line 45 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L45

Added line #L45 was not covered by tests

// Leave is invoked when a topic is abandoned
func (t pubsubTracer) Leave(topic string) {}

Check warning on line 48 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L48

Added line #L48 was not covered by tests

// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
func (t pubsubTracer) Graft(p peer.ID, topic string) {}

Check warning on line 51 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L51

Added line #L51 was not covered by tests

// Prune is invoked when a peer is pruned from the message (gossipsub)
func (t pubsubTracer) Prune(p peer.ID, topic string) {}

Check warning on line 54 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L54

Added line #L54 was not covered by tests

// ValidateMessage is invoked when a message first enters the validation pipeline.
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {}
func (t pubsubTracer) ValidateMessage(msg *pubsub.Message) {
transactionMessagesP2PValidateMessage.Inc(nil)

Check warning on line 58 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

// DeliverMessage is invoked when a message is delivered
func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {}
func (t pubsubTracer) DeliverMessage(msg *pubsub.Message) {
transactionMessagesP2PDeliverMessage.Inc(nil)

Check warning on line 63 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
func (t pubsubTracer) RejectMessage(msg *pubsub.Message, reason string) {
if reason == pubsub.RejectValidationThrottled || reason == pubsub.RejectValidationQueueFull {
transactionMessagesDroppedFromBacklogP2P.Inc(nil)
switch reason {
case pubsub.RejectValidationThrottled, pubsub.RejectValidationQueueFull, pubsub.RejectValidationFailed, pubsub.RejectValidationIgnored:
transactionMessagesP2PRejectMessage.Add(reason, 1)
default:
transactionMessagesP2PRejectMessage.Add("other", 1)

Check warning on line 73 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L68-L73

Added lines #L68 - L73 were not covered by tests
}
}

// DuplicateMessage is invoked when a duplicate message is dropped.
func (t pubsubTracer) DuplicateMessage(msg *pubsub.Message) {
transactionMessagesDupRawMsg.Inc(nil)
transactionMessagesP2PDuplicateMessage.Inc(nil)

Check warning on line 79 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}

// ThrottlePeer is invoked when a peer is throttled by the peer gater.
func (t pubsubTracer) ThrottlePeer(p peer.ID) {}

Check warning on line 83 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L83

Added line #L83 was not covered by tests

// RecvRPC is invoked when an incoming RPC is received.
func (t pubsubTracer) RecvRPC(rpc *pubsub.RPC) {}

Check warning on line 86 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L86

Added line #L86 was not covered by tests

// SendRPC is invoked when a RPC is sent.
func (t pubsubTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 89 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L89

Added line #L89 was not covered by tests

// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
func (t pubsubTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 92 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L92

Added line #L92 was not covered by tests

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {}
func (t pubsubTracer) UndeliverableMessage(msg *pubsub.Message) {
transactionMessagesP2PUnderdeliverableMessage.Inc(nil)

Check warning on line 97 in network/p2p/pubsubTracer.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsubTracer.go#L96-L97

Added lines #L96 - L97 were not covered by tests
}
13 changes: 11 additions & 2 deletions util/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ var (
TransactionMessagesHandled = MetricName{Name: "algod_transaction_messages_handled", Description: "Number of transaction messages handled"}
// TransactionMessagesDroppedFromBacklog "Number of transaction messages dropped from backlog"
TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"}
// TransactionMessagesDroppedFromBacklogP2P "Number of transaction messages throttled with p2p pubsub"
TransactionMessagesDroppedFromBacklogP2P = MetricName{Name: "algod_transaction_messages_dropped_backlog_p2p", Description: "Number of transaction messages throttled with p2p pubsub"}
// TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool"
TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"}
// TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages before placing into a backlog"
Expand Down Expand Up @@ -130,6 +128,17 @@ var (
// TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue"
TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"}

// TransactionMessagesP2PRejectMessage "Number of rejected p2p pubsub transaction messages"
TransactionMessagesP2PRejectMessage = MetricName{Name: "algod_transaction_messages_p2p_reject", Description: "Number of rejected p2p pubsub transaction messages"}
// TransactionMessagesP2PDuplicateMessage "Number of duplicate p2p pubsub transaction messages"}
TransactionMessagesP2PDuplicateMessage = MetricName{Name: "algod_transaction_messages_p2p_duplicate", Description: "Number of duplicate p2p pubsub transaction messages"}
// TransactionMessagesP2PDeliverMessage "Number of delivered p2p pubsub transaction messages"
TransactionMessagesP2PDeliverMessage = MetricName{Name: "algod_transaction_messages_p2p_delivered", Description: "Number of delivered p2p pubsub transaction messages"}
// TransactionMessagesP2PUndeliverableMessage "Number of undeliverable p2p pubsub transaction messages"
TransactionMessagesP2PUndeliverableMessage = MetricName{Name: "algod_transaction_messages_p2p_undeliverable", Description: "Number of undeliverable p2p pubsub transaction messages"}
// TransactionMessagesP2PValidateMessage "Number of p2p pubsub transaction messages received for validation"
TransactionMessagesP2PValidateMessage = MetricName{Name: "algod_transaction_messages_p2p_validate", Description: "Number of p2p pubsub transaction messages received for validation"}

// TransactionGroupTxSyncHandled "Number of transaction groups handled via txsync"
TransactionGroupTxSyncHandled = MetricName{Name: "algod_transaction_group_txsync_handled", Description: "Number of transaction groups handled via txsync"}
// TransactionGroupTxSyncRemember "Number of transaction groups remembered via txsync"
Expand Down
Loading