Skip to content

Commit

Permalink
[Store Gateway] Token bucket limiter (cortexproject#6016)
Browse files Browse the repository at this point in the history
* Create TokenBucket

Signed-off-by: Justin Jung <[email protected]>

* Update bucket stores to pass token bucket

Signed-off-by: Justin Jung <[email protected]>

* Move limiters to a new file

Signed-off-by: Justin Jung <[email protected]>

* Added tests for limiters and token bucket

Signed-off-by: Justin Jung <[email protected]>

* Add more tests

Signed-off-by: Justin Jung <[email protected]>

* Added enable flag

Signed-off-by: Justin Jung <[email protected]>

* Add dryrun feature

Signed-off-by: Justin Jung <[email protected]>

* Add doc

Signed-off-by: Justin Jung <[email protected]>

* Add changelog

Signed-off-by: Justin Jung <[email protected]>

* Lint

Signed-off-by: Justin Jung <[email protected]>

* Do not create pod token bucket if the feature is not enabled

Signed-off-by: Justin Jung <[email protected]>

* More docs

Signed-off-by: Justin Jung <[email protected]>

* Address comments

Signed-off-by: Justin Jung <[email protected]>

* Rename podTokenBucket to instanceTokenBucket

Signed-off-by: Justin Jung <[email protected]>

* Updated default values

Signed-off-by: Justin Jung <[email protected]>

* Rename TokenBucketLimiter to TokenBucketBytesLimiter

Signed-off-by: Justin Jung <[email protected]>

* Changed error to httpgrpc

Signed-off-by: Justin Jung <[email protected]>

* Nit

Signed-off-by: Justin Jung <[email protected]>

* Increment failure metric when token bucket returns error

Signed-off-by: Justin Jung <[email protected]>

* Simplify token bucket by making Retrieve to always deduct token

Signed-off-by: Justin Jung <[email protected]>

* Throw 429 and 422 for different failure scenarios

Signed-off-by: Justin Jung <[email protected]>

* Hide token factors from doc

Signed-off-by: Justin Jung <[email protected]>

* Simplified config by combining dryrun and enabled

Signed-off-by: Justin Jung <[email protected]>

* Remove test log

Signed-off-by: Justin Jung <[email protected]>

* Fix tests

Signed-off-by: Justin Jung <[email protected]>

* Fix

Signed-off-by: Justin Jung <[email protected]>

---------

Signed-off-by: Justin Jung <[email protected]>
  • Loading branch information
justinjung04 authored Jul 24, 2024
1 parent 42d7327 commit 5356796
Show file tree
Hide file tree
Showing 11 changed files with 625 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [FEATURE] Store Gateway: Token bucket limiter. #6016
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
18 changes: 18 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,24 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.series-batch-size
[series_batch_size: <int> | default = 10000]

token_bucket_bytes_limiter:
# Token bucket bytes limiter mode. Supported values are: disabled, dryrun,
# enabled
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode
[mode: <string> | default = "disabled"]

# Instance token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size
[instance_token_bucket_size: <int> | default = 859832320]

# User token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size
[user_token_bucket_size: <int> | default = 644874240]

# Request token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
[request_token_bucket_size: <int> | default = 4194304]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
18 changes: 18 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,24 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.series-batch-size
[series_batch_size: <int> | default = 10000]

token_bucket_bytes_limiter:
# Token bucket bytes limiter mode. Supported values are: disabled, dryrun,
# enabled
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode
[mode: <string> | default = "disabled"]

# Instance token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size
[instance_token_bucket_size: <int> | default = 859832320]

# User token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size
[user_token_bucket_size: <int> | default = 644874240]

# Request token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
[request_token_bucket_size: <int> | default = 4194304]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
18 changes: 18 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,24 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.series-batch-size
[series_batch_size: <int> | default = 10000]

token_bucket_bytes_limiter:
# Token bucket bytes limiter mode. Supported values are: disabled, dryrun,
# enabled
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.mode
[mode: <string> | default = "disabled"]

# Instance token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size
[instance_token_bucket_size: <int> | default = 859832320]

# User token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size
[user_token_bucket_size: <int> | default = 644874240]

# Request token bucket size
# CLI flag: -blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size
[request_token_bucket_size: <int> | default = 4194304]

tsdb:
# Local directory to store TSDBs in the ingesters.
# CLI flag: -blocks-storage.tsdb.dir
Expand Down
45 changes: 45 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"flag"
"fmt"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -52,6 +53,7 @@ var (

ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled")
ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy")
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -292,6 +294,22 @@ type BucketStoreConfig struct {

// Controls how many series to fetch per batch in Store Gateway. Default value is 10000.
SeriesBatchSize int `yaml:"series_batch_size"`

// Token bucket configs
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
}

type TokenBucketBytesLimiterConfig struct {
Mode string `yaml:"mode"`
InstanceTokenBucketSize int64 `yaml:"instance_token_bucket_size"`
UserTokenBucketSize int64 `yaml:"user_token_bucket_size"`
RequestTokenBucketSize int64 `yaml:"request_token_bucket_size"`
FetchedPostingsTokenFactor float64 `yaml:"fetched_postings_token_factor" doc:"hidden"`
TouchedPostingsTokenFactor float64 `yaml:"touched_postings_token_factor" doc:"hidden"`
FetchedSeriesTokenFactor float64 `yaml:"fetched_series_token_factor" doc:"hidden"`
TouchedSeriesTokenFactor float64 `yaml:"touched_series_token_factor" doc:"hidden"`
FetchedChunksTokenFactor float64 `yaml:"fetched_chunks_token_factor" doc:"hidden"`
TouchedChunksTokenFactor float64 `yaml:"touched_chunks_token_factor" doc:"hidden"`
}

// RegisterFlags registers the BucketStore flags
Expand Down Expand Up @@ -325,6 +343,16 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.")
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", ")))
f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size")
f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
f.Int64Var(&cfg.TokenBucketBytesLimiter.RequestTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.request-token-bucket-size", int64(4*units.Mebibyte), "Request token bucket size")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-postings-token-factor", 0, "Multiplication factor used for fetched postings token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-postings-token-factor", 5, "Multiplication factor used for touched postings token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-series-token-factor", 0, "Multiplication factor used for fetched series token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-series-token-factor", 25, "Multiplication factor used for touched series token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.fetched-chunks-token-factor", 0, "Multiplication factor used for fetched chunks token")
f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
}

