Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lib/babe): fix timing for transition between epochs #1636

Merged
merged 20 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ require (
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-kbucket v0.4.7
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-sockaddr v0.1.0 // indirect
Expand Down
210 changes: 118 additions & 92 deletions lib/babe/babe.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var logger log.Logger
type Service struct {
ctx context.Context
cancel context.CancelFunc
paused bool
authority bool
dev bool

Expand Down Expand Up @@ -123,7 +122,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
rt: cfg.Runtime,
transactionState: cfg.TransactionState,
slotToProof: make(map[uint64]*VrfOutputAndProof),
blockChan: make(chan types.Block),
blockChan: make(chan types.Block, 16),
pause: make(chan struct{}),
authority: cfg.Authority,
dev: cfg.IsDev,
Expand All @@ -146,7 +145,7 @@ func NewService(cfg *ServiceConfig) (*Service, error) {
"epoch length (slots)", babeService.epochLength,
"authorities", Authorities(babeService.epochData.authorities),
"authority index", babeService.epochData.authorityIndex,
"threshold", babeService.epochData.threshold.ToLEBytes(),
"threshold", babeService.epochData.threshold,
"randomness", babeService.epochData.randomness,
)
return babeService, nil
Expand Down Expand Up @@ -226,39 +225,49 @@ func (b *Service) EpochLength() uint64 {

// Pause pauses the service ie. halts block production
func (b *Service) Pause() error {
if b.paused {
return errors.New("service already paused")
}

b.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is not required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there might be a chance that Pause() is called when the b.pause channel is already close, thus causing a close of closed channel panic

defer b.Unlock()

b.pause <- struct{}{}
b.paused = true
if b.IsPaused() {
return nil
}

close(b.pause)
return nil
}

// Resume resumes the service ie. resumes block production
func (b *Service) Resume() error {
if !b.paused {
b.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acquire lock after checking b.IsPaused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function should be atomic in the case that Pause or Resume are called concurrently by multiple processes, process A might see it's not paused, acquire the lock, then do b.initiate() but in between seeing it's not paused an acquiring the lock another process might also see the same thing, thus causing b.initiate() to be called twice which would be bad

defer b.Unlock()

if !b.IsPaused() {
return nil
}

b.pause = make(chan struct{})

epoch, err := b.epochState.GetCurrentEpoch()
if err != nil {
logger.Error("failed to get current epoch", "error", err)
return err
}

b.Lock()
defer b.Unlock()

b.paused = false
go b.initiate(epoch)
logger.Info("service resumed", "epoch", epoch)
return nil
}

// IsPaused returns if the service is paused or not (ie. producing blocks)
func (b *Service) IsPaused() bool {
select {
case <-b.pause:
return true
default:
return false
}
}

// Stop stops the service. If stop is called, it cannot be resumed.
func (b *Service) Stop() error {
b.Lock()
Expand Down Expand Up @@ -301,13 +310,6 @@ func (b *Service) IsStopped() bool {
return b.ctx.Err() != nil
}

// IsPaused returns if the service is paused or not (ie. producing blocks)
func (b *Service) IsPaused() bool {
b.RLock()
defer b.RUnlock()
return b.paused
}

func (b *Service) safeSend(msg types.Block) error {
b.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to take lock here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return errors.New("service has been stopped")

S -> s
Error starts with smallcase letters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for your first comment, the lock is here so there isn't a send on a closed channel. maybe there is a better way to implement that? either way, I think I will be removing this section anyways and doing a refactor

defer b.Unlock()
Expand Down Expand Up @@ -351,93 +353,115 @@ func (b *Service) initiate(epoch uint64) {
return
}

b.invokeBlockAuthoring(epoch)
err := b.invokeBlockAuthoring(epoch)
if err != nil {
logger.Crit("block authoring error", "error", err)
}
}

func (b *Service) invokeBlockAuthoring(epoch uint64) {
// calculate current slot
startSlot := getCurrentSlot(b.slotDuration)
func (b *Service) invokeBlockAuthoring(epoch uint64) error {
for {
// get start slot for current epoch
epochStart, err := b.epochState.GetStartSlotForEpoch(epoch)
if err != nil {
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
return err
}

head, err := b.blockState.BestBlockHeader()
if err != nil {
logger.Error("failed to get best block header", "error", err)
return
}
head, err := b.blockState.BestBlockHeader()
if err != nil {
logger.Error("failed to get best block header", "error", err)
return err
}

// if we're at genesis, set the first slot number for the network
if head.Number.Cmp(big.NewInt(0)) == 0 {
err = b.epochState.SetFirstSlot(startSlot)
// if we're at genesis, set the first slot number for the network
if head.Number.Cmp(big.NewInt(0)) == 0 {
epochStart = getCurrentSlot(b.slotDuration)
err = b.epochState.SetFirstSlot(epochStart)
if err != nil {
logger.Error("failed to set first slot number", "error", err)
return err
}
}

logger.Info("initiating epoch", "number", epoch, "first slot of epoch", epochStart)
err = b.initiateEpoch(epoch)
if err != nil {
logger.Error("failed to set first slot number", "error", err)
return
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
return err
}
}

logger.Info("initiating epoch", "number", epoch, "start slot", startSlot+b.epochLength)
err = b.initiateEpoch(epoch)
if err != nil {
logger.Error("failed to initiate epoch", "epoch", epoch, "error", err)
return
}
epochStartTime := getSlotStartTime(epochStart, b.slotDuration)
logger.Debug("checking if epoch started", "epoch start", epochStartTime, "now", time.Now())

// check if it's time to start the epoch yet. if not, wait until it is
if time.Since(epochStartTime) < 0 {
logger.Debug("waiting for epoch to start")
select {
case <-time.After(time.Until(epochStartTime)):
case <-b.ctx.Done():
return nil
case <-b.pause:
return nil
}
}

// get start slot for current epoch
epochStart, err := b.epochState.GetStartSlotForEpoch(0)
if err != nil {
logger.Error("failed to get start slot for current epoch", "epoch", epoch, "error", err)
return
}
// calculate current slot
startSlot := getCurrentSlot(b.slotDuration)
intoEpoch := startSlot - epochStart

intoEpoch := startSlot - epochStart
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)
// if the calculated amount of slots "into the epoch" is greater than the epoch length,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to move this into another function? This seems like something that happens infrequently and can be tested independently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would happen if the node is down for over an epoch or if it's starting up for the first time on a network that's past epoch 0, which part would you put in a separate function? just this if check?

// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
// resume it when ready
if b.epochLength <= intoEpoch && !b.dev {
logger.Debug("pausing BABE, need to sync", "slots into epoch", intoEpoch, "startSlot", startSlot, "epochStart", epochStart)
return b.Pause()
}

// if the calculated amount of slots "into the epoch" is greater than the epoch length,
// we've been offline for more than an epoch, and need to sync. pause BABE for now, syncer will
// resume it when ready
if b.epochLength <= intoEpoch && !b.dev {
b.paused = true
return
}
if b.dev {
intoEpoch = intoEpoch % b.epochLength
}

if b.dev {
intoEpoch = intoEpoch % b.epochLength
}
logger.Info("current epoch", "epoch", epoch, "slots into epoch", intoEpoch)

slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
}
slotDone := make([]<-chan time.Time, b.epochLength-intoEpoch)
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
slotDone[i] = time.After(b.getSlotDuration() * time.Duration(i))
}

for i := 0; i < int(b.epochLength-intoEpoch); i++ {
select {
case <-b.ctx.Done():
return
case <-b.pause:
return
case <-slotDone[i]:
if !b.authority {
continue
for i := 0; i < int(b.epochLength-intoEpoch); i++ {
select {
case <-b.ctx.Done():
return nil
case <-b.pause:
return nil
case <-slotDone[i]:
if !b.authority {
continue
}

slotNum := startSlot + uint64(i)
err = b.handleSlot(slotNum)
if err == ErrNotAuthorized {
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum, "slots into epoch", i)
continue
} else if err != nil {
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
continue
}
}
}

slotNum := startSlot + uint64(i)
err = b.handleSlot(slotNum)
if err == ErrNotAuthorized {
logger.Debug("not authorized to produce a block in this slot", "slot", slotNum)
continue
} else if err != nil {
logger.Warn("failed to handle slot", "slot", slotNum, "error", err)
continue
}
// setup next epoch, re-invoke block authoring
next, err := b.incrementEpoch()
if err != nil {
logger.Error("failed to increment epoch", "error", err)
return err
}
}

// setup next epoch, re-invoke block authoring
next, err := b.incrementEpoch()
if err != nil {
logger.Error("failed to increment epoch", "error", err)
return
logger.Info("epoch complete!", "completed epoch", epoch, "upcoming epoch", next)
epoch = next
}

b.invokeBlockAuthoring(next)
}

func (b *Service) handleSlot(slotNum uint64) error {
Expand Down Expand Up @@ -466,8 +490,6 @@ func (b *Service) handleSlot(slotNum uint64) error {
number: slotNum,
}

logger.Debug("going to build block", "parent", parent)

// set runtime trie before building block
// if block building is successful, store the resulting trie in the storage state
ts, err := b.storageState.TrieState(&parent.StateRoot)
Expand Down Expand Up @@ -509,3 +531,7 @@ func (b *Service) handleSlot(slotNum uint64) error {
func getCurrentSlot(slotDuration time.Duration) uint64 {
return uint64(time.Now().UnixNano()) / uint64(slotDuration.Nanoseconds())
}

func getSlotStartTime(slot uint64, slotDuration time.Duration) time.Time {
return time.Unix(0, int64(slot)*slotDuration.Nanoseconds())
}
31 changes: 31 additions & 0 deletions lib/babe/babe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func createTestService(t *testing.T, cfg *ServiceConfig) *Service {
cfg.Runtime = rt
}

cfg.LogLvl = defaultTestLogLvl
babeService, err := NewService(cfg)
require.NoError(t, err)
return babeService
Expand Down Expand Up @@ -252,3 +253,33 @@ func TestStartAndStop(t *testing.T) {
err = bs.Stop()
require.NoError(t, err)
}

func TestService_PauseAndResume(t *testing.T) {
bs := createTestService(t, &ServiceConfig{
LogLvl: log.LvlCrit,
})
err := bs.Start()
require.NoError(t, err)
time.Sleep(time.Second)

go func() {
_ = bs.Pause()
}()

go func() {
_ = bs.Pause()
}()

go func() {
err := bs.Resume() //nolint
require.NoError(t, err)
}()

go func() {
err := bs.Resume() //nolint
require.NoError(t, err)
}()

err = bs.Stop()
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (b *BlockBuilder) addToQueue(txs []*transaction.ValidTransaction) {
}

func hasSlotEnded(slot Slot) bool {
slotEnd := slot.start.Add(slot.duration)
slotEnd := slot.start.Add(slot.duration * 2 / 3) // reserve last 1/3 of slot for block finalisation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of 1/3, Can we allocate a fixed time for block finalization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean exactly? 1/3 of the slot is a fixed slot duration, this is what substrate does afaik

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant 500ms or so. Instead of 1/3 of slot.duration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no that would not be ideal, substrate uses 1/3 of the slot duration, you can see their implementation on the issue

return time.Since(slotEnd) >= 0
}

Expand Down
7 changes: 6 additions & 1 deletion lib/babe/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,15 @@ func (b *Service) initiateEpoch(epoch uint64) error {
delete(b.slotToProof, i-b.epochLength) // clear data from previous epoch
}

b.slotToProof[i], err = b.runLottery(i, epoch)
proof, err := b.runLottery(i, epoch)
if err != nil {
return fmt.Errorf("error running slot lottery at slot %d: error %s", i, err)
}

if proof != nil {
b.slotToProof[i] = proof
logger.Trace("claimed slot!", "slot", startSlot, "slots into epoch", i-startSlot)
}
}

return nil
Expand Down
Loading