Skip to content

Commit

Permalink
executor: track the memroy usage in HashJoin probe phase (pingcap#41081)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored and blacktear23 committed Feb 15, 2023
1 parent aa1f34a commit 1a0c2aa
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 2 deletions.
39 changes: 37 additions & 2 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ type hashRowContainer struct {
memTracker *memory.Tracker

// chkBuf buffer the data reads from the disk if rowContainer is spilled.
chkBuf *chunk.Chunk
chkBuf *chunk.Chunk
chkBufSizeForOneProbe int64
}

func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
Expand Down Expand Up @@ -213,6 +214,15 @@ func (c *hashRowContainer) GetAllMatchedRows(probeHCtx *hashContext, probeSideRo
return matched, nil
}

// signalCheckpointForJoin indicates the times of row probe that a signal detection will be triggered.
const signalCheckpointForJoin int = 1 << 14

// rowSize is the size of Row.
const rowSize = int64(unsafe.Sizeof(chunk.Row{}))

// rowPtrSize is the size of RowPtr.
const rowPtrSize = int64(unsafe.Sizeof(chunk.RowPtr{}))

// GetMatchedRowsAndPtrs get matched rows and Ptrs from probeRow. It can be called
// in multiple goroutines while each goroutine should keep its own
// h and buf.
Expand All @@ -225,7 +235,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
matched = matched[:0]
var matchedRow chunk.Row
matchedPtrs = matchedPtrs[:0]
for _, ptr := range innerPtrs {

// Some variables used for memTracker.
var (
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
lastChunkBufPointer *chunk.Chunk = nil
memDelta int64 = 0
)
c.chkBuf = nil
c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*rowPtrSize)
defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta)
c.chkBufSizeForOneProbe = 0

for i, ptr := range innerPtrs {
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
if err != nil {
return nil, nil, err
Expand All @@ -235,6 +257,19 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
if err != nil {
return nil, nil, err
}
if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil {
lastChunkSize := lastChunkBufPointer.MemoryUsage()
c.chkBufSizeForOneProbe += lastChunkSize
memDelta += lastChunkSize
}
lastChunkBufPointer = c.chkBuf
if i&signalCheckpointForJoin == 0 {
// Trigger Consume for checking the OOM Action signal
memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize
matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize
c.memTracker.Consume(memDelta + 1)
memDelta = 0
}
if !ok {
atomic.AddInt64(&c.stat.probeCollision, 1)
continue
Expand Down
17 changes: 17 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2892,3 +2892,20 @@ func TestOuterJoin(t *testing.T) {
),
)
}

func TestCartesianJoinPanic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t values(1)")
tk.MustExec("set tidb_mem_quota_query = 1 << 30")
tk.MustExec("set global tidb_mem_oom_action = 'CANCEL'")
tk.MustExec("set global tidb_enable_tmp_storage_on_oom = off;")
for i := 0; i < 14; i++ {
tk.MustExec("insert into t select * from t")
}
err := tk.QueryToErr("desc analyze select * from t t1, t t2, t t3, t t4, t t5, t t6;")
require.NotNil(t, err)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
}

0 comments on commit 1a0c2aa

Please sign in to comment.