Skip to content

Commit

Permalink
Add metrics reporting to conduit pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Warehime committed Sep 19, 2022
1 parent 04862b0 commit df10036
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
23 changes: 23 additions & 0 deletions conduit/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package conduit
import (
"context"
"fmt"
"github.com/algorand/indexer/util/metrics"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"os"
"path/filepath"
"runtime/pprof"
"sync"
"time"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand/data/basics"
Expand Down Expand Up @@ -274,6 +276,19 @@ func (p *pipelineImpl) Stop() {
}
}

func (p *pipelineImpl) addMetrics(block data.BlockData, importTime time.Duration) {
metrics.BlockImportTimeSeconds.Observe(importTime.Seconds())
metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)))
metrics.ImportedRoundGauge.Set(float64(block.Round()))
txnCountByType := make(map[string]int)
for _, txn := range block.Payset {
txnCountByType[string(txn.Txn.Type)]++
}
for k, v := range txnCountByType {
metrics.ImportedTxns.WithLabelValues(k).Set(float64(v))
}
}

// Start pushes block data through the pipeline
func (p *pipelineImpl) Start() {
p.wg.Add(1)
Expand All @@ -294,6 +309,9 @@ func (p *pipelineImpl) Start() {
p.setError(err)
goto pipelineRun
}
// Start time currently measures operations after block fetching is complete.
// This is for backwards compatibility w/ Indexer's metrics
start := time.Now()
// run through processors
for _, proc := range p.processors {
blkData, err = (*proc).Process(blkData)
Expand All @@ -319,6 +337,11 @@ func (p *pipelineImpl) Start() {
goto pipelineRun
}
}
importTime := time.Since(start)
// Ignore round 0 (which is empty).
if p.round > 0 {
p.addMetrics(blkData, importTime)
}
// Increment Round
p.setError(nil)
p.round++
Expand Down
5 changes: 5 additions & 0 deletions importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/indexer/util/metrics"
"net/url"
"time"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
"github.com/algorand/go-algorand/protocol"
Expand Down Expand Up @@ -120,7 +122,10 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
"r=%d error getting status %d", retries, rnd)
continue
}
start := time.Now()
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
dt := time.Since(start)
metrics.GetAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
return blk, err
}
Expand Down

0 comments on commit df10036

Please sign in to comment.