Skip to content

Commit

Permalink
executor: Fix crash during sort spill (#47581) (#47625)
Browse files Browse the repository at this point in the history
close #47538
  • Loading branch information
ti-chi-bot authored Oct 16, 2023
1 parent 359aea8 commit c14b794
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 68 deletions.
5 changes: 4 additions & 1 deletion executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
}
})
if e.rowChunks.NumRow() > 0 {
e.rowChunks.Sort()
err := e.rowChunks.Sort()
if err != nil {
return err
}
e.partitionList = append(e.partitionList, e.rowChunks)
}
return nil
Expand Down
147 changes: 80 additions & 67 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (m *mutexForRowContainer) RUnlock() {
m.rLock.RUnlock()
}

// spillHelper is what baseSpillDiskAction.action requires from a container in
// order to spill it. Both *RowContainer and *SortedRowContainer provide these
// methods, which lets the two spill actions share one implementation.
type spillHelper interface {
// SpillToDisk performs the spill. action starts it in a new goroutine.
SpillToDisk()
// hasEnoughDataToSpill reports whether a spill should be started now; t is
// the tracker that was passed to action.
hasEnoughDataToSpill(t *memory.Tracker) bool
}

// RowContainer provides a place for many rows, so many that we might want to spill them into disk.
// nolint:structcheck
type RowContainer struct {
Expand Down Expand Up @@ -121,6 +126,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer {

// SpillToDisk spills data to disk. This function may be called in parallel.
// It is the public entry point for a plain RowContainer and delegates to
// spillToDisk without any error carried over from an earlier step.
func (c *RowContainer) SpillToDisk() {
	// A nil pre-spill error means nothing failed before the spill.
	var noPreSpillErr error
	c.spillToDisk(noPreSpillErr)
}

// hasEnoughDataToSpill always reports true and ignores the tracker, so the
// shared spill action never holds back a spill of a plain RowContainer because
// of its size.
func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool {
return true
}

func (c *RowContainer) spillToDisk(preSpillError error) {
c.m.Lock()
defer c.m.Unlock()
if c.alreadySpilled() {
Expand Down Expand Up @@ -153,6 +166,10 @@ func (c *RowContainer) SpillToDisk() {
panic("out of disk quota when spilling")
}
})
if preSpillError != nil {
c.m.records.spillError = preSpillError
return
}
for i := 0; i < n; i++ {
chk := c.m.records.inMemory.GetChunk(i)
err = c.m.records.inDisk.Add(chk)
Expand Down Expand Up @@ -313,8 +330,9 @@ func (c *RowContainer) Close() (err error) {
func (c *RowContainer) ActionSpill() *SpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SpillDiskAction{
c: c,
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}
c: c,
baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}},
}
}
return c.actionSpill
}
Expand All @@ -323,23 +341,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction {
func (c *RowContainer) ActionSpillForTest() *SpillDiskAction {
c.actionSpill = &SpillDiskAction{
c: c,
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
baseSpillDiskAction: &baseSpillDiskAction{
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
}
return c.actionSpill
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
type baseSpillDiskAction struct {
memory.BaseOOMAction
c *RowContainer
m sync.Mutex
once sync.Once
cond spillStatusCond
Expand All @@ -350,6 +366,20 @@ type SpillDiskAction struct {
testWg sync.WaitGroup
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
//
// The shared state and logic live in the embedded baseSpillDiskAction; this
// type only binds them to a RowContainer.
type SpillDiskAction struct {
// c is the container handed to the shared action logic as the spill target.
c *RowContainer
// baseSpillDiskAction is embedded by pointer, so its methods are promoted
// onto SpillDiskAction and the same instance can be shared with another action.
*baseSpillDiskAction
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
// The work is done by the shared baseSpillDiskAction.action, with this
// action's RowContainer as the spill target.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
	var target spillHelper = a.c
	a.action(t, target)
}

type spillStatusCond struct {
*sync.Cond
// status indicates different stages for the Action
Expand All @@ -367,38 +397,35 @@ const (
spilledYet
)

func (a *SpillDiskAction) setStatus(status spillStatus) {
func (a *baseSpillDiskAction) setStatus(status spillStatus) {
a.cond.L.Lock()
defer a.cond.L.Unlock()
a.cond.status = status
}

func (a *SpillDiskAction) getStatus() spillStatus {
func (a *baseSpillDiskAction) getStatus() spillStatus {
a.cond.L.Lock()
defer a.cond.L.Unlock()
return a.cond.status
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) {
a.m.Lock()
defer a.m.Unlock()

if a.getStatus() == notSpilled {
if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.SpillToDisk()
spillHelper.SpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.SpillToDisk()
go spillHelper.SpillToDisk()
})
return
}
Expand All @@ -418,20 +445,20 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) {
}

// Reset resets the status for SpillDiskAction.
func (a *SpillDiskAction) Reset() {
func (a *baseSpillDiskAction) Reset() {
a.m.Lock()
defer a.m.Unlock()
a.setStatus(notSpilled)
a.once = sync.Once{}
}

// GetPriority get the priority of the Action.
func (*SpillDiskAction) GetPriority() int64 {
func (*baseSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// WaitForTest waits all goroutine have gone.
func (a *SpillDiskAction) WaitForTest() {
func (a *baseSpillDiskAction) WaitForTest() {
a.testWg.Wait()
}

Expand Down Expand Up @@ -522,9 +549,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool {
}

// Sort inits pointers and sorts the records.
func (c *SortedRowContainer) Sort() {
func (c *SortedRowContainer) Sort() (ret error) {
c.ptrM.Lock()
defer c.ptrM.Unlock()
ret = nil
defer func() {
if r := recover(); r != nil {
ret = fmt.Errorf("%v", r)
}
}()
if c.ptrM.rowPtrs != nil {
return
}
Expand All @@ -539,12 +572,24 @@ func (c *SortedRowContainer) Sort() {
c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) {
if val.(bool) {
panic("sort meet error")
}
})
sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess)
return
}

func (c *SortedRowContainer) sortAndSpillToDisk() {
c.Sort()
c.RowContainer.SpillToDisk()
// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *SortedRowContainer) SpillToDisk() {
err := c.Sort()
c.RowContainer.spillToDisk(err)
}

// hasEnoughDataToSpill reports whether this container's own memory tracker has
// consumed more than one tenth of the byte limit of t.
func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool {
	// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
	consumed := c.GetMemTracker().BytesConsumed()
	minPartitionBytes := t.GetBytesLimit() / 10
	return consumed > minPartitionBytes
}

// Add appends a chunk into the SortedRowContainer.
Expand All @@ -571,8 +616,8 @@ func (c *SortedRowContainer) GetSortedRow(idx int) (Row, error) {
func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpill(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction,
}
}
return c.actionSpill
Expand All @@ -581,8 +626,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
// ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test.
func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpillForTest(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction,
}
return c.actionSpill
}
Expand All @@ -597,45 +642,13 @@ func (c *SortedRowContainer) GetMemTracker() *memory.Tracker {
// triggered.
type SortAndSpillDiskAction struct {
c *SortedRowContainer
*SpillDiskAction
*baseSpillDiskAction
}

// Action sends a signal to trigger sortAndSpillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) {
a.m.Lock()
defer a.m.Unlock()
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.sortAndSpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.sortAndSpillToDisk()
})
return
}

a.cond.L.Lock()
for a.cond.status == spilling {
a.cond.Wait()
}
a.cond.L.Unlock()

if !t.CheckExceed() {
return
}
if fallback := a.GetFallback(); fallback != nil {
fallback.Action(t)
}
a.action(t, a.c)
}

// WaitForTest waits all goroutine have gone.
Expand Down
36 changes: 36 additions & 0 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,42 @@ func TestPanicWhenSpillToDisk(t *testing.T) {
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
}

// TestPanicDuringSortedRowContainerSpill checks that a panic raised while a
// SortedRowContainer sorts its rows for a spill does not crash the process and
// is instead reported as an error when rows are read back.
func TestPanicDuringSortedRowContainerSpill(t *testing.T) {
	const (
		rowCount      = 20
		failpointName = "github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer"
	)

	fieldTypes := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	rc := NewSortedRowContainer(fieldTypes, rowCount, []bool{false}, []int{0}, []CompareFunc{cmpInt64})

	chk := NewChunkWithCapacity(fieldTypes, rowCount)
	for v := int64(0); v < rowCount; v++ {
		chk.AppendInt64(0, v)
	}

	// Set the limit just above one chunk plus 8 bytes per row.
	memTracker := rc.GetMemTracker()
	memTracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1)
	memTracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
	require.False(t, rc.AlreadySpilledSafeForTest())

	// The first chunk must not cause a spill.
	require.NoError(t, rc.Add(chk))
	rc.actionSpill.WaitForTest()
	require.False(t, rc.AlreadySpilledSafeForTest())

	// Make the sort panic, then add a second chunk, which must cause a spill.
	require.NoError(t, failpoint.Enable(failpointName, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(failpointName))
	}()
	require.NoError(t, rc.Add(chk))
	rc.actionSpill.WaitForTest()
	require.True(t, rc.AlreadySpilledSafeForTest())

	// The message of the panic raised during the sort must be what readers get.
	_, err := rc.GetRow(RowPtr{})
	require.EqualError(t, err, "sort meet error")
}

// BenchmarkRowContainerReaderInDiskWithRowSize512 delegates to
// benchmarkRowContainerReaderInDiskWithRowLength with a row length of 512.
func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
	const rowLength = 512
	benchmarkRowContainerReaderInDiskWithRowLength(b, rowLength)
}
Expand Down

0 comments on commit c14b794

Please sign in to comment.