diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a260b1e..eaa4b02b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## Unreleased +### Changes +- ([\#67](https://github.com/forbole/juno/pull/67)) Added support for concurrent transaction handling + ## v3.2.1 ### Changes - ([\#68](https://github.com/forbole/juno/pull/68)) Added `chain_id` label to prometheus to improve alert monitoring diff --git a/parser/worker.go b/parser/worker.go index 5e30fef9..0e8c3698 100644 --- a/parser/worker.go +++ b/parser/worker.go @@ -262,44 +262,70 @@ func (w Worker) ExportCommit(commit *tmtypes.Commit, vals *tmctypes.ResultValida return nil } +// saveTx accepts the transaction and persists it inside the database. +// An error is returned if the write fails. +func (w Worker) saveTx(tx *types.Tx) error { + err := w.db.SaveTx(tx) + if err != nil { + return fmt.Errorf("failed to handle transaction with hash %s: %s", tx.TxHash, err) + } + return nil +} + +// handleTx accepts the transaction and calls the tx handlers. +func (w Worker) handleTx(tx *types.Tx) { + // Call the tx handlers + for _, module := range w.modules { + if transactionModule, ok := module.(modules.TransactionModule); ok { + err := transactionModule.HandleTx(tx) + if err != nil { + w.logger.TxError(module, tx, err) + } + } + } +} + +// handleMessage accepts the transaction and handles messages contained +// inside the transaction. +func (w Worker) handleMessage(index int, msg sdk.Msg, tx *types.Tx) { + for _, module := range w.modules { + if messageModule, ok := module.(modules.MessageModule); ok { + err := messageModule.HandleMsg(index, msg, tx) + if err != nil { + w.logger.MsgError(module, tx, msg, err) + } + } + } +} + // ExportTxs accepts a slice of transactions and persists then inside the database. // An error is returned if the write fails. func (w Worker) ExportTxs(txs []*types.Tx) error { - // Handle all the transactions inside the block + // handle all transactions inside the block for _, tx := range txs { - // Save the transaction itself - err := w.db.SaveTx(tx) + // save the transaction + err := w.saveTx(tx) if err != nil { - return fmt.Errorf("failed to handle transaction with hash %s: %s", tx.TxHash, err) + return fmt.Errorf("error while storing txs: %s", err) } - // Call the tx handlers - for _, module := range w.modules { - if transactionModule, ok := module.(modules.TransactionModule); ok { - err = transactionModule.HandleTx(tx) - if err != nil { - w.logger.TxError(module, tx, err) - } - } - } + // call the tx handlers + go w.handleTx(tx) - // Handle all the messages contained inside the transaction + // handle all messages contained inside the transaction + sdkMsgs := make([]sdk.Msg, len(tx.Body.Messages)) for i, msg := range tx.Body.Messages { var stdMsg sdk.Msg - err = w.codec.UnpackAny(msg, &stdMsg) + err := w.codec.UnpackAny(msg, &stdMsg) if err != nil { - return fmt.Errorf("error while unpacking message: %s", err) + return err } + sdkMsgs[i] = stdMsg + } - // Call the handlers - for _, module := range w.modules { - if messageModule, ok := module.(modules.MessageModule); ok { - err = messageModule.HandleMsg(i, stdMsg, tx) - if err != nil { - w.logger.MsgError(module, tx, stdMsg, err) - } - } - } + // call the msg handlers + for i, sdkMsg := range sdkMsgs { + go w.handleMessage(i, sdkMsg, tx) } }