Skip to content

Commit

Permalink
feat(twap): prune records over multiple blocks (#7427)
Browse files Browse the repository at this point in the history
* twap prune over multiple blocks

* changelog entry

* fix after epoch end test

* track where we left off in iteration

* add comment to explain better

* Generated protofile changes

* inc by 2

* inc by 2

* add comments

* add twap pruning test over multiple blocks

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
czarcas7ic and github-actions authored Feb 9, 2024
1 parent 6d4e422 commit 1c9e491
Show file tree
Hide file tree
Showing 12 changed files with 612 additions and 125 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* [#7360](https://github.com/osmosis-labs/osmosis/pull/7360) Bump cometbft-db from 0.8.0 to 0.10.0
* [#7385](https://github.com/osmosis-labs/osmosis/pull/7385) Add missing protobuf interface
* [#7427](https://github.com/osmosis-labs/osmosis/pull/7427) Prune TWAP records over multiple blocks, instead of all at once at epoch

## v23.0.0

Expand Down
20 changes: 20 additions & 0 deletions proto/osmosis/twap/v1beta1/twap_record.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,23 @@ message TwapRecord {
(gogoproto.moretags) = "yaml:\"last_error_time\""
];
}

// PruningState allows us to spread out the pruning of TWAP records over time,
// instead of pruning all at once at the end of the epoch.
message PruningState {
// is_pruning is true if the pruning process is ongoing.
// This tells the module to continue pruning the TWAP records
// at the EndBlock.
bool is_pruning = 1;
// last_kept_time is the time of the last kept TWAP record.
// This is used to determine all TWAP records that are older than
// last_kept_time and should be pruned.
google.protobuf.Timestamp last_kept_time = 2 [
(gogoproto.nullable) = false,
(gogoproto.stdtime) = true,
(gogoproto.moretags) = "yaml:\"last_kept_time\""
];
// last_key_seen is the last key of the TWAP records that were pruned
// before reaching the block's prune limit
bytes last_key_seen = 3;
}
8 changes: 2 additions & 6 deletions x/twap/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ func (k Keeper) UpdateRecords(ctx sdk.Context, poolId uint64) error {
return k.updateRecords(ctx, poolId)
}

func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime time.Time) error {
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}

func (k Keeper) PruneRecords(ctx sdk.Context) error {
return k.pruneRecords(ctx)
func (k Keeper) PruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
return k.pruneRecordsBeforeTimeButNewest(ctx, state)
}

func (k Keeper) GetInterpolatedRecord(ctx sdk.Context, poolId uint64, asset0Denom string, asset1Denom string, t time.Time) (types.TwapRecord, error) {
Expand Down
29 changes: 29 additions & 0 deletions x/twap/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"

paramtypes "github.com/cosmos/cosmos-sdk/x/params/types"

Expand Down Expand Up @@ -100,3 +101,31 @@ func (k Keeper) GetGeometricStrategy() *geometric {
func (k Keeper) GetArithmeticStrategy() *arithmetic {
return &arithmetic{k}
}

// GetPruningState gets the current pruning state, which is used to determine
// whether to prune historical records in the EndBlock. This allows us to spread
// out the computational cost of pruning over time rather than all at once at epoch.
func (k Keeper) GetPruningState(ctx sdk.Context) types.PruningState {
store := ctx.KVStore(k.storeKey)
state := types.PruningState{}

bz := store.Get(types.PruningStateKey)
if bz == nil {
return state
}
err := proto.Unmarshal(bz, &state)
if err != nil {
panic(err)
}
return state
}

func (k Keeper) SetPruningState(ctx sdk.Context, state types.PruningState) {
store := ctx.KVStore(k.storeKey)

bz, err := proto.Marshal(&state)
if err != nil {
panic(err)
}
store.Set(types.PruningStateKey, bz)
}
10 changes: 7 additions & 3 deletions x/twap/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/osmosis-labs/osmosis/osmomath"
concentratedliquiditytypes "github.com/osmosis-labs/osmosis/v23/x/concentrated-liquidity/types"
gammtypes "github.com/osmosis-labs/osmosis/v23/x/gamm/types"
"github.com/osmosis-labs/osmosis/v23/x/twap/types"
epochtypes "github.com/osmosis-labs/osmosis/x/epochs/types"
)

Expand All @@ -24,9 +25,12 @@ func (k Keeper) EpochHooks() epochtypes.EpochHooks {

func (hook *epochhook) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
if epochIdentifier == hook.k.PruneEpochIdentifier(ctx) {
if err := hook.k.pruneRecords(ctx); err != nil {
ctx.Logger().Error("Error pruning old twaps at the epoch end", err)
}
lastKeptTime := ctx.BlockTime().Add(-hook.k.RecordHistoryKeepPeriod(ctx))
hook.k.SetPruningState(ctx, types.PruningState{
IsPruning: true,
LastKeptTime: lastKeptTime,
LastKeySeen: types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""),
})
}
return nil
}
Expand Down
18 changes: 9 additions & 9 deletions x/twap/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,20 +275,20 @@ func (s *TestSuite) TestAfterEpochEnd() {
err = s.App.TwapKeeper.EpochHooks().AfterEpochEnd(s.Ctx, allEpochs[i].Identifier, int64(1))
s.Require().NoError(err)

recordsAfterEpoch, err := s.twapkeeper.GetAllHistoricalTimeIndexedTWAPs(s.Ctx)
lastKeptTime := s.Ctx.BlockTime().Add(-s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx))
pruneState := s.twapkeeper.GetPruningState(s.Ctx)

// old record should have been pruned here
// however, the newest younger than the prune threshold
// is kept.
// state entry should be set for pruning state
if allEpochs[i].Identifier == pruneEpochIdentifier {
s.Require().Equal(1, len(recordsAfterEpoch))
s.Require().Equal(newestRecord, recordsAfterEpoch[0])
s.Require().Equal(true, pruneState.IsPruning)
s.Require().Equal(lastKeptTime, pruneState.LastKeptTime)

// quit test once the record has been pruned
return
// reset pruning state to make sure other epochs do not modify it
s.twapkeeper.SetPruningState(s.Ctx, types.PruningState{})
} else { // pruning should not be triggered at first, not pruning epoch
s.Require().NoError(err)
s.Require().Equal(twapsBeforeEpoch, recordsAfterEpoch)
s.Require().Equal(false, pruneState.IsPruning)
s.Require().Equal(time.Time{}, pruneState.LastKeptTime)
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions x/twap/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (k Keeper) EndBlock(ctx sdk.Context) {
" Skipping record update. Underlying err: %w", id, err).Error())
}
}