// Validate the config.
Expand All @@ -344,6 +372,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
return ErrInvalidBucketIndexBlockDiscoveryStrategy
}
if !util.StringsContain(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) {
return ErrInvalidTokenBucketBytesLimiterMode
}
return nil
}

Expand Down Expand Up @@ -375,3 +406,17 @@ var supportedBlockDiscoveryStrategies = []string{
string(RecursiveDiscovery),
string(BucketIndexDiscovery),
}

type TokenBucketBytesLimiterMode string

const (
TokenBucketBytesLimiterDisabled TokenBucketBytesLimiterMode = "disabled"
TokenBucketBytesLimiterDryRun TokenBucketBytesLimiterMode = "dryrun"
TokenBucketBytesLimiterEnabled TokenBucketBytesLimiterMode = "enabled"
)

var supportedTokenBucketBytesLimiterModes = []string{
string(TokenBucketBytesLimiterDisabled),
string(TokenBucketBytesLimiterDryRun),
string(TokenBucketBytesLimiterEnabled),
}
101 changes: 52 additions & 49 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"net/http"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -35,6 +34,7 @@ import (

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/backoff"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
util_log "github.com/cortexproject/cortex/pkg/util/log"
Expand Down Expand Up @@ -73,6 +73,11 @@ type BucketStores struct {
storesErrorsMu sync.RWMutex
storesErrors map[string]error

instanceTokenBucket *util.TokenBucket

userTokenBucketsMu sync.RWMutex
userTokenBuckets map[string]*util.TokenBucket

// Keeps number of inflight requests
inflightRequestCnt int
inflightRequestMu sync.RWMutex
Expand Down Expand Up @@ -115,6 +120,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_bucket_stores_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Expand Down Expand Up @@ -144,6 +150,13 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
return nil, errors.Wrap(err, "create chunks bytes pool")
}

if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
u.instanceTokenBucket = util.NewTokenBucket(cfg.BucketStore.TokenBucketBytesLimiter.InstanceTokenBucketSize, promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_bucket_stores_instance_token_bucket_remaining",
Help: "Number of tokens left in instance token bucket.",
}))
}

