Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45549
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
wshwsh12 authored and ti-chi-bot committed Aug 1, 2023
1 parent fe7bcf1 commit 5e5a619
Show file tree
Hide file tree
Showing 3 changed files with 275 additions and 1 deletion.
41 changes: 40 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type baseExecutor struct {

const (
// globalPanicStorageExceed represents the panic message when out of storage quota.
globalPanicStorageExceed string = "Out Of Global Storage Quota!"
globalPanicStorageExceed string = "Out Of Quota For Local Temporary Space!"
// globalPanicMemoryExceed represents the panic message when out of memory limit.
globalPanicMemoryExceed string = "Out Of Global Memory Limit!"
// globalPanicAnalyzeMemoryExceed represents the panic message when out of analyze memory limit.
Expand Down Expand Up @@ -1798,7 +1798,30 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

<<<<<<< HEAD
if _, ok := s.(*ast.AnalyzeTableStmt); ok {
=======
vars.MemTracker.Detach()
vars.MemTracker.UnbindActions()
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
vars.MemTracker.ResetMaxConsumed()
vars.DiskTracker.Detach()
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID.Store(vars.ConnectionID)
vars.StmtCtx.TableStats = make(map[int64]interface{})

isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return err
}
_, isAnalyze = prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt)
} else if _, ok := s.(*ast.AnalyzeTableStmt); ok {
isAnalyze = true
}
if isAnalyze {
>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))
sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1)
sc.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
} else {
Expand All @@ -1819,9 +1842,25 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
case variable.OOMActionLog:
fallthrough
default:
<<<<<<< HEAD
action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
=======
action := &memory.LogOnExceed{ConnID: vars.ConnectionID}
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
}
sc.MemTracker.SessionID.Store(vars.ConnectionID)
sc.MemTracker.AttachTo(vars.MemTracker)
sc.InitDiskTracker(memory.LabelForSQLText, -1)
globalConfig := config.GetGlobalConfig()
if variable.EnableTmpStorageOnOOM.Load() && sc.DiskTracker != nil {
sc.DiskTracker.AttachTo(vars.DiskTracker)
if GlobalDiskUsageTracker != nil {
vars.DiskTracker.AttachTo(GlobalDiskUsageTracker)
}
>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
Expand Down
17 changes: 17 additions & 0 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package chunk

import (
"errors"
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -132,7 +133,23 @@ func (c *RowContainer) SpillToDisk() {
N := c.m.records.inMemory.NumChunks()
c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
<<<<<<< HEAD
for i := 0; i < N; i++ {
=======
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
c.m.records.spillError = err
logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err))
}
}()
failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {
if val.(bool) {
panic("out of disk quota when spilling")
}
})
for i := 0; i < n; i++ {
>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))
chk := c.m.records.inMemory.GetChunk(i)
err = c.m.records.inDisk.Add(chk)
if err != nil {
Expand Down
218 changes: 218 additions & 0 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,221 @@ func TestActionBlocked(t *testing.T) {
ac.Action(tracker)
require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond)
}
<<<<<<< HEAD
=======

func insertBytesRowsIntoRowContainer(t *testing.T, chkCount int, rowPerChk int) (*RowContainer, [][]byte) {
longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(4096).Build()
fields := []*types.FieldType{&longVarCharTyp}

rc := NewRowContainer(fields, chkCount)

allRows := [][]byte{}
// insert chunks
for i := 0; i < chkCount; i++ {
chk := NewChunkWithCapacity(fields, rowPerChk)
// insert rows for each chunk
for j := 0; j < rowPerChk; j++ {
length := rand2.Uint32()
randomBytes := make([]byte, length%4096)
_, err := rand.Read(randomBytes)
require.NoError(t, err)

chk.AppendBytes(0, randomBytes)
allRows = append(allRows, randomBytes)
}
require.NoError(t, rc.Add(chk))
}

return rc, allRows
}

func TestRowContainerReaderInDisk(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = t.TempDir()
})

rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16)
rc.SpillToDisk()

reader := NewRowContainerReader(rc)
defer reader.Close()
for i := 0; i < 16; i++ {
for j := 0; j < 16; j++ {
row := reader.Current()
require.Equal(t, allRows[i*16+j], row.GetBytes(0))
reader.Next()
}
}
}

func TestCloseRowContainerReader(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = t.TempDir()
})

rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16)
rc.SpillToDisk()

// read 8.5 of these chunks
reader := NewRowContainerReader(rc)
defer reader.Close()
for i := 0; i < 8; i++ {
for j := 0; j < 16; j++ {
row := reader.Current()
require.Equal(t, allRows[i*16+j], row.GetBytes(0))
reader.Next()
}
}
for j := 0; j < 8; j++ {
row := reader.Current()
require.Equal(t, allRows[8*16+j], row.GetBytes(0))
reader.Next()
}
}

func TestConcurrentSpillWithRowContainerReader(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = t.TempDir()
})

rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024)

var wg sync.WaitGroup
// concurrently read and spill to disk
wg.Add(1)
go func() {
defer wg.Done()
reader := NewRowContainerReader(rc)
defer reader.Close()

for i := 0; i < 16; i++ {
for j := 0; j < 1024; j++ {
row := reader.Current()
require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
reader.Next()
}
}
}()
rc.SpillToDisk()
wg.Wait()
}

func TestReadAfterSpillWithRowContainerReader(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = t.TempDir()
})

rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024)

reader := NewRowContainerReader(rc)
defer reader.Close()
for i := 0; i < 8; i++ {
for j := 0; j < 1024; j++ {
row := reader.Current()
require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
reader.Next()
}
}
rc.SpillToDisk()
for i := 8; i < 16; i++ {
for j := 0; j < 1024; j++ {
row := reader.Current()
require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
reader.Next()
}
}
}

func TestPanicWhenSpillToDisk(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
sz := 20
chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}

rc := NewRowContainer(fields, sz)
tracker := rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota"))
}()
require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err := rc.GetRow(RowPtr{})
require.EqualError(t, err, "out of disk quota when spilling")
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
}

func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
}

func BenchmarkRowContainerReaderInDiskWithRowSize1024(b *testing.B) {
benchmarkRowContainerReaderInDiskWithRowLength(b, 1024)
}

func BenchmarkRowContainerReaderInDiskWithRowSize4096(b *testing.B) {
benchmarkRowContainerReaderInDiskWithRowLength(b, 4096)
}

func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) {
b.StopTimer()

restore := config.RestoreFunc()
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = b.TempDir()
})

longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(rowLength).Build()
fields := []*types.FieldType{&longVarCharTyp}

randomBytes := make([]byte, rowLength)
_, err := rand.Read(randomBytes)
require.NoError(b, err)

// create a row container which stores the data in disk
rc := NewRowContainer(fields, 1<<10)
rc.SpillToDisk()

// insert `b.N * 1<<10` rows (`b.N` chunks) into the rc
for i := 0; i < b.N; i++ {
chk := NewChunkWithCapacity(fields, 1<<10)
for j := 0; j < 1<<10; j++ {
chk.AppendBytes(0, randomBytes)
}

rc.Add(chk)
}

reader := NewRowContainerReader(rc)
defer reader.Close()
b.StartTimer()
for n := 0; n < b.N; n++ {
for i := 0; i < 1<<10; i++ {
reader.Next()
}
}
require.NoError(b, reader.Error())
}
>>>>>>> 838b3674752 (executor, util: make tmp-storage-quota take affect (#45549))

0 comments on commit 5e5a619

Please sign in to comment.