Skip to content

Commit

Permalink
ttl: add integration test with random fault session for TTL (#58110)
Browse files Browse the repository at this point in the history
close #58411
  • Loading branch information
YangKeao authored Dec 20, 2024
1 parent acba0cd commit d9749cd
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/planner/cascades/base",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_pd_client//http",
"@org_uber_go_atomic//:atomic",
],
Expand Down
5 changes: 5 additions & 0 deletions pkg/meta/model/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/duration"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -1351,6 +1352,10 @@ func (t *TTLInfo) Clone() *TTLInfo {
// Didn't set TTL_JOB_INTERVAL during upgrade and bootstrap because setting default value here is much simpler
// and could avoid bugs blocking users from upgrading or bootstrapping the cluster.
func (t *TTLInfo) GetJobInterval() (time.Duration, error) {
failpoint.Inject("overwrite-ttl-job-interval", func(val failpoint.Value) (time.Duration, error) {
return time.Duration(val.(int)), nil
})

if len(t.JobInterval) == 0 {
// This only happens when the table is created from 6.5 in which the `tidb_job_interval` is not introduced yet.
// We use `OldDefaultTTLJobInterval` as the return value to ensure a consistent behavior for the
Expand Down
20 changes: 17 additions & 3 deletions pkg/ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ func getCheckJobInterval() time.Duration {
return jobManagerLoopTickerInterval
}

func getHeartbeatInterval() time.Duration {
failpoint.Inject("heartbeat-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return jobManagerLoopTickerInterval
}

func getJobManagerLoopSyncTimerInterval() time.Duration {
failpoint.Inject("sync-timer", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
Expand Down Expand Up @@ -86,11 +93,11 @@ func getTaskManagerLoopTickerInterval() time.Duration {
return taskManagerLoopTickerInterval
}

func getTaskManagerHeartBeatExpireInterval() time.Duration {
failpoint.Inject("task-manager-heartbeat-expire-interval", func(val failpoint.Value) time.Duration {
func getTaskManagerHeartBeatInterval() time.Duration {
failpoint.Inject("task-manager-heartbeat-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return 2 * ttlTaskHeartBeatTickerInterval
return ttlTaskHeartBeatTickerInterval
}

func getCheckJobTriggeredInterval() time.Duration {
Expand All @@ -100,6 +107,13 @@ func getCheckJobTriggeredInterval() time.Duration {
return 2 * time.Second
}

func getTTLGCInterval() time.Duration {
failpoint.Inject("gc-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return ttlGCInterval
}

func getScanSplitCnt(store kv.Storage) int {
tikvStore, ok := store.(tikv.Storage)
if !ok {
Expand Down
16 changes: 10 additions & 6 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const taskGCTemplate = `DELETE task FROM
const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY`
const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE (current_job_status IS NULL OR current_job_owner_hb_time < %?)`

const timeFormat = time.DateTime
var timeFormat = time.DateTime

func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []any) {
return insertNewTableIntoStatusTemplate, []any{tableID, parentTableID}
Expand All @@ -86,7 +86,7 @@ func gcTTLTableStatusGCSQL(existIDs []int64, now time.Time) (string, []any) {
existIDStrs = append(existIDStrs, strconv.Itoa(int(id)))
}

hbExpireTime := now.Add(-jobManagerLoopTickerInterval * 2)
hbExpireTime := now.Add(-getHeartbeatInterval() * 2)
args := []any{hbExpireTime.Format(timeFormat)}
if len(existIDStrs) > 0 {
return ttlTableStatusGCWithoutIDTemplate + fmt.Sprintf(` AND table_id NOT IN (%s)`, strings.Join(existIDStrs, ",")), args
Expand Down Expand Up @@ -137,6 +137,10 @@ func NewJobManager(id string, sessPool util.SessionPool, store kv.Storage, etcdC

manager.init(manager.jobLoop)
manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "job-manager")
if intest.InTest {
// in test environment, in the same log there will be multiple ttl managers, so we need to distinguish them
manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", id)
}

manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())
Expand Down Expand Up @@ -181,15 +185,15 @@ func (m *JobManager) jobLoop() error {
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
gcTicker := time.Tick(ttlGCInterval)
gcTicker := time.Tick(getTTLGCInterval())

scheduleJobTicker := time.Tick(getCheckJobInterval())
jobCheckTicker := time.Tick(getCheckJobInterval())
updateJobHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval)
updateJobHeartBeatTicker := time.Tick(getHeartbeatInterval())
timerTicker := time.Tick(getJobManagerLoopSyncTimerInterval())

scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval())
updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval)
updateTaskHeartBeatTicker := time.Tick(getTaskManagerHeartBeatInterval())
taskCheckTicker := time.Tick(getTaskManagerLoopCheckTaskInterval())
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())

Expand Down Expand Up @@ -732,7 +736,7 @@ func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.P
hbTime := tableStatus.CurrentJobOwnerHBTime
// jobManagerLoopTickerInterval is used to do heartbeat periodically.
// Use twice the time to detect the heartbeat timeout.
hbTimeout := jobManagerLoopTickerInterval * 2
hbTimeout := getHeartbeatInterval() * 2
if interval := getUpdateTTLTableStatusCacheInterval() * 2; interval > hbTimeout {
// tableStatus is get from the cache which may contain stale data.
// So if cache update interval > heartbeat interval, use the cache update interval instead.
Expand Down
177 changes: 177 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1619,3 +1620,179 @@ func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).In(tkTZ).Format(time.DateTime)),
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.In(tkTZ).Format(time.DateTime))))
}

var _ fault = &faultWithProbability{}

type faultWithProbability struct {
percent float64
}

func (f *faultWithProbability) shouldFault(sql string) bool {
return rand.Float64() < f.percent
}

func newFaultWithProbability(percent float64) *faultWithProbability {
return &faultWithProbability{percent: percent}
}

func accelerateHeartBeat(t *testing.T, tk *testkit.TestKit) func() {
tk.MustExec("ALTER TABLE mysql.tidb_ttl_table_status MODIFY COLUMN current_job_owner_hb_time TIMESTAMP(6)")
tk.MustExec("ALTER TABLE mysql.tidb_ttl_task MODIFY COLUMN owner_hb_time TIMESTAMP(6)")
ttlworker.SetTimeFormat(time.DateTime + ".999999")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/heartbeat-interval", fmt.Sprintf("return(%d)", time.Millisecond*100)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-heartbeat-interval", fmt.Sprintf("return(%d)", time.Millisecond*100)))
return func() {
tk.MustExec("ALTER TABLE mysql.tidb_ttl_table_status MODIFY COLUMN current_job_owner_hb_time TIMESTAMP")
tk.MustExec("ALTER TABLE mysql.tidb_ttl_task MODIFY COLUMN owner_hb_time TIMESTAMP")
ttlworker.SetTimeFormat(time.DateTime)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/heartbeat-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-heartbeat-interval"))
}
}

func TestJobManagerWithFault(t *testing.T) {
// TODO: add a flag `-long` to enable this test
t.Skip("skip this test because it'll need to run for a long time")

defer boostJobScheduleForTest(t)()

store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)

tk := testkit.NewTestKit(t, store)
defer accelerateHeartBeat(t, tk)()
tk.MustExec("set @@global.tidb_ttl_running_tasks=32")

managerCount := 20
testDuration := 10 * time.Minute
faultPercent := 0.5

leader := atomic.NewString("")
isLeaderFactory := func(id string) func() bool {
return func() bool {
return leader.Load() == id
}
}

type managerWithPool struct {
m *ttlworker.JobManager
pool util.SessionPool
}
managers := make([]managerWithPool, 0, managerCount)
for i := 0; i < managerCount; i++ {
pool := wrapPoolForTest(dom.SysSessionPool())
faultPool := newFaultSessionPool(pool)

id := fmt.Sprintf("test-ttl-job-manager-%d", i)
m := ttlworker.NewJobManager(id, faultPool, store, nil, isLeaderFactory(id))
managers = append(managers, managerWithPool{
m: m,
pool: faultPool,
})

m.Start()
}

stopTestCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)

fault := newFaultWithFilter(func(sql string) bool {
// skip some local only sql, ref `getSession()` in `session.go`
if strings.HasPrefix(sql, "set tidb_") || strings.HasPrefix(sql, "set @@") ||
strings.ToUpper(sql) == "COMMIT" || strings.ToUpper(sql) == "ROLLBACK" {
return false
}

return true
}, newFaultWithProbability(faultPercent))
go func() {
defer wg.Done()

faultTicker := time.NewTicker(time.Second)
for {
select {
case <-stopTestCh:
// Recover all sessions
for _, m := range managers {
m.pool.(*faultSessionPool).setFault(nil)
}

return
case <-faultTicker.C:
// Recover all sessions
for _, m := range managers {
m.pool.(*faultSessionPool).setFault(nil)
}

faultCount := rand.Int() % managerCount
logutil.BgLogger().Info("inject fault", zap.Int("faultCount", faultCount))
rand.Shuffle(managerCount, func(i, j int) {
managers[i], managers[j] = managers[j], managers[i]
})
// the first non-faultt manager is the leader
leader.Store(managers[faultCount].m.ID())
logutil.BgLogger().Info("set leader", zap.String("leader", leader.Load()))
for i := 0; i < faultCount; i++ {
m := managers[i]
logutil.BgLogger().Info("inject fault", zap.String("id", m.m.ID()))
m.pool.(*faultSessionPool).setFault(fault)
}
}
}
}()

// run the workload goroutine
testStart := time.Now()
for time.Since(testStart) < testDuration {
// create a new table
tk.MustExec("use test")
tk.MustExec("DROP TABLE if exists t")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR TTL_ENABLE='OFF'")
tbl, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
logutil.BgLogger().Info("create table", zap.Int64("table_id", tbl.Meta().ID))

// insert some data
for i := 0; i < 5; i++ {
tk.MustExec(fmt.Sprintf("INSERT INTO t VALUES (%d, '%s')", i, time.Now().Add(-time.Hour*2).Format(time.DateTime)))
}
for i := 0; i < 5; i++ {
tk.MustExec(fmt.Sprintf("INSERT INTO t VALUES (%d, '%s')", i+5, time.Now().Format(time.DateTime)))
}

tk.MustExec("ALTER TABLE t TTL_ENABLE='ON'")

start := time.Now()
require.Eventually(t, func() bool {
rows := tk.MustQuery("SELECT COUNT(*) FROM t").Rows()
if len(rows) == 1 && rows[0][0].(string) == "5" {
return true
}

logutil.BgLogger().Info("get row count", zap.String("count", rows[0][0].(string)))
return false
}, time.Second*5, time.Millisecond*100)

require.Eventually(t, func() bool {
rows := tk.MustQuery("SELECT current_job_state FROM mysql.tidb_ttl_table_status").Rows()
if len(rows) == 1 && rows[0][0].(string) == "<nil>" {
return true
}

tableStatus := tk.MustQuery("SELECT * FROM mysql.tidb_ttl_table_status").String()
logutil.BgLogger().Info("get job state", zap.String("tidb_ttl_table_status", tableStatus))
return false
}, time.Second*5, time.Millisecond*100)

logutil.BgLogger().Info("finish workload", zap.Duration("duration", time.Since(start)))
}

logutil.BgLogger().Info("test finished")
stopTestCh <- struct{}{}
close(stopTestCh)

wg.Wait()
}
12 changes: 12 additions & 0 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ func (m *JobManager) ReportMetrics(se session.Session) {
m.reportMetrics(se)
}

// ID returns the id of JobManager
func (m *JobManager) ID() string {
return m.id
}

// CheckFinishedJob is an exported version of checkFinishedJob
func (m *JobManager) CheckFinishedJob(se session.Session) {
m.checkFinishedJob(se)
Expand Down Expand Up @@ -695,3 +700,10 @@ func TestSplitCnt(t *testing.T) {
}
}
}

// SetTimeFormat sets the time format used by the test.
// Some tests require a greater precision than the default time format. We don't change it globally to avoid potential compatibility issues.
// Therefore, the format for most tests are also not changed, to make sure the tests can represent the real-world scenarios.
func SetTimeFormat(format string) {
timeFormat = format
}
Loading

0 comments on commit d9749cd

Please sign in to comment.