Skip to content

Commit

Permalink
domain: use enhanced waitGroupWrapper in domain #40606 (#40626)
Browse files Browse the repository at this point in the history
ref #40330
  • Loading branch information
Yisaer authored Jan 17, 2023
1 parent 4bb51e7 commit f7d35ab
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 48 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ type Config struct {
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
// TiDBMaxReuseColumn indicates max cached column num
TiDBMaxReuseColumn uint32 `toml:"tidb-max-reuse-column" json:"tidb-max-reuse-column"`
// TiDBEnableExitCheck indicates whether exit-checking in domain for background process
TiDBEnableExitCheck bool `toml:"tidb-enable-exit-check" json:"tidb-enable-exit-check"`
}

// UpdateTempStoragePath is to update the `TempStoragePath` if port/statusPort was changed
Expand Down Expand Up @@ -1000,6 +1002,7 @@ var defaultConf = Config{
DisaggregatedTiFlash: false,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
TiDBEnableExitCheck: false,
}

var (
Expand Down
39 changes: 20 additions & 19 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,17 @@ type Domain struct {
expensiveQueryHandle *expensivequery.Handle
memoryUsageAlarmHandle *memoryusagealarm.Handle
serverMemoryLimitHandle *servermemorylimit.Handle
wg util.WaitGroupWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager *ttlworker.JobManager
// TODO: use Run for each process in future pr
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager *ttlworker.JobManager

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -918,7 +919,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
jobsIdsMap: make(map[int64]string),
},
}

do.wg = util.NewWaitGroupEnhancedWrapper("domain", do.exit, config.GetGlobalConfig().TiDBEnableExitCheck)
do.SchemaValidator = NewSchemaValidator(ddlLease, do)
do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
do.memoryUsageAlarmHandle = memoryusagealarm.NewMemoryUsageAlarmHandle(do.exit)
Expand Down Expand Up @@ -1065,7 +1066,7 @@ func (do *Domain) Init(
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ctx, ddlLease)
}
do.wg.Run(do.mdlCheckLoop)
do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop")
do.wg.Add(3)
go do.topNSlowQueryLoop()
go do.infoSyncerKeeper()
Expand Down Expand Up @@ -1107,7 +1108,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
if err != nil {
return err
}
do.wg.Run(loop)
do.wg.Run(loop, "logBackupAdvancer")
return nil
}

Expand Down Expand Up @@ -1921,7 +1922,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
do.ddl.RegisterStatsHandle(statsHandle)
// Negative stats lease indicates that it is in test, it does not need update.
if do.statsLease >= 0 {
do.wg.Run(do.loadStatsWorker)
do.wg.Run(do.loadStatsWorker, "loadStatsWorker")
}
owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
if do.indexUsageSyncLease > 0 {
Expand All @@ -1932,9 +1933,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
return nil
}
do.SetStatsUpdating(true)
do.wg.Run(func() { do.updateStatsWorker(ctx, owner) })
do.wg.Run(func() { do.autoAnalyzeWorker(owner) })
do.wg.Run(func() { do.gcAnalyzeHistory(owner) })
do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker")
do.wg.Run(func() { do.autoAnalyzeWorker(owner) }, "autoAnalyzeWorker")
do.wg.Run(func() { do.gcAnalyzeHistory(owner) }, "gcAnalyzeHistory")
return nil
}

Expand All @@ -1944,7 +1945,7 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
for i, ctx := range ctxList {
statsHandle.StatsLoad.SubCtxs[i] = ctx
do.wg.Add(1)
go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg)
go statsHandle.SubLoadWorker(ctx, do.exit, do.wg)
}
}

Expand Down Expand Up @@ -2529,7 +2530,7 @@ func (do *Domain) StartTTLJobManager() {
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
})
}, "ttlJobManager")
}

// TTLJobManager returns the ttl job manager on this domain
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ type StatsReaderContext struct {
}

