Skip to content

Commit

Permalink
ttl: fix the infinite waiting for delRateLimiter when `tidb_ttl_delet…
Browse files Browse the repository at this point in the history
…e_rate_limit` changes (#58485)

close #58484
  • Loading branch information
lcwangchao authored Dec 24, 2024
1 parent 33f0727 commit 392fb75
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 7 deletions.
32 changes: 28 additions & 4 deletions pkg/ttl/ttlworker/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/sqlbuilder"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"golang.org/x/time/rate"
Expand All @@ -48,27 +49,44 @@ var globalDelRateLimiter = newDelRateLimiter()

// defaultDelRateLimiter limits the rate of TTL delete operations with an inner `rate.Limiter`
// and caches the value of `tidb_ttl_delete_rate_limit` that the limiter was last configured with.
type defaultDelRateLimiter struct {
sync.Mutex
// limiter limits the rate of delete operation.
// limiter.Limit() has a range [1.0, +rate.Inf].
// When the value of system variable `tidb_ttl_delete_rate_limit` is `0`, `limiter.Limit()` returns `rate.Inf`.
limiter *rate.Limiter
limit atomic.Int64
// limit is the rate limit of the limiter; it holds the same value as the system variable `tidb_ttl_delete_rate_limit`.
// When it is 0, it means unlimited and `limiter.Limit()` will return `rate.Inf`.
limit atomic.Int64
}

// newDelRateLimiter creates a defaultDelRateLimiter whose cached limit is 0 (unlimited).
// NOTE(review): the inner limiter is assigned twice below; the later assignment with `rate.Inf`
// is the one that takes effect, so `limiter.Limit()` starts as `rate.Inf`.
func newDelRateLimiter() delRateLimiter {
limiter := &defaultDelRateLimiter{}
limiter.limiter = rate.NewLimiter(0, 1)
limiter.limiter = rate.NewLimiter(rate.Inf, 1)
limiter.limit.Store(0)
return limiter
}

// beforeWaitLimiterForTestType is the type of the context key that carries a test-only hook.
type beforeWaitLimiterForTestType struct{}

// beforeWaitLimiterForTest is the context key under which a test may store a `func()`.
// WaitDelToken invokes that function right before it waits on the inner limiter.
var beforeWaitLimiterForTest = new(beforeWaitLimiterForTestType)

// WaitDelToken waits for a delete token from the inner limiter.
// It re-syncs the cached limit via reset() when `variable.TTLDeleteRateLimit` has changed,
// returns `ctx.Err()` without touching the limiter when the limit means "unlimited",
// and otherwise returns the result of `l.limiter.Wait(ctx)`.
func (l *defaultDelRateLimiter) WaitDelToken(ctx context.Context) error {
limit := l.limit.Load()
// The system variable differs from the cached value: reset the limiter and use the new limit.
if variable.TTLDeleteRateLimit.Load() != limit {
limit = l.reset()
}

// A limit of 0 means unlimited, so there is no need to wait on the limiter;
// only the state of ctx is reported.
if limit == 0 {
intest.Assert(limit >= 0)
if limit <= 0 {
return ctx.Err()
}

if intest.InTest {
// The inner rate must be positive here; see issue #58484, where a rate of 0 blocked waiters forever.
intest.Assert(l.limiter.Limit() > 0)
// Test-only hook: a `func()` stored in ctx under beforeWaitLimiterForTest runs right before the wait.
if fn, ok := ctx.Value(beforeWaitLimiterForTest).(func()); ok {
fn()
}
}

return l.limiter.Wait(ctx)
}

Expand All @@ -78,7 +96,13 @@ func (l *defaultDelRateLimiter) reset() (newLimit int64) {
newLimit = variable.TTLDeleteRateLimit.Load()
if newLimit != l.limit.Load() {
l.limit.Store(newLimit)
l.limiter.SetLimit(rate.Limit(newLimit))
rateLimit := rate.Inf
if newLimit > 0 {
// When `TTLDeleteRateLimit > 0`, use the setting as the rate limit.
// Otherwise, use `rate.Inf` to make it unlimited.
rateLimit = rate.Limit(newLimit)
}
l.limiter.SetLimit(rateLimit)
}
return
}
Expand Down
77 changes: 74 additions & 3 deletions pkg/ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"slices"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -394,11 +395,22 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
}

func TestTTLDeleteRateLimiter(t *testing.T) {
origDeleteLimit := variable.TTLDeleteRateLimit.Load()
origGlobalDelRateLimiter := globalDelRateLimiter
defer func() {
variable.TTLDeleteRateLimit.Store(origDeleteLimit)
globalDelRateLimiter = origGlobalDelRateLimiter
variable.TTLDeleteRateLimit.Store(variable.DefTiDBTTLDeleteRateLimit)
}()

// The global inner limiter should have a default config
require.Equal(t, 0, variable.DefTiDBTTLDeleteRateLimit)
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())
require.Equal(t, rate.Inf, globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
// The newDelRateLimiter() should return a default config
globalDelRateLimiter = newDelRateLimiter()
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())
require.Equal(t, rate.Inf, globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer func() {
if cancel != nil {
Expand All @@ -413,7 +425,7 @@ func TestTTLDeleteRateLimiter(t *testing.T) {

variable.TTLDeleteRateLimit.Store(0)
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
require.Equal(t, rate.Limit(0), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
require.Equal(t, rate.Inf, globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())

// 0 stands for no limit
Expand Down Expand Up @@ -569,3 +581,62 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
require.Equal(t, uint64(3), tasks[4].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
}

// TestDelRateLimiterConcurrency tests some concurrency cases of delRateLimiter.
// See issue: https://github.com/pingcap/tidb/issues/58484
// It covers the following case:
//  1. `tidb_ttl_delete_rate_limit` is set to some non-zero value such as 128.
//  2. Some delWorkers delete rows concurrently and are about to wait on the inner `rate.Limiter`.
//  3. Before the internal `l.limiter.Wait` is called, `tidb_ttl_delete_rate_limit` is set to 0.
//     This resets the internal `rate.Limiter` (in the buggy code, its rate was set to 0).
//  4. The delWorkers from step 2 continue and call `l.limiter.Wait`.
//     In the buggy code, some of them were blocked forever because the rate was 0.
func TestDelRateLimiterConcurrency(t *testing.T) {
	origGlobalDelRateLimiter := globalDelRateLimiter
	defer func() {
		globalDelRateLimiter = origGlobalDelRateLimiter
		variable.TTLDeleteRateLimit.Store(variable.DefTiDBTTLDeleteRateLimit)
	}()

	globalDelRateLimiter = newDelRateLimiter()
	require.NoError(t, globalDelRateLimiter.WaitDelToken(context.Background()))

	variable.TTLDeleteRateLimit.Store(128)
	var waiting atomic.Int64
	// continue1 is closed once every worker has reached the hook just before `l.limiter.Wait`.
	continue1 := make(chan struct{})
	// continue2 is closed by the test to release the workers after the limit was changed to 0.
	continue2 := make(chan struct{})
	// continue3 is closed by the last worker that returns from WaitDelToken.
	continue3 := make(chan struct{})
	cnt := 4
	for i := 0; i < cnt; i++ {
		go func() {
			ctx := context.WithValue(context.Background(), beforeWaitLimiterForTest, func() {
				if waiting.Add(1) == int64(cnt) {
					close(continue1)
				}
				<-continue2
			})
			// Report with t.Errorf instead of require.NoError: require fails through t.FailNow,
			// which must only be called from the goroutine running the test function.
			if err := globalDelRateLimiter.WaitDelToken(ctx); err != nil {
				t.Errorf("WaitDelToken returned an unexpected error: %v", err)
			}
			if waiting.Add(-1) == 0 {
				close(continue3)
			}
		}()
	}

	timeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	select {
	case <-continue1:
		// All workers are parked right before the wait; switch to "unlimited" and force a reset.
		variable.TTLDeleteRateLimit.Store(0)
		require.NoError(t, globalDelRateLimiter.WaitDelToken(timeCtx))
		close(continue2)
	case <-timeCtx.Done():
		require.FailNow(t, "timeout")
	}

	// Every worker must return from WaitDelToken before the timeout expires.
	select {
	case <-continue3:
	case <-timeCtx.Done():
		require.FailNow(t, "timeout")
	}
}

0 comments on commit 392fb75

Please sign in to comment.