Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve publish/republish logic #3592

Merged
merged 5 commits into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 61 additions & 22 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var rbfDenomBig = types.NewInt(RbfDenom)

const RbfDenom = 256

var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second
var RepublishInterval = time.Duration(10*build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second

var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))
var baseFeeLowerBoundFactor = types.NewInt(10)
Expand Down Expand Up @@ -81,6 +81,14 @@ const (
localUpdates = "update"
)

func init() {
// if the republish interval is too short compared to the pubsub timecache, adjust it
minInterval := pubsub.TimeCacheDuration + time.Duration(build.PropagationDelaySecs)
if RepublishInterval < minInterval {
RepublishInterval = minInterval
}
}

type MessagePool struct {
lk sync.Mutex

Expand Down Expand Up @@ -355,31 +363,48 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
return nil
}

func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) error {
// verifyMsgBeforeAdd verifies that the message meets the minimum criteria for block inclusio
// and whether the message has enough funds to be included in the next 20 blocks.
// If the message is not valid for block inclusion, it returns an error.
// For local messages, if the message can be included in the next 20 blocks, it returns true to
// signal that it should be immediately published. If the message cannot be included in the next 20
// blocks, it returns false so that the message doesn't immediately get published (and ignored by our
// peers); instead it will be published through the republish loop, once the base fee has fallen
// sufficiently.
// For non local messages, if the message cannot be included in the next 20 blocks it returns
// a (soft) validation error.
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
epoch := curTs.Height()
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())

if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
return xerrors.Errorf("message will not be included in a block: %w", err)
return false, xerrors.Errorf("message will not be included in a block: %w", err)
}

// this checks if the GasFeeCap is suffisciently high for inclusion in the next 20 blocks
// if the GasFeeCap is too low, we soft reject the message (Ignore in pubsub) and rely
// on republish to push it through later, if the baseFee has fallen.
// this is a defensive check that stops minimum baseFee spam attacks from overloading validation
// queues.
// Note that we don't do that for local messages, so that they can be accepted and republished
// automatically
if !local && len(curTs.Blocks()) > 0 {
// Note that for local messages, we always add them so that they can be accepted and republished
// automatically.
publish := local
if len(curTs.Blocks()) > 0 {
baseFee := curTs.Blocks()[0].ParentBaseFee
baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
return xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w",
m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure)
if local {
log.Warnf("local message will not be immediately published because GasFeeCap doesn't meet the lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s)",
m.Message.GasFeeCap, baseFeeLowerBound)
publish = false
} else {
return false, xerrors.Errorf("GasFeeCap doesn't meet base fee lower bound for inclusion in the next 20 blocks (GasFeeCap: %s, baseFeeLowerBound: %s): %w",
m.Message.GasFeeCap, baseFeeLowerBound, ErrSoftValidationFailure)
}
}
}

return nil
return publish, nil
}

func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
Expand All @@ -400,7 +425,8 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}

mp.curTsLk.Lock()
if err := mp.addTs(m, mp.curTs, true); err != nil {
publish, err := mp.addTs(m, mp.curTs, true)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
}
Expand All @@ -413,7 +439,11 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}
mp.lk.Unlock()

return m.Cid(), mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}

return m.Cid(), err
}

func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
Expand Down Expand Up @@ -461,7 +491,9 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {

mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
return mp.addTs(m, mp.curTs, false)

_, err = mp.addTs(m, mp.curTs, false)
return err
}

func sigCacheKey(m *types.SignedMessage) (string, error) {
Expand Down Expand Up @@ -528,28 +560,29 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
return nil
}

func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) error {
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}

if snonce > m.Message.Nonce {
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}

mp.lk.Lock()
defer mp.lk.Unlock()

if err := mp.verifyMsgBeforeAdd(m, curTs, local); err != nil {
return err
publish, err := mp.verifyMsgBeforeAdd(m, curTs, local)
if err != nil {
return false, err
}

if err := mp.checkBalance(m, curTs); err != nil {
return err
return false, err
}

return mp.addLocked(m, true)
return publish, mp.addLocked(m, true)
}

func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
Expand All @@ -575,7 +608,8 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()

if err := mp.verifyMsgBeforeAdd(m, curTs, true); err != nil {
_, err = mp.verifyMsgBeforeAdd(m, curTs, true)
if err != nil {
return err
}

Expand Down Expand Up @@ -761,7 +795,8 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return nil, ErrTryAgain
}

if err := mp.verifyMsgBeforeAdd(msg, curTs, true); err != nil {
publish, err := mp.verifyMsgBeforeAdd(msg, curTs, true)
if err != nil {
return nil, err
}

Expand All @@ -776,7 +811,11 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
log.Errorf("addLocal failed: %+v", err)
}

return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}

return msg, err
}

func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
Expand Down
24 changes: 20 additions & 4 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

const repubMsgLimit = 30

var RepublishBatchDelay = 200 * time.Millisecond
var RepublishBatchDelay = 100 * time.Millisecond

func (mp *MessagePool) republishPendingMessages() error {
mp.curTsLk.Lock()
Expand All @@ -27,6 +27,7 @@ func (mp *MessagePool) republishPendingMessages() error {
mp.curTsLk.Unlock()
return xerrors.Errorf("computing basefee: %w", err)
}
baseFeeLowerBound := types.BigDiv(baseFee, baseFeeLowerBoundFactor)

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.lk.Lock()
Expand Down Expand Up @@ -55,7 +56,11 @@ func (mp *MessagePool) republishPendingMessages() error {

var chains []*msgChain
for actor, mset := range pending {
next := mp.createMessageChains(actor, mset, baseFee, ts)
// We use the baseFee lower bound for createChange so that we optimistically include
// chains that might become profitable in the next 20 blocks.
// We still check the lowerBound condition for individual messages so that we don't send
// messages that will be rejected by the mpool spam protector, so this is safe to do.
next := mp.createMessageChains(actor, mset, baseFeeLowerBound, ts)
chains = append(chains, next...)
}

Expand All @@ -70,6 +75,7 @@ func (mp *MessagePool) republishPendingMessages() error {
gasLimit := int64(build.BlockGasLimit)
minGas := int64(gasguess.MinGas)
var msgs []*types.SignedMessage
loop:
for i := 0; i < len(chains); {
chain := chains[i]

Expand All @@ -91,8 +97,18 @@ func (mp *MessagePool) republishPendingMessages() error {

// does it fit in a block?
if chain.gasLimit <= gasLimit {
gasLimit -= chain.gasLimit
msgs = append(msgs, chain.msgs...)
// check the baseFee lower bound -- only republish messages that can be included in the chain
// within the next 20 blocks.
for _, m := range chain.msgs {
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
chain.Invalidate()
continue loop
}
gasLimit -= m.Message.GasLimit
msgs = append(msgs, m)
}

// we processed the whole chain, advance
i++
continue
}
Expand Down