state := k.GetPruningState(ctx)
if state.IsPruning {
err := k.pruneRecordsBeforeTimeButNewest(ctx, state)
if err != nil {
ctx.Logger().Error("Error pruning old twaps at the end block", err)
}
}
}

// updateRecords updates all records for a given pool id.
Expand Down Expand Up @@ -195,18 +203,6 @@ func (k Keeper) updateRecord(ctx sdk.Context, record types.TwapRecord) (types.Tw
return newRecord, nil
}

// pruneRecords prunes twap records that happened earlier than recordHistoryKeepPeriod
// before current block time while preserving the most recent record before the threshold.
// Such record is preserved for each pool.
// See TWAP keeper's `pruneRecordsBeforeTimeButNewest(...)` for more details about the reasons for
// keeping this record.
func (k Keeper) pruneRecords(ctx sdk.Context) error {
recordHistoryKeepPeriod := k.RecordHistoryKeepPeriod(ctx)

lastKeptTime := ctx.BlockTime().Add(-recordHistoryKeepPeriod)
return k.pruneRecordsBeforeTimeButNewest(ctx, lastKeptTime)
}

// recordWithUpdatedAccumulators returns a record, with updated accumulator values and time for provided newTime,
// otherwise referred to as "interpolating the record" to the target time.
// This does not mutate the passed in record.
Expand Down
56 changes: 0 additions & 56 deletions x/twap/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,62 +572,6 @@ type computeThreeAssetArithmeticTwapTestCase struct {
expErr bool
}

