Skip to content

Commit

Permalink
parallelise epoch processing subtasks (#12191)
Browse files Browse the repository at this point in the history
Parallelising some tasks by using worker pool. 

Worker pool will save (last)error if any task returns it and we return
it by `Error()` call after closing worker pool

we do:

`wp := CreateWorkerPool(runtime.NumCPU())`
`wp.AddWork(func() error { some work })`
`wp.WaitAndClose()`
`return wp.Error()`

---------

Co-authored-by: shota.silagadze <[email protected]>
Co-authored-by: Giulio <[email protected]>
  • Loading branch information
3 people authored Oct 8, 2024
1 parent 4a28b7f commit e6a0caa
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 23 deletions.
1 change: 0 additions & 1 deletion cl/transition/impl/eth2/statechange/process_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func ProcessEpoch(s abstract.BeaconState) error {
}
monitor.ObserveProcessJustificationBitsAndFinalityTime(start)
// fmt.Println("ProcessJustificationBitsAndFinality", time.Since(start))
// start = time.Now()

if s.Version() >= clparams.AltairVersion {
start = time.Now()
Expand Down
14 changes: 11 additions & 3 deletions cl/transition/impl/eth2/statechange/process_inactivity_scores.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/phase1/core/state"
)
Expand All @@ -27,12 +29,18 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies
return nil
}

for _, validatorIndex := range eligibleValidatorsIndicies {
return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidatorsIndicies), func(i int) error {
validatorIndex := eligibleValidatorsIndicies[i]

// retrieve validator inactivity score index.
score, err := s.ValidatorInactivityScore(int(validatorIndex))
if err != nil {
return err
}
if score == 0 && unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] {
return nil
}

if unslashedIndicies[s.BeaconConfig().TimelyTargetFlagIndex][validatorIndex] {
score -= min(1, score)
} else {
Expand All @@ -44,6 +52,6 @@ func ProcessInactivityScores(s abstract.BeaconState, eligibleValidatorsIndicies
if err := s.SetValidatorInactivityScore(int(validatorIndex), score); err != nil {
return err
}
}
return nil
return nil
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes/solid"
Expand Down Expand Up @@ -47,13 +49,13 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida
rewardMultipliers[i] = weights[i] * (flagsTotalBalances[i] / beaconConfig.EffectiveBalanceIncrement)
}
rewardDenominator := (totalActiveBalance / beaconConfig.EffectiveBalanceIncrement) * beaconConfig.WeightDenominator
var baseReward uint64
inactivityLeaking := state.InactivityLeaking(s)
// Now process deltas and whats nots.
for _, index := range eligibleValidators {
baseReward, err = s.BaseReward(index)

return ParallellForLoop(runtime.NumCPU(), 0, len(eligibleValidators), func(i int) error {
index := eligibleValidators[i]
baseReward, err := s.BaseReward(index)
if err != nil {
return
return err
}
delta := int64(0)
for flagIdx := range weights {
Expand Down Expand Up @@ -84,8 +86,8 @@ func processRewardsAndPenaltiesPostAltair(s abstract.BeaconState, eligibleValida
} else if err := state.DecreaseBalance(s, index, uint64(-delta)); err != nil {
return err
}
}
return
return nil
})
}

// processRewardsAndPenaltiesPhase0 process rewards and penalties for phase0 state.
Expand Down
18 changes: 6 additions & 12 deletions cl/transition/impl/eth2/statechange/process_slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package statechange

import (
"runtime"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/phase1/core/state"
)

Expand All @@ -37,10 +38,10 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error {
}
beaconConfig := s.BeaconConfig()
// Apply penalties to validators who have been slashed and reached the withdrawable epoch
var err error
s.ForEachValidator(func(validator solid.Validator, i, total int) bool {
return ParallellForLoop(runtime.NumCPU(), 0, s.ValidatorSet().Length(), func(i int) error {
validator := s.ValidatorSet().Get(i)
if !validator.Slashed() || epoch+beaconConfig.EpochsPerSlashingsVector/2 != validator.WithdrawableEpoch() {
return true
return nil
}
// Get the effective balance increment
increment := beaconConfig.EffectiveBalanceIncrement
Expand All @@ -49,15 +50,8 @@ func processSlashings(s abstract.BeaconState, slashingMultiplier uint64) error {
// Calculate the penalty by dividing the penalty numerator by the total balance and multiplying by the increment
penalty := penaltyNumerator / totalBalance * increment
// Decrease the validator's balance by the calculated penalty
if err = state.DecreaseBalance(s, uint64(i), penalty); err != nil {
return false
}
return true
return state.DecreaseBalance(s, uint64(i), penalty)
})
if err != nil {
return err
}
return nil
}

func ProcessSlashings(state abstract.BeaconState) error {
Expand Down
95 changes: 95 additions & 0 deletions cl/transition/impl/eth2/statechange/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 The Erigon Authors
// This file is part of Erigon.
//
// Erigon is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Erigon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package statechange

import (
"sync"
"sync/atomic"
"unsafe"
)

type WorkerPool struct {
work chan func() error
wg sync.WaitGroup
atomicErr unsafe.Pointer
}

// CreateWorkerPool initializes a pool of workers to process tasks.
func CreateWorkerPool(numWorkers int) *WorkerPool {
wp := WorkerPool{
work: make(chan func() error, 1000),
}
for i := 1; i <= numWorkers; i++ {
go wp.StartWorker()
}
return &wp
}

// close work channel and finish
func (wp *WorkerPool) WaitAndClose() {
// Wait for all workers to finish.
wp.wg.Wait()
// Close the task channel to indicate no more tasks will be sent.
close(wp.work)
}

// Worker is the worker that processes tasks.
func (wp *WorkerPool) StartWorker() {
for task := range wp.work {
if err := task(); err != nil {
atomic.StorePointer(&wp.atomicErr, unsafe.Pointer(&err))
}
wp.wg.Done()
}
}

func (wp *WorkerPool) Error() error {
errPointer := atomic.LoadPointer(&wp.atomicErr)
if errPointer == nil {
return nil
}
return *(*error)(errPointer)
}

// enqueue work
func (wp *WorkerPool) AddWork(f func() error) {
wp.wg.Add(1)
wp.work <- f
}

func ParallellForLoop(numWorkers int, from, to int, f func(int) error) error {
// divide the work into numWorkers parts
size := (to - from) / numWorkers
wp := CreateWorkerPool(numWorkers)
for i := 0; i < numWorkers; i++ {
start := from + i*size
end := start + size
if i == numWorkers-1 {
end = to
}
wp.AddWork(func() error {
for j := start; j < end; j++ {
if err := f(j); err != nil {
return err
}
}
return nil
})
}
wp.WaitAndClose()
return wp.Error()
}

0 comments on commit e6a0caa

Please sign in to comment.