From ac37680e56bf6ef56185ab412348ad634e483a9e Mon Sep 17 00:00:00 2001 From: mconcat Date: Fri, 7 Jan 2022 05:49:05 +0900 Subject: [PATCH] Pool Cleanup (#652) * add pool cleanup * add pool destruction on cleanup * gofmt * Update x/gamm/keeper/pool.go Co-authored-by: Sunny Aggarwal * fix test * add ForceUnlock * move to batch processing cleanup, exclude module accounts * Update x/lockup/keeper/lock.go Co-authored-by: Dev Ojha * fix lock iteration and reuse unlock logic * Minor grammar fixes Co-authored-by: Sunny Aggarwal Co-authored-by: Dev Ojha --- x/gamm/keeper/pool.go | 103 ++++++++++++++ x/gamm/keeper/pool_test.go | 222 +++++++++++++++++++++++++++++++ x/gamm/types/errors.go | 17 +-- x/gamm/types/expected_keepers.go | 1 + x/gamm/types/query.pb.gw.go | 34 ++++- x/lockup/keeper/lock.go | 22 +++ 6 files changed, 390 insertions(+), 9 deletions(-) create mode 100644 x/gamm/keeper/pool_test.go diff --git a/x/gamm/keeper/pool.go b/x/gamm/keeper/pool.go index 7c910beb9a4..11ae0f78d9f 100644 --- a/x/gamm/keeper/pool.go +++ b/x/gamm/keeper/pool.go @@ -80,6 +80,109 @@ func (k Keeper) SetPool(ctx sdk.Context, pool types.PoolI) error { return nil } +func (k Keeper) DeletePool(ctx sdk.Context, poolId uint64) error { + store := ctx.KVStore(k.storeKey) + poolKey := types.GetKeyPrefixPools(poolId) + if !store.Has(poolKey) { + return fmt.Errorf("pool with ID %d does not exist", poolId) + } + + store.Delete(poolKey) + return nil +} + +// CleanupBalancerPool destructs a pool and refund all the assets according to +// the shares held by the accounts. CleanupBalancerPool should not be called during +// the chain execution time, as it iterates the entire account balances. +// TODO: once SDK v0.46.0, use https://github.com/cosmos/cosmos-sdk/pull/9611 +// +// All locks on this pool share must be unlocked prior to execution. Use LockupKeeper.ForceUnlock +// on remaining locks before calling this function. +func (k Keeper) CleanupBalancerPool(ctx sdk.Context, poolIds []uint64, excludedModules []string) (err error) { + pools := make(map[string]types.PoolI) + totalShares := make(map[string]sdk.Int) + for _, poolId := range poolIds { + pool, err := k.GetPool(ctx, poolId) + if err != nil { + return err + } + shareDenom := pool.GetTotalShares().Denom + pools[shareDenom] = pool + totalShares[shareDenom] = pool.GetTotalShares().Amount + } + + moduleAccounts := make(map[string]string) + for _, module := range excludedModules { + moduleAccounts[string(authtypes.NewModuleAddress(module))] = module + } + + // first iterate through the share holders and burn them + k.bankKeeper.IterateAllBalances(ctx, func(addr sdk.AccAddress, coin sdk.Coin) (stop bool) { + if coin.Amount.IsZero() { + return + } + + pool, ok := pools[coin.Denom] + if !ok { + return + } + + // track the iterated shares + pool.SubTotalShares(coin.Amount) + pools[coin.Denom] = pool + + // check if the shareholder is a module + if _, ok = moduleAccounts[coin.Denom]; ok { + return + } + + // Burn the share tokens + err = k.bankKeeper.SendCoinsFromAccountToModule(ctx, addr, types.ModuleName, sdk.Coins{coin}) + if err != nil { + return true + } + + err = k.bankKeeper.BurnCoins(ctx, types.ModuleName, sdk.Coins{coin}) + if err != nil { + return true + } + + // Refund assets + for _, asset := range pool.GetAllPoolAssets() { + // lpShareEquivalentTokens = (amount in pool) * (your shares) / (total shares) + lpShareEquivalentTokens := asset.Token.Amount.Mul(coin.Amount).Quo(totalShares[coin.Denom]) + if lpShareEquivalentTokens.IsZero() { + continue + } + err = k.bankKeeper.SendCoins( + ctx, pool.GetAddress(), addr, sdk.Coins{{asset.Token.Denom, lpShareEquivalentTokens}}) + if err != nil { + return true + } + } + + return false + }) + + if err != nil { + return err + } + + for _, pool := range pools { + // sanity check + if !pool.GetTotalShares().IsZero() { + panic("pool total share should be zero after cleanup") + } + + err = k.DeletePool(ctx, pool.GetId()) + if err != nil { + return err + } + } + + return nil +} + // newBalancerPool is an internal function that creates a new Balancer Pool object with the provided // parameters, initial assets, and future governor. func (k Keeper) newBalancerPool(ctx sdk.Context, balancerPoolParams balancer.BalancerPoolParams, assets []types.PoolAsset, futureGovernor string) (types.PoolI, error) { diff --git a/x/gamm/keeper/pool_test.go b/x/gamm/keeper/pool_test.go new file mode 100644 index 00000000000..f6476c7452c --- /dev/null +++ b/x/gamm/keeper/pool_test.go @@ -0,0 +1,222 @@ +package keeper_test + +import ( + "math/rand" + "time" + + "github.com/cosmos/cosmos-sdk/simapp" + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/osmosis-labs/osmosis/x/gamm/types" +) + +func (suite *KeeperTestSuite) TestCleanupPool() { + // Mint some assets to the accounts. + for _, acc := range []sdk.AccAddress{acc1, acc2, acc3} { + err := simapp.FundAccount( + suite.app.BankKeeper, + suite.ctx, + acc, + sdk.NewCoins( + sdk.NewCoin("uosmo", sdk.NewInt(1000000000)), + sdk.NewCoin("foo", sdk.NewInt(1000)), + sdk.NewCoin("bar", sdk.NewInt(1000)), + sdk.NewCoin("baz", sdk.NewInt(1000)), + ), + ) + if err != nil { + panic(err) + } + } + + poolId, err := suite.app.GAMMKeeper.CreateBalancerPool(suite.ctx, acc1, defaultBalancerPoolParams, []types.PoolAsset{ + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("foo", sdk.NewInt(1000)), + }, + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("bar", sdk.NewInt(1000)), + }, + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("baz", sdk.NewInt(1000)), + }, + }, "") + suite.NoError(err) + + for _, acc := range []sdk.AccAddress{acc2, acc3} { + err = suite.app.GAMMKeeper.JoinPool(suite.ctx, acc, poolId, types.OneShare.MulRaw(100), sdk.NewCoins( + sdk.NewCoin("foo", sdk.NewInt(1000)), + sdk.NewCoin("bar", sdk.NewInt(1000)), + sdk.NewCoin("baz", sdk.NewInt(1000)), + )) + suite.NoError(err) + } + + pool, err := suite.app.GAMMKeeper.GetPool(suite.ctx, poolId) + suite.NoError(err) + denom := pool.GetTotalShares().Denom + totalAmount := sdk.ZeroInt() + for _, acc := range []sdk.AccAddress{acc1, acc2, acc3} { + coin := suite.app.BankKeeper.GetBalance(suite.ctx, acc, denom) + suite.True(coin.Amount.Equal(types.OneShare.MulRaw(100))) + totalAmount = totalAmount.Add(coin.Amount) + } + suite.True(totalAmount.Equal(types.OneShare.MulRaw(300))) + + err = suite.app.GAMMKeeper.CleanupBalancerPool(suite.ctx, []uint64{poolId}, []string{}) + suite.NoError(err) + for _, acc := range []sdk.AccAddress{acc1, acc2, acc3} { + for _, denom := range []string{"foo", "bar", "baz"} { + amt := suite.app.BankKeeper.GetBalance(suite.ctx, acc, denom) + suite.True(amt.Amount.Equal(sdk.NewInt(1000)), + "Expected equal %s: %d, %d", amt.Denom, amt.Amount.Int64(), 1000) + } + } +} + +func (suite *KeeperTestSuite) TestCleanupPoolRandomized() { + // address => deposited coins + coinOf := make(map[string]sdk.Coins) + denoms := []string{"foo", "bar", "baz"} + + // Mint some assets to the accounts. + for _, acc := range []sdk.AccAddress{acc1, acc2, acc3} { + coins := make(sdk.Coins, 3) + for i := range coins { + amount := sdk.NewInt(rand.Int63n(1000)) + // give large amount of coins to the pool creator + if i == 0 { + amount = amount.MulRaw(10000) + } + coins[i] = sdk.Coin{denoms[i], amount} + } + coinOf[acc.String()] = coins + coins = append(coins, sdk.NewCoin("uosmo", sdk.NewInt(1000000000))) + + err := simapp.FundAccount( + suite.app.BankKeeper, + suite.ctx, + acc, + coins.Sort(), + ) + if err != nil { + panic(err) + } + } + + initialAssets := []types.PoolAsset{} + for _, coin := range coinOf[acc1.String()] { + initialAssets = append(initialAssets, types.PoolAsset{Weight: types.OneShare.MulRaw(100), Token: coin}) + } + poolId, err := suite.app.GAMMKeeper.CreateBalancerPool(suite.ctx, acc1, defaultBalancerPoolParams, initialAssets, "") + suite.NoError(err) + + for _, acc := range []sdk.AccAddress{acc2, acc3} { + err = suite.app.GAMMKeeper.JoinPool(suite.ctx, acc, poolId, types.OneShare, coinOf[acc.String()]) + suite.NoError(err) + } + + err = suite.app.GAMMKeeper.CleanupBalancerPool(suite.ctx, []uint64{poolId}, []string{}) + suite.NoError(err) + for _, acc := range []sdk.AccAddress{acc1, acc2, acc3} { + for _, coin := range coinOf[acc.String()] { + amt := suite.app.BankKeeper.GetBalance(suite.ctx, acc, coin.Denom) + // the refund could have rounding error + suite.True(amt.Amount.Equal(coin.Amount) || amt.Amount.Equal(coin.Amount.SubRaw(1)), + "Expected equal %s: %d, %d", amt.Denom, amt.Amount.Int64(), coin.Amount.Int64()) + } + } +} + +func (suite *KeeperTestSuite) TestCleanupPoolErrorOnSwap() { + suite.ctx = suite.ctx.WithBlockTime(time.Unix(1000, 1000)) + err := simapp.FundAccount( + suite.app.BankKeeper, + suite.ctx, + acc1, + sdk.NewCoins( + sdk.NewCoin("uosmo", sdk.NewInt(1000000000)), + sdk.NewCoin("foo", sdk.NewInt(1000)), + sdk.NewCoin("bar", sdk.NewInt(1000)), + sdk.NewCoin("baz", sdk.NewInt(1000)), + ), + ) + if err != nil { + panic(err) + } + + poolId, err := suite.app.GAMMKeeper.CreateBalancerPool(suite.ctx, acc1, defaultBalancerPoolParams, []types.PoolAsset{ + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("foo", sdk.NewInt(1000)), + }, + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("bar", sdk.NewInt(1000)), + }, + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("baz", sdk.NewInt(1000)), + }, + }, "") + suite.NoError(err) + + err = suite.app.GAMMKeeper.CleanupBalancerPool(suite.ctx, []uint64{poolId}, []string{}) + suite.NoError(err) + + _, _, err = suite.app.GAMMKeeper.SwapExactAmountIn(suite.ctx, acc1, poolId, sdk.NewCoin("foo", sdk.NewInt(1)), "bar", sdk.NewInt(1)) + suite.Error(err) +} + +func (suite *KeeperTestSuite) TestCleanupPoolWithLockup() { + suite.ctx = suite.ctx.WithBlockTime(time.Unix(1000, 1000)) + err := simapp.FundAccount( + suite.app.BankKeeper, + suite.ctx, + acc1, + sdk.NewCoins( + sdk.NewCoin("uosmo", sdk.NewInt(1000000000)), + sdk.NewCoin("foo", sdk.NewInt(1000)), + sdk.NewCoin("bar", sdk.NewInt(1000)), + sdk.NewCoin("baz", sdk.NewInt(1000)), + ), + ) + if err != nil { + panic(err) + } + + poolId, err := suite.app.GAMMKeeper.CreateBalancerPool(suite.ctx, acc1, defaultBalancerPoolParams, []types.PoolAsset{ + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("foo", sdk.NewInt(1000)), + }, + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("bar", sdk.NewInt(1000)), + }, + { + Weight: sdk.NewInt(100), + Token: sdk.NewCoin("baz", sdk.NewInt(1000)), + }, + }, "") + suite.NoError(err) + + _, err = suite.app.LockupKeeper.LockTokens(suite.ctx, acc1, sdk.Coins{sdk.NewCoin(types.GetPoolShareDenom(poolId), types.InitPoolSharesSupply)}, time.Hour) + suite.NoError(err) + + for _, lock := range suite.app.LockupKeeper.GetLocksDenom(suite.ctx, types.GetPoolShareDenom(poolId)) { + err = suite.app.LockupKeeper.ForceUnlock(suite.ctx, lock) + suite.NoError(err) + } + + err = suite.app.GAMMKeeper.CleanupBalancerPool(suite.ctx, []uint64{poolId}, []string{}) + suite.NoError(err) + for _, coin := range []string{"foo", "bar", "baz"} { + amt := suite.app.BankKeeper.GetBalance(suite.ctx, acc1, coin) + // the refund could have rounding error + suite.True(amt.Amount.Equal(sdk.NewInt(1000)) || amt.Amount.Equal(sdk.NewInt(1000).SubRaw(1)), + "Expected equal %s: %d, %d", amt.Denom, amt.Amount.Int64(), sdk.NewInt(1000).Int64()) + } +} diff --git a/x/gamm/types/errors.go b/x/gamm/types/errors.go index de34eff0bff..26e01e8ad8e 100644 --- a/x/gamm/types/errors.go +++ b/x/gamm/types/errors.go @@ -4,14 +4,15 @@ import sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" // x/gamm module sentinel errors var ( - ErrPoolNotFound = sdkerrors.Register(ModuleName, 1, "pool not found") - ErrPoolAlreadyExist = sdkerrors.Register(ModuleName, 2, "pool already exist") - ErrPoolLocked = sdkerrors.Register(ModuleName, 3, "pool is locked") - ErrTooFewPoolAssets = sdkerrors.Register(ModuleName, 4, "pool should have at least 2 assets, as they must be swapping between at least two assets") - ErrTooManyPoolAssets = sdkerrors.Register(ModuleName, 5, "pool has too many assets (currently capped at 8 assets per pool)") - ErrLimitMaxAmount = sdkerrors.Register(ModuleName, 6, "calculated amount is larger than max amount") - ErrLimitMinAmount = sdkerrors.Register(ModuleName, 7, "calculated amount is lesser than min amount") - ErrInvalidMathApprox = sdkerrors.Register(ModuleName, 8, "invalid calculated result") + ErrPoolNotFound = sdkerrors.Register(ModuleName, 1, "pool not found") + ErrPoolAlreadyExist = sdkerrors.Register(ModuleName, 2, "pool already exist") + ErrPoolLocked = sdkerrors.Register(ModuleName, 3, "pool is locked") + ErrTooFewPoolAssets = sdkerrors.Register(ModuleName, 4, "pool should have at least 2 assets, as they must be swapping between at least two assets") + ErrTooManyPoolAssets = sdkerrors.Register(ModuleName, 5, "pool has too many assets (currently capped at 8 assets per pool)") + ErrLimitMaxAmount = sdkerrors.Register(ModuleName, 6, "calculated amount is larger than max amount") + ErrLimitMinAmount = sdkerrors.Register(ModuleName, 7, "calculated amount is lesser than min amount") + ErrInvalidMathApprox = sdkerrors.Register(ModuleName, 8, "invalid calculated result") + ErrAlreadyInvalidPool = sdkerrors.Register(ModuleName, 9, "destruction on already invalid pool") ErrEmptyRoutes = sdkerrors.Register(ModuleName, 21, "routes not defined") ErrEmptyPoolAssets = sdkerrors.Register(ModuleName, 22, "PoolAssets not defined") diff --git a/x/gamm/types/expected_keepers.go b/x/gamm/types/expected_keepers.go index 62ab13f175c..ee18d78bcd7 100644 --- a/x/gamm/types/expected_keepers.go +++ b/x/gamm/types/expected_keepers.go @@ -54,6 +54,7 @@ type BankKeeper interface { GetSupply(ctx sdk.Context, denom string) sdk.Coin UndelegateCoinsFromModuleToAccount(ctx sdk.Context, senderModule string, recipientAddr sdk.AccAddress, amt sdk.Coins) error DelegateCoinsFromAccountToModule(ctx sdk.Context, senderAddr sdk.AccAddress, recipientModule string, amt sdk.Coins) error + IterateAllBalances(ctx sdk.Context, callback func(addr sdk.AccAddress, coin sdk.Coin) (stop bool)) } // DistrKeeper defines the contract needed to be fulfilled for distribution keeper diff --git a/x/gamm/types/query.pb.gw.go b/x/gamm/types/query.pb.gw.go index a33763ce2aa..ec23110dba6 100644 --- a/x/gamm/types/query.pb.gw.go +++ b/x/gamm/types/query.pb.gw.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -30,6 +31,7 @@ var _ status.Status var _ = runtime.String var _ = utilities.NewDoubleArray var _ = descriptor.ForMessage +var _ = metadata.Join var ( filter_Query_Pools_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} @@ -538,12 +540,14 @@ func local_request_Query_EstimateSwapExactAmountOut_0(ctx context.Context, marsh // RegisterQueryHandlerServer registers the http handlers for service Query to "mux". // UnaryRPC :call QueryServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. -// Note that using this registration option will cause many gRPC library features (such as grpc.SendHeader, etc) to stop working. Consider using RegisterQueryHandlerFromEndpoint instead. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterQueryHandlerFromEndpoint instead. func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, server QueryServer) error { mux.Handle("GET", pattern_Query_Pools_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -551,6 +555,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_Pools_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -564,6 +569,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_NumPools_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -571,6 +578,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_NumPools_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -584,6 +592,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_TotalLiquidity_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -591,6 +601,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_TotalLiquidity_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -604,6 +615,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_Pool_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -611,6 +624,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_Pool_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -624,6 +638,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_PoolParams_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -631,6 +647,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_PoolParams_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -644,6 +661,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_TotalShares_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -651,6 +670,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_TotalShares_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -664,6 +684,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_PoolAssets_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -671,6 +693,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_PoolAssets_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -684,6 +707,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_SpotPrice_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -691,6 +716,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_SpotPrice_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -704,6 +730,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_EstimateSwapExactAmountIn_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -711,6 +739,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_EstimateSwapExactAmountIn_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -724,6 +753,8 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv mux.Handle("GET", pattern_Query_EstimateSwapExactAmountOut_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -731,6 +762,7 @@ func RegisterQueryHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Query_EstimateSwapExactAmountOut_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) diff --git a/x/lockup/keeper/lock.go b/x/lockup/keeper/lock.go index 47a578662de..4efa1d16cac 100644 --- a/x/lockup/keeper/lock.go +++ b/x/lockup/keeper/lock.go @@ -219,6 +219,10 @@ func (k Keeper) GetLocksPastTimeDenom(ctx sdk.Context, denom string, timestamp t return combineLocks(notUnlockings, unlockings) } +func (k Keeper) GetLocksDenom(ctx sdk.Context, denom string) []types.PeriodLock { + return k.GetLocksLongerThanDurationDenom(ctx, denom, time.Duration(0)) +} + // GetLockedDenom Returns the total amount of denom that are locked func (k Keeper) GetLockedDenom(ctx sdk.Context, denom string, duration time.Duration) sdk.Int { totalAmtLocked := k.GetPeriodLocksAccumulation(ctx, types.QueryCondition{ @@ -532,6 +536,11 @@ func (k Keeper) Unlock(ctx sdk.Context, lock types.PeriodLock) error { return fmt.Errorf("lock is not unlockable yet: %s >= %s", curTime.String(), lock.EndTime.String()) } + return k.unlock(ctx, lock) +} + +func (k Keeper) unlock(ctx sdk.Context, lock types.PeriodLock) error { + owner, err := sdk.AccAddressFromBech32(lock.Owner) if err != nil { return err @@ -560,3 +569,16 @@ func (k Keeper) Unlock(ctx sdk.Context, lock types.PeriodLock) error { k.hooks.OnTokenUnlocked(ctx, owner, lock.ID, lock.Coins, lock.Duration, lock.EndTime) return nil } + +// ForceUnlock ignores unlock duration and immediately unlock and refund. +// CONTRACT: should be used only at the chain upgrade script +// TODO: Revisit for Superfluid Staking +func (k Keeper) ForceUnlock(ctx sdk.Context, lock types.PeriodLock) error { + if !lock.IsUnlocking() { + err := k.BeginUnlock(ctx, lock) + if err != nil { + return err + } + } + return k.unlock(ctx, lock) +}