Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support concurrent transaction handling #67

Merged
merged 16 commits into from
Jun 21, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Unreleased
### Changes
- ([\#67](https://github.com/forbole/juno/pull/67)) Added support for concurrent transaction handling

## v3.2.0
### Changes
- ([\#61](https://github.com/forbole/juno/pull/61)) Updated v3 migration code to handle database users names with a hyphen
Expand Down
89 changes: 62 additions & 27 deletions parser/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parser
import (
"encoding/json"
"fmt"
"sync"

"github.com/forbole/juno/v3/logging"

Expand Down Expand Up @@ -258,46 +259,80 @@ func (w Worker) ExportCommit(commit *tmtypes.Commit, vals *tmctypes.ResultValida
return nil
}

// ExportTxs accepts a slice of transactions and persists then inside the database.
// SaveTxs accepts the transaction and persists it inside the database.
// An error is returned if the write fails.
func (w Worker) ExportTxs(txs []*types.Tx) error {
// Handle all the transactions inside the block
for _, tx := range txs {
// Save the transaction itself
err := w.db.SaveTx(tx)
func (w Worker) SaveTxs(tx *types.Tx, wg *sync.WaitGroup) error {
defer wg.Done()
err := w.db.SaveTx(tx)
if err != nil {
return fmt.Errorf("failed to handle transaction with hash %s: %s", tx.TxHash, err)
}
return nil
}

// HandleTxs accepts the transaction and calls the tx handlers.
func (w Worker) HandleTxs(tx *types.Tx, wg *sync.WaitGroup) {
defer wg.Done()

// Call the tx handlers
for _, module := range w.modules {
if transactionModule, ok := module.(modules.TransactionModule); ok {
err := transactionModule.HandleTx(tx)
if err != nil {
w.logger.TxError(module, tx, err)
}
}
}
}

// HandleMessages accepts the transaction and handles messages contained
// inside the transaction. An error is returned if the message unpacking
// or calling handlers fails.
func (w Worker) HandleMessages(tx *types.Tx, wg *sync.WaitGroup) error {
defer wg.Done()

// Handle all the messages contained inside the transaction
for i, msg := range tx.Body.Messages {
var stdMsg sdk.Msg
err := w.codec.UnpackAny(msg, &stdMsg)
if err != nil {
return fmt.Errorf("failed to handle transaction with hash %s: %s", tx.TxHash, err)
return fmt.Errorf("error while unpacking message: %s", err)
}

// Call the tx handlers
// Call the handlers
for _, module := range w.modules {
if transactionModule, ok := module.(modules.TransactionModule); ok {
err = transactionModule.HandleTx(tx)
if messageModule, ok := module.(modules.MessageModule); ok {
err = messageModule.HandleMsg(i, stdMsg, tx)
if err != nil {
w.logger.TxError(module, tx, err)
w.logger.MsgError(module, tx, stdMsg, err)
}
}
}
}
return nil
}

// Handle all the messages contained inside the transaction
for i, msg := range tx.Body.Messages {
var stdMsg sdk.Msg
err = w.codec.UnpackAny(msg, &stdMsg)
if err != nil {
return fmt.Errorf("error while unpacking message: %s", err)
}
// ExportTxs accepts a slice of transactions and persists then inside the database.
// An error is returned if the write fails.
func (w Worker) ExportTxs(txs []*types.Tx) error {

// Call the handlers
for _, module := range w.modules {
if messageModule, ok := module.(modules.MessageModule); ok {
err = messageModule.HandleMsg(i, stdMsg, tx)
if err != nil {
w.logger.MsgError(module, tx, stdMsg, err)
}
}
}
var wg sync.WaitGroup
for _, tx := range txs {
wg.Add(3)
MonikaCat marked this conversation as resolved.
Show resolved Hide resolved

go w.HandleTxs(tx, &wg)

err := w.SaveTxs(tx, &wg)
if err != nil {
return fmt.Errorf("error while exporting txs: %s", err)
}
err = w.HandleMessages(tx, &wg)
if err != nil {
return fmt.Errorf("error while exporting txs: %s", err)
}
}

wg.Wait()

return nil
}