// SubLoadWorker loads hist data for each column
func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupWrapper) {
func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
readerCtx := &StatsReaderContext{}
defer func() {
exitWg.Done()
Expand Down
39 changes: 21 additions & 18 deletions util/wait_group_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,53 @@ import (
"time"

"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// WaitGroupEnhancedWrapper wrapper wg, it provides the basic ability of WaitGroupWrapper with checking unexited process
// if the `exited` signal is true by print them on log.
type WaitGroupEnhancedWrapper struct {
sync.WaitGroup
exited *atomic.Bool
source string
registerProcess sync.Map
}

// NewWaitGroupEnhancedWrapper returns WaitGroupEnhancedWrapper, the empty source indicates the unit test, then
// the `checkUnExitedProcess` won't be executed.
func NewWaitGroupEnhancedWrapper(source string, exited *atomic.Bool) *WaitGroupEnhancedWrapper {
func NewWaitGroupEnhancedWrapper(source string, exit chan struct{}, exitedCheck bool) *WaitGroupEnhancedWrapper {
wgew := &WaitGroupEnhancedWrapper{
exited: exited,
source: source,
registerProcess: sync.Map{},
}
if len(source) > 0 {
go wgew.checkUnExitedProcess()
if exitedCheck {
wgew.Add(1)
go wgew.checkUnExitedProcess(exit)
}
return wgew
}

func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess() {
logutil.BgLogger().Info("waitGroupWrapper ready to check unexited process", zap.String("source", w.source))
ticker := time.NewTimer(10 * time.Second)
defer ticker.Stop()
for {
<-ticker.C
continueCheck := w.check()
if !continueCheck {
return
func (w *WaitGroupEnhancedWrapper) checkUnExitedProcess(exit chan struct{}) {
defer func() {
logutil.BgLogger().Info("waitGroupWrapper exit-checking exited", zap.String("source", w.source))
w.Done()
}()
logutil.BgLogger().Info("waitGroupWrapper enable exit-checking", zap.String("source", w.source))
<-exit
logutil.BgLogger().Info("waitGroupWrapper start exit-checking", zap.String("source", w.source))
if w.check() {
ticker := time.NewTimer(2 * time.Second)
defer ticker.Stop()
for {
<-ticker.C
continueCheck := w.check()
if !continueCheck {
return
}
}
}
}

func (w *WaitGroupEnhancedWrapper) check() bool {
if !w.exited.Load() {
return true
}
unexitedProcess := make([]string, 0)
w.registerProcess.Range(func(key, value any) bool {
unexitedProcess = append(unexitedProcess, key.(string))
Expand Down
15 changes: 5 additions & 10 deletions util/wait_group_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func TestWaitGroupWrapperRun(t *testing.T) {
require.Equal(t, expect, val.Load())

val.Store(0)
exited := atomic.NewBool(false)
wg2 := NewWaitGroupEnhancedWrapper("", exited)
wg2 := NewWaitGroupEnhancedWrapper("", nil, false)
for i := int32(0); i < expect; i++ {
wg2.Run(func() {
val.Inc()
Expand All @@ -62,8 +61,7 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) {
require.Equal(t, expect, val.Load())

val.Store(0)
exited := atomic.NewBool(false)
wg2 := NewWaitGroupEnhancedWrapper("", exited)
wg2 := NewWaitGroupEnhancedWrapper("", nil, false)
for i := int32(0); i < expect; i++ {
wg2.RunWithRecover(func() {
panic("test1")
Expand All @@ -76,18 +74,15 @@ func TestWaitGroupWrapperRunWithRecover(t *testing.T) {
}

func TestWaitGroupWrapperCheck(t *testing.T) {
exited := atomic.NewBool(false)
wg := NewWaitGroupEnhancedWrapper("", exited)
exit := make(chan struct{})
wg := NewWaitGroupEnhancedWrapper("", exit, false)
quit := make(chan struct{})
wg.Run(func() {
<-quit
}, "test")

// directly skip check if exited is false
require.True(t, wg.check())

// need continue check as existed unexited process
exited.Store(true)
close(exit)
require.True(t, wg.check())

// no need to continue check as all process exited
Expand Down

0 comments on commit f7d35ab

Please sign in to comment.