Skip to content

Commit

Permalink
store/copr: set upper limit for extra concurrency (#41135) (#41175)
Browse files Browse the repository at this point in the history
close #41134
  • Loading branch information
ti-chi-bot authored Feb 11, 2023
1 parent 516dc21 commit c6b632a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
16 changes: 12 additions & 4 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
copNextMaxBackoff = 20000
CopSmallTaskRow = 32 // 32 is the initial batch size of TiKV
smallTaskSigma = 0.5
smallConcPerCore = 20
)

// CopClient is coprocessor client.
Expand Down Expand Up @@ -196,7 +197,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}
if req.FixedRowCountHint != nil {
var smallTasks int
smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks)
smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.store.numcpu)
if len(tasks)-smallTasks < it.concurrency {
it.concurrency = len(tasks) - smallTasks
}
Expand Down Expand Up @@ -462,7 +463,7 @@ func isSmallTask(task *copTask) bool {

// smallTaskConcurrency counts the small tasks of tasks,
// then returns the task count and extra concurrency for small tasks.
func smallTaskConcurrency(tasks []*copTask) (int, int) {
func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) {
res := 0
for _, task := range tasks {
if isSmallTask(task) {
Expand All @@ -474,8 +475,15 @@ func smallTaskConcurrency(tasks []*copTask) (int, int) {
}
// Calculate the extra concurrency for small tasks
// extra concurrency = tasks / (1 + sigma * sqrt(log(tasks ^ 2)))
extraConc := float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res))))
return res, int(extraConc)
extraConc := int(float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res)))))
if numcpu <= 0 {
numcpu = 1
}
smallTaskConcurrencyLimit := smallConcPerCore * numcpu
if extraConc > smallTaskConcurrencyLimit {
extraConc = smallTaskConcurrencyLimit
}
return res, extraConc
}

type copIterator struct {
Expand Down
23 changes: 20 additions & 3 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func TestBasicSmallTaskConc(t *testing.T) {
require.True(t, isSmallTask(&copTask{RowCountHint: 6}))
require.True(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow}))
require.False(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow + 1}))
_, conc := smallTaskConcurrency([]*copTask{})
_, conc := smallTaskConcurrency([]*copTask{}, 16)
require.GreaterOrEqual(t, conc, 0)
}

Expand Down Expand Up @@ -722,7 +722,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
require.Equal(t, tasks[2].RowCountHint, 3)
// task[3] ["t"-"x", "y"-"z"]
require.Equal(t, tasks[3].RowCountHint, 3+CopSmallTaskRow)
_, conc := smallTaskConcurrency(tasks)
_, conc := smallTaskConcurrency(tasks, 16)
require.Equal(t, conc, 1)

req.FixedRowCountHint = []int{1, 1, 3, 3}
Expand All @@ -737,7 +737,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
require.Equal(t, tasks[2].RowCountHint, 3)
// task[3] ["t"-"x", "y"-"z"]
require.Equal(t, tasks[3].RowCountHint, 6)
_, conc = smallTaskConcurrency(tasks)
_, conc = smallTaskConcurrency(tasks, 16)
require.Equal(t, conc, 2)

// cross-region long range
Expand All @@ -754,3 +754,20 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
// task[3] ["t"-"z"]
require.Equal(t, tasks[3].RowCountHint, 10)
}

func TestSmallTaskConcurrencyLimit(t *testing.T) {
smallTaskCount := 1000
tasks := make([]*copTask, 0, smallTaskCount)
for i := 0; i < smallTaskCount; i++ {
tasks = append(tasks, &copTask{
RowCountHint: 1,
})
}
count, conc := smallTaskConcurrency(tasks, 1)
require.Equal(t, smallConcPerCore, conc)
require.Equal(t, smallTaskCount, count)
// also handle 0 value.
count, conc = smallTaskConcurrency(tasks, 0)
require.Equal(t, smallConcPerCore, conc)
require.Equal(t, smallTaskCount, count)
}
3 changes: 3 additions & 0 deletions store/copr/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package copr
import (
"context"
"math/rand"
"runtime"
"sync/atomic"
"time"

Expand Down Expand Up @@ -75,6 +76,7 @@ type Store struct {
*kvStore
coprCache *coprCache
replicaReadSeed uint32
numcpu int
}

// NewStore creates a new store instance.
Expand All @@ -89,6 +91,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store
kvStore: &kvStore{store: s},
coprCache: coprCache,
replicaReadSeed: rand.Uint32(),
numcpu: runtime.GOMAXPROCS(0),
}, nil
}

Expand Down

0 comments on commit c6b632a

Please sign in to comment.