Skip to content

Commit

Permalink
WIP: p2p: handle txns in pubsub validator
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Jul 15, 2024
1 parent 48a539f commit 78fd790
Showing 1 changed file with 88 additions and 27 deletions.
115 changes: 88 additions & 27 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ type TxHandler struct {
erl *util.ElasticRateLimiter
appLimiter *appRateLimiter
appLimiterBacklogThreshold int

batchProcessor execpool.BatchProcessor
streamVerifierDropped2 chan *verify.UnverifiedTxnSigJob
postVerificationQueue2 chan *verify.VerificationResult
}

// TxHandlerOpts is TxHandler configuration options
Expand Down Expand Up @@ -173,6 +177,9 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
net: opts.Net,
streamVerifierChan: make(chan execpool.InputJob),
streamVerifierDropped: make(chan *verify.UnverifiedTxnSigJob),

postVerificationQueue2: make(chan *verify.VerificationResult, 1),
streamVerifierDropped2: make(chan *verify.UnverifiedTxnSigJob, 1),
}

if opts.Config.TxFilterRawMsgEnabled() {
Expand Down Expand Up @@ -209,6 +216,14 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
}
}

// prepare the batch processor for pubsub synchronous verification
var err0 error
handler.batchProcessor, err0 = verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(),
handler.postVerificationQueue2, handler.streamVerifierDropped2)
if err0 != nil {
return nil, err0

Check warning on line 224 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L224

Added line #L224 was not covered by tests
}

// prepare the transaction stream verifier
var err error
txnElementProcessor, err := verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(),
Expand Down Expand Up @@ -246,6 +261,7 @@ func (handler *TxHandler) Start() {
})

// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
// TODO: rename to validators
handler.net.RegisterProcessors([]network.TaggedMessageProcessor{
{
Tag: protocol.TxnTag,
Expand Down Expand Up @@ -348,7 +364,7 @@ func (handler *TxHandler) backlogWorker() {
}
continue
}
// handler.streamVerifierChan does not receive if ctx is cancled
// handler.streamVerifierChan does not receive if ctx is cancelled
select {
case handler.streamVerifierChan <- &verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}:
case <-handler.ctx.Done():
Expand Down Expand Up @@ -799,37 +815,82 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
}

return network.ValidatedMessage{
Action: network.Accept,
Tag: rawmsg.Tag,
ValidatedMessage: &validatedIncomingTxMessage{
rawmsg: rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
msgKey: msgKey,
canonicalKey: canonicalKey,
},
// apply backlog worker logic

wi := &txBacklogMsg{
rawmsg: &rawmsg,
unverifiedTxGroup: unverifiedTxGroup,
rawmsgDataHash: msgKey,
unverifiedTxGroupHash: canonicalKey,
capguard: nil,

Check warning on line 825 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L820-L825

Added lines #L820 - L825 were not covered by tests
}
}

// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage {
msg := validatedMessage.ValidatedMessage.(*validatedIncomingTxMessage)
if handler.checkAlreadyCommitted(wi) {
transactionMessagesAlreadyCommitted.Inc(nil)
return network.ValidatedMessage{
Action: network.Ignore,
ValidatedMessage: nil,

Check warning on line 832 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L828-L832

Added lines #L828 - L832 were not covered by tests
}
}

jobs := []execpool.InputJob{&verify.UnverifiedTxnSigJob{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}}
handler.batchProcessor.ProcessBatch(jobs)

Check warning on line 837 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L836-L837

Added lines #L836 - L837 were not covered by tests

select {
case handler.backlogQueue <- &txBacklogMsg{
rawmsg: &msg.rawmsg,
unverifiedTxGroup: msg.unverifiedTxGroup,
rawmsgDataHash: msg.msgKey,
unverifiedTxGroupHash: msg.canonicalKey,
capguard: nil,
}:
default:
// if we failed here we want to increase the corresponding metric. It might suggest that we
// want to increase the queue size.
transactionMessagesDroppedFromBacklog.Inc(nil)
case wi := <-handler.postVerificationQueue2:
m := wi.BacklogMessage.(*txBacklogMsg)
if wi.Err != nil {
handler.postProcessReportErrors(wi.Err)
logging.Base().Warnf("Received a malformed tx group %v: %v", m.unverifiedTxGroup, wi.Err)
return network.ValidatedMessage{
Action: network.Disconnect,
ValidatedMessage: nil,

Check warning on line 847 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L840-L847

Added lines #L840 - L847 were not covered by tests
}
}
// at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction.
verifiedTxGroup := m.unverifiedTxGroup

Check warning on line 851 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L851

Added line #L851 was not covered by tests

// additionally, remove the txn from duplicate caches to ensure it can be re-submitted
handler.deleteFromCaches(msg.msgKey, msg.canonicalKey)
// save the transaction, if it has high enough fee and not already in the cache
err := handler.txPool.Remember(verifiedTxGroup)
if err != nil {
handler.rememberReportErrors(err)
logging.Base().Debugf("could not remember tx: %v", err)
return network.ValidatedMessage{
Action: network.Ignore,
ValidatedMessage: nil,

Check warning on line 860 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L854-L860

Added lines #L854 - L860 were not covered by tests
}
}

transactionMessagesRemember.Inc(nil)

Check warning on line 864 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L864

Added line #L864 was not covered by tests

// if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions.
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup)
if err != nil {
logging.Base().Infof("unable to pin transaction: %v", err)

Check warning on line 869 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L867-L869

Added lines #L867 - L869 were not covered by tests
}
return network.ValidatedMessage{
Action: network.Accept,
ValidatedMessage: nil,

Check warning on line 873 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L871-L873

Added lines #L871 - L873 were not covered by tests
}

case <-handler.streamVerifierDropped2:
transactionMessagesDroppedFromBacklog.Inc(nil)
return network.ValidatedMessage{
Action: network.Ignore,
ValidatedMessage: nil,

Check warning on line 880 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L876-L880

Added lines #L876 - L880 were not covered by tests
}
case <-handler.ctx.Done():
transactionMessagesDroppedFromBacklog.Inc(nil)
return network.ValidatedMessage{
Action: network.Ignore,
ValidatedMessage: nil,

Check warning on line 886 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L882-L886

Added lines #L882 - L886 were not covered by tests
}
}
}

// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork.
func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage {

Check warning on line 892 in data/txHandler.go

View check run for this annotation

Codecov / codecov/patch

data/txHandler.go#L892

Added line #L892 was not covered by tests
// process is noop, all work is done in validateIncomingTxMessage above
return network.OutgoingMessage{Action: network.Ignore}
}

Expand Down

0 comments on commit 78fd790

Please sign in to comment.