// TestPruneRecords tests that twap records earlier than
// current block time - RecordHistoryKeepPeriod are pruned from the store
// while keeping the newest record before the above time threshold.
// Such record is kept for each pool.
func (s *TestSuite) TestPruneRecords() {
recordHistoryKeepPeriod := s.twapkeeper.RecordHistoryKeepPeriod(s.Ctx)

pool1OlderMin2MsRecord, // deleted
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC, // deleted
pool3OlderBaseRecord, // kept as newest under keep period
pool4OlderPlus1Record := // kept as newest under keep period
s.createTestRecordsFromTime(baseTime.Add(2 * -recordHistoryKeepPeriod))

pool1Min2MsRecord, // kept as newest under keep period
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC, // kept as newest under keep period
pool3BaseRecord, // kept as it is at the keep period boundary
pool4Plus1Record := // kept as it is above the keep period boundary
s.createTestRecordsFromTime(baseTime.Add(-recordHistoryKeepPeriod))

// non-ascending insertion order.
recordsToPreSet := []types.TwapRecord{
pool2OlderMin1MsRecordAB, pool2OlderMin1MsRecordAC, pool2OlderMin1MsRecordBC,
pool4Plus1Record,
pool4OlderPlus1Record,
pool3OlderBaseRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool1Min2MsRecord,
pool1OlderMin2MsRecord,
}

// tMin2Record is before the threshold and is pruned away.
// tmin1Record is the newest record before current block time - record history keep period.
// All other records happen after the threshold and are kept.
expectedKeptRecords := []types.TwapRecord{
pool3OlderBaseRecord,
pool4OlderPlus1Record,
pool1Min2MsRecord,
pool2Min1MsRecordAB, pool2Min1MsRecordAC, pool2Min1MsRecordBC,
pool3BaseRecord,
pool4Plus1Record,
}
s.SetupTest()
s.preSetRecords(recordsToPreSet)

ctx := s.Ctx
twapKeeper := s.twapkeeper

ctx = ctx.WithBlockTime(baseTime)

err := twapKeeper.PruneRecords(ctx)
s.Require().NoError(err)

s.validateExpectedRecords(expectedKeptRecords)
}

// TestUpdateRecords tests that the records are updated correctly.
// It tests the following:
// - two-asset pools
Expand Down
38 changes: 36 additions & 2 deletions x/twap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (
"github.com/osmosis-labs/osmosis/v23/x/twap/types"
)

// NumRecordsToPrunePerBlock is the number of records to prune per block.
// Two records are deleted per incentive record:
// 1. by time index
// 2. by pool index
// Therefore, setting this to 1000 means 500 complete incentive records are deleted per block.
var NumRecordsToPrunePerBlock uint16 = 1000

type timeTooOldError struct {
Time time.Time
}
Expand Down Expand Up @@ -73,15 +80,22 @@ func (k Keeper) StoreHistoricalTWAP(ctx sdk.Context, twap types.TwapRecord) {
// So, in order to have correct behavior for the desired guarantee,
// we keep the newest record that is older than the pruning time.
// This is why we would keep the -50 hour and -1hour twaps despite a 48hr pruning period
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime time.Time) error {
//
// If we reach the per block pruning limit, we store the last key seen in the pruning state.
// This is so that we can continue pruning from where we left off in the next block.
// If we have pruned all records, we set the pruning state to not pruning.
// There is a small bug here where we store more seenPoolAssetTriplets than we need to.
// Issue added here: https://github.com/osmosis-labs/osmosis/issues/7435
// The bloat is minimal though, and is not at risk of getting out of hand.
func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, state types.PruningState) error {
store := ctx.KVStore(k.storeKey)

// Reverse iterator guarantees that we iterate through the newest per pool first.
// Due to how it is indexed, we will only iterate times starting from
// lastKeptTime exclusively down to the oldest record.
iter := store.ReverseIterator(
[]byte(types.HistoricalTWAPTimeIndexPrefix),
types.FormatHistoricalTimeIndexTWAPKey(lastKeptTime, 0, "", ""))
state.LastKeySeen)
defer iter.Close()

// We mark what (pool id, asset 0, asset 1) triplets we've seen.
Expand All @@ -93,6 +107,8 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
}
seenPoolAssetTriplets := map[uniqueTriplet]struct{}{}

var numPruned uint16

for ; iter.Valid(); iter.Next() {
timeIndexKey := iter.Key()
timeS, poolId, asset0, asset1, err := types.ParseFieldsFromHistoricalTimeKey(timeIndexKey)
Expand All @@ -117,6 +133,24 @@ func (k Keeper) pruneRecordsBeforeTimeButNewest(ctx sdk.Context, lastKeptTime ti
store.Delete(timeIndexKey)
poolIndexKey := types.FormatHistoricalPoolIndexTWAPKeyFromStrTime(poolId, asset0, asset1, timeS)
store.Delete(poolIndexKey)

// Increment the number of records pruned by 2, since we delete two records per iteration.
numPruned += 2

if numPruned >= NumRecordsToPrunePerBlock {
// We have hit the limit, so we stop pruning.
break
}
}

if !iter.Valid() {
// The iterator is exhausted, so we have pruned all records.
state.IsPruning = false
k.SetPruningState(ctx, state)
} else {
// We have not pruned all records, so we update the last key seen.
state.LastKeySeen = iter.Key()
k.SetPruningState(ctx, state)
}
return nil
}
Expand Down
Loading

0 comments on commit 1c9e491

Please sign in to comment.