if reg != nil {
reg.MustRegister(u.bucketStoreMetrics, u.metaFetcherMetrics)
}
Expand Down Expand Up @@ -475,6 +488,12 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
unlockInDefer = false
u.storesMu.Unlock()

if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
u.userTokenBucketsMu.Lock()
delete(u.userTokenBuckets, userID)
u.userTokenBucketsMu.Unlock()
}

u.metaFetcherMetrics.RemoveUserRegistry(userID)
u.bucketStoreMetrics.RemoveUserRegistry(userID)
return bs.Close()
Expand Down Expand Up @@ -612,13 +631,19 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
}

if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) {
u.userTokenBucketsMu.Lock()
u.userTokenBuckets[userID] = util.NewTokenBucket(u.cfg.BucketStore.TokenBucketBytesLimiter.UserTokenBucketSize, nil)
u.userTokenBucketsMu.Unlock()
}

bs, err := store.NewBucketStore(
userBkt,
fetcher,
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down Expand Up @@ -680,6 +705,31 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str
}
}

func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket {
u.userTokenBucketsMu.RLock()
defer u.userTokenBucketsMu.RUnlock()
return u.userTokenBuckets[userID]
}

func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 {
tokensToRetrieve := float64(tokens)
switch dataType {
case store.PostingsFetched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedPostingsTokenFactor
case store.PostingsTouched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedPostingsTokenFactor
case store.SeriesFetched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedSeriesTokenFactor
case store.SeriesTouched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedSeriesTokenFactor
case store.ChunksFetched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.FetchedChunksTokenFactor
case store.ChunksTouched:
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketBytesLimiter.TouchedChunksTokenFactor
}
return int64(tokensToRetrieve)
}

func getUserIDFromGRPCContext(ctx context.Context) string {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
Expand Down Expand Up @@ -730,50 +780,3 @@ type spanSeriesServer struct {
func (s spanSeriesServer) Context() context.Context {
return s.ctx
}

type limiter struct {
limiter *store.Limiter
}

func (c *limiter) Reserve(num uint64) error {
return c.ReserveWithType(num, 0)
}

func (c *limiter) ReserveWithType(num uint64, _ store.StoreDataType) error {
err := c.limiter.Reserve(num)
if err != nil {
return httpgrpc.Errorf(http.StatusUnprocessableEntity, err.Error())
}

return nil
}

func newChunksLimiterFactory(limits *validation.Overrides, userID string) store.ChunksLimiterFactory {
return func(failedCounter prometheus.Counter) store.ChunksLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxChunksPerQueryFromStore(userID)), failedCounter),
}
}
}

func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.SeriesLimiterFactory {
return func(failedCounter prometheus.Counter) store.SeriesLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxFetchedSeriesPerQuery(userID)), failedCounter),
}
}
}

func newBytesLimiterFactory(limits *validation.Overrides, userID string) store.BytesLimiterFactory {
return func(failedCounter prometheus.Counter) store.BytesLimiter {
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
return &limiter{
limiter: store.NewLimiter(uint64(limits.MaxDownloadedBytesPerRequest(userID)), failedCounter),
}
}
}
Loading

0 comments on commit 5356796

Please sign in to comment.