Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(twap): prune records over multiple blocks (backport #7427) #7441

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### State Breaking

<<<<<<< HEAD
=======
* [#7181](https://github.com/osmosis-labs/osmosis/pull/7181) Improve errors for out of gas
* [#7357](https://github.com/osmosis-labs/osmosis/pull/7357) Fix: Ensure rate limits are not applied to packets that aren't ics20s

### Bug Fixes

* [#7360](https://github.com/osmosis-labs/osmosis/pull/7360) fix: use gov type for SetScalingFactorController

### Misc Improvements

* [#7360](https://github.com/osmosis-labs/osmosis/pull/7360) Bump cometbft-db from 0.8.0 to 0.10.0
* [#7385](https://github.com/osmosis-labs/osmosis/pull/7385) Add missing protobuf interface
* [#7427](https://github.com/osmosis-labs/osmosis/pull/7427) Prune TWAP records over multiple blocks, instead of all at once at epoch

>>>>>>> 1c9e4918 (feat(twap): prune records over multiple blocks (#7427))
## v23.0.0

* [#7409](https://github.com/osmosis-labs/osmosis/pull/7409) Scaling factor for pool uptime accumulator to avoid truncation
Expand Down
20 changes: 20 additions & 0 deletions proto/osmosis/twap/v1beta1/twap_record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,23 @@ message TwapRecord {
(gogoproto.moretags) = "yaml:\"last_error_time\""
];
}

// PruningState allows us to spread out the pruning of TWAP records over time,
// instead of pruning all at once at the end of the epoch.
message PruningState {
// is_pruning is true if the pruning process is ongoing.
// This tells the module to continue pruning the TWAP records
// at the EndBlock.
bool is_pruning = 1;
// last_kept_time is the time of the last kept TWAP record.
// This is used to determine all TWAP records that are older than
// last_kept_time and should be pruned.
google.protobuf.Timestamp last_kept_time = 2 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true,
(gogoproto.moretags) = "yaml:\"last_kept_time\""
];
// last_key_seen is the last key of the TWAP records that were pruned
// before reaching the block's prune limit
bytes last_key_seen = 3;
}
8 changes: 2 additions & 6 deletions x/twap/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ func (k Keeper) UpdateRecords(ctx sdk.Context, poolId uint64) error {
return k.updateRecords(ctx, poolId)
}

func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime time.Time) error {
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}

func (k Keeper) PruneRecords(ctx sdk.Context) error {
return k.pruneRecords(ctx)
func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
return k.pruneRecordsBeforeTimeButNewest(ctx, state)
}

func (k Keeper) GetInterpolatedRecord(ctx sdk.Context, poolId uint64, asset0Denom string, asset1Denom string, t time.Time) (types.TwapRecord, error) {
Expand Down
29 changes: 29 additions & 0 deletions x/twap/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"

paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"

Expand Down Expand Up @@ -100,3 +101,31 @@ func (k Keeper) GetGeometricStrategy() *geometric {
func (k Keeper) GetArithmeticStrategy() *arithmetic {
return &arithmetic{k}
}

// GetPruningState gets the current pruning state, which is used to determine
// whether to prune historical records in the EndBlock. This allows us to spread
// out the computational cost of pruning over time rather than all at once at epoch.
func (k Keeper) GetPruningState(ctx sdk.Context) types.PruningState {
store := ctx.KVStore(k.storeKey)
state := types.PruningState{}

bz := store.Get(types.PruningStateKey)
if bz == nil {
return state
}
err := proto.Unmarshal(bz, &state)
if err != nil {
panic(err)
}
return state
}

func (k Keeper) SetPruningState(ctx sdk.Context, state types.PruningState) {
store := ctx.KVStore(k.storeKey)

bz, err := proto.Marshal(&state)
if err != nil {
panic(err)
}
store.Set(types.PruningStateKey, bz)
}
10 changes: 7 additions & 3 deletions x/twap/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/osmosis-labs/osmosis/osmomath"
concentratedliquiditytypes "github.com/osmosis-labs/osmosis/v23/x/concentrated-liquidity/types"
gammtypes "github.com/osmosis-labs/osmosis/v23/x/gamm/types"
"github.com/osmosis-labs/osmosis/v23/x/twap/types"
epochtypes "github.com/osmosis-labs/osmosis/x/epochs/types"
)

Expand All @@ -24,9 +25,12 @@ func (k Keeper) EpochHooks() epochtypes.EpochHooks {

func (hook *epochhook) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
if epochIdentifier == hook.k.PruneEpochIdentifier(ctx) {
if err := hook.k.pruneRecords(ctx); err != nil {
ctx.Logger().Error("Error pruning old twaps at the epoch end", err)
}
lastKeptTime := ctx.BlockTime().Add(-hook.k.RecordHistoryKeepPeriod(ctx))
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""),
})
}
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions x/twap/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,20 @@ func (s *TestSuite) TestAfterEpochEnd() {
err = s.App.TwapKeeper.EpochHooks().AfterEpochEnd(s.Ctx, allEpochs[i].Identifier, int64(1))
s.Require().NoError(err)

recordsAfterEpoch, err := s.twapkeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
lastKeptTime := s.Ctx.BlockTime().Add(-s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx))
pruneState := s.twapkeeper.GetPruningState(s.Ctx)

// old record should have been pruned here
// however, the newest younger than the prune threshold
// is kept.
// state entry should be set for pruning state
if allEpochs[i].Identifier == pruneEpochIdentifier {
s.Require().Equal(1, len(recordsAfterEpoch))
s.Require().Equal(newestRecord, recordsAfterEpoch[0])
s.Require().Equal(true, pruneState.IsPruning)
s.Require().Equal(lastKeptTime, pruneState.LastKeptTime)

// quit test once the record has been pruned
return
// reset pruning state to make sure other epochs do not modify it
s.twapkeeper.SetPruningState(s.Ctx, types.PruningState{})
} else { // pruning should not be triggered at first, not pruning epoch
s.Require().NoError(err)
s.Require().Equal(twapsBeforeEpoch, recordsAfterEpoch)
s.Require().Equal(false, pruneState.IsPruning)
s.Require().Equal(time.Time{}, pruneState.LastKeptTime)
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions x/twap/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (k Keeper) EndBlock(ctx sdk.Context) {
" Skipping record update. Underlying err: %w", id, err).Error())
}
}

state := k.GetPruningState(ctx)
if state.IsPruning {
err := k.pruneRecordsBeforeTimeButNewest(ctx, state)
if err != nil {
ctx.Logger().Error("Error pruning old twaps at the end block", err)
}
}
}

// updateRecords updates all records for a given pool id.
Expand Down Expand Up @@ -195,18 +203,6 @@ func (k Keeper) updateRecord(ctx sdk.Context, record types.TwapRecord) (types.Tw
return newRecord, nil
}

// pruneRecords prunes twap records that happened earlier than recordHistoryKeepPeriod
// before current block time while preserving the most recent record before the threshold.
// Such record is preserved for each pool.
// See TWAP keeper's `pruneRecordsBeforeTimeButNewest(...)` for more details about the reasons for
// keeping this record.
func (k Keeper) pruneRecords(ctx sdk.Context) error {
recordHistoryKeepPeriod := k.RecordHistoryKeepPeriod(ctx)

lastKeptTime := ctx.BlockTime().Add(-recordHistoryKeepPeriod)
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}

// recordWithUpdatedAccumulators returns a record, with updated accumulator values and time for provided newTime,
// otherwise referred to as "interpolating the record" to the target time.
// This does not mutate the passed in record.
Expand Down
56 changes: 0 additions & 56 deletions x/twap/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,62 +572,6 @@ type computeThreeAssetArithmeticTwapTestCase struct {
expErr bool
}

// TestPruneRecords tests that twap records earlier than
// current block time - RecordHistoryKeepPeriod are pruned from the store
// while keeping the newest record before the above time threshold.
// Such record is kept for each pool.
func (s *TestSuite) TestPruneRecords() {
recordHistoryKeepPeriod := s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx)

pool1OlderMin2MsRecord, // deleted
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC, // deleted
pool3OlderBaseRecord, // kept as newest under keep period
pool4OlderPlus1Record := // kept as newest under keep period
s.createTestRecordsFromTime(baseTime.Add(2 * -recordHistoryKeepPeriod))

pool1Min2MsRecord, // kept as newest under keep period
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC, // kept as newest under keep period
pool3BaseRecord, // kept as it is at the keep period boundary
pool4Plus1Record := // kept as it is above the keep period boundary
s.createTestRecordsFromTime(baseTime.Add(-recordHistoryKeepPeriod))

// non-ascending insertion order.
recordsToPreSet := []types.TwapRecord{
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC,
pool4Plus1Record,
pool4OlderPlus1Record,
pool3OlderBaseRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool1Min2MsRecord,
pool1OlderMin2MsRecord,
}

// tMin2Record is before the threshold and is pruned away.
// tmin1Record is the newest record before current block time - record history keep period.
// All other records happen after the threshold and are kept.
expectedKeptRecords := []types.TwapRecord{
pool3OlderBaseRecord,
pool4OlderPlus1Record,
pool1Min2MsRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool4Plus1Record,
}
s.SetupTest()
s.preSetRecords(recordsToPreSet)

ctx := s.Ctx
twapKeeper := s.twapkeeper

ctx = ctx.WithBlockTime(baseTime)

err := twapKeeper.PruneRecords(ctx)
s.Require().NoError(err)

s.validateExpectedRecords(expectedKeptRecords)
}

// TestUpdateRecords tests that the records are updated correctly.
// It tests the following:
// - two-asset pools
Expand Down
38 changes: 36 additions & 2 deletions x/twap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/osmosis-labs/osmosis/v23/x/twap/types"
)

// NumRecordsToPrunePerBlock is the number of records to prune per block.
// Two records are deleted per incentive record:
// 1. by time index
// 2. by pool index
// Therefore, setting this to 1000 means 500 complete incentive records are deleted per block.
var NumRecordsToPrunePerBlock uint16 = 1000

type timeTooOldError struct {
Time time.Time
}
Expand Down Expand Up @@ -73,15 +80,22 @@ func (k Keeper) StoreHistoricalTWAP(ctx sdk.Context, twap types.TwapRecord) {
// So, in order to have correct behavior for the desired guarantee,
// we keep the newest record that is older than the pruning time.
// This is why we would keep the -50 hour and -1hour twaps despite a 48hr pruning period
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime time.Time) error {
//
// If we reach the per block pruning limit, we store the last key seen in the pruning state.
// This is so that we can continue pruning from where we left off in the next block.
// If we have pruned all records, we set the pruning state to not pruning.
// There is a small bug here where we store more seenPoolAssetTriplets than we need to.
// Issue added here: https://github.com/osmosis-labs/osmosis/issues/7435
// The bloat is minimal though, and is not at risk of getting out of hand.
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
store := ctx.KVStore(k.storeKey)

// Reverse iterator guarantees that we iterate through the newest per pool first.
// Due to how it is indexed, we will only iterate times starting from
// lastKeptTime exclusively down to the oldest record.
iter := store.ReverseIterator(
[]byte(types.HistoricalTWAPTimeIndexPrefix),
types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""))
state.LastKeySeen)
defer iter.Close()

// We mark what (pool id, asset 0, asset 1) triplets we've seen.
Expand All @@ -93,6 +107,8 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
}
seenPoolAssetTriplets := map[uniqueTriplet]struct{}{}

var numPruned uint16

for ; iter.Valid(); iter.Next() {
timeIndexKey := iter.Key()
timeS, poolId, asset0, asset1, err := types.ParseFieldsFromHistoricalTimeKey(timeIndexKey)
Expand All @@ -117,6 +133,24 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
store.Delete(timeIndexKey)
poolIndexKey := types.FormatHistoricalPoolIndexTWAPKeyFromStrTime(poolId, asset0, asset1, timeS)
store.Delete(poolIndexKey)

// Increment the number of records pruned by 2, since we delete two records per iteration.
numPruned += 2

if numPruned >= NumRecordsToPrunePerBlock {
// We have hit the limit, so we stop pruning.
break
}
}

if !iter.Valid() {
// The iterator is exhausted, so we have pruned all records.
state.IsPruning = false
k.SetPruningState(ctx, state)
} else {
// We have not pruned all records, so we update the last key seen.
state.LastKeySeen = iter.Key()
k.SetPruningState(ctx, state)
}
return nil
}
Expand Down
Loading