Skip to content

Commit

Permalink
executor: add memTracker for UpdateExec (#14299)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored Dec 31, 2019
1 parent a487748 commit e00887e
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 7 deletions.
9 changes: 9 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4449,6 +4449,15 @@ func (s *testSuite) TestOOMPanicAction(c *C) {
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
_, err = tk.Exec("replace into t select a from t1 order by a desc;")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")

tk.MustExec("set @@tidb_mem_quota_query=100000;")
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(1),(2),(3)")
// set the memory to quota to make the SQL panic during UpdateExec instead
// of TableReader.
tk.MustExec("set @@tidb_mem_quota_query=244;")
_, err = tk.Exec("update t set a = 4")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
}

func setOOMAction(action string) {
Expand Down
2 changes: 1 addition & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []
}

newData := e.row4Update[:len(oldRow)]
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker)
if err != nil {
return nil, false, 0, err
}
Expand Down
14 changes: 14 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,20 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) {
tk.Se.GetSessionVars().MemQuotaIndexLookupReader = -1
}

func (s *testOOMSuite) TestMemTracker4UpdateExec(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t_MemTracker4UpdateExec (id int, a int, b int, index idx_a(`a`))")

log.SetLevel(zap.InfoLevel)
s.oom.tracker = ""
tk.MustExec("insert into t_MemTracker4UpdateExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 244
tk.MustExec("update t_MemTracker4UpdateExec set a = 4")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
}

func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) {
//log.SetLevel(zap.FatalLevel)
tk := testkit.NewTestKit(c, s.store)
Expand Down
17 changes: 14 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
)

// UpdateExec represents a new update executor.
Expand All @@ -49,8 +50,8 @@ type UpdateExec struct {
tblColPosInfos plannercore.TblColPosInfoSlice
evalBuffer chunk.MutRow
allAssignmentsAreConstant bool

fetched bool
fetched bool
memTracker *memory.Tracker
}

func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) ([]types.Datum, error) {
Expand Down Expand Up @@ -101,7 +102,7 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) ([]typ
}

// Update row
changed, _, _, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false)
changed, _, _, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker)
if err1 == nil {
e.updatedRowKeys[content.TblID][handle] = changed
continue
Expand Down Expand Up @@ -174,7 +175,9 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
if !e.allAssignmentsAreConstant {
composeFunc = e.composeNewRow
}
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return err
Expand All @@ -183,6 +186,9 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
if chk.NumRows() == 0 {
break
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
firstRowIdx := globalRowIdx
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
chunkRow := chk.GetRow(rowIdx)
datumRow := chunkRow.GetDatumRow(fields)
Expand All @@ -194,6 +200,8 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
e.newRowsData = append(e.newRowsData, newRow)
globalRowIdx++
}
e.memTracker.Consume(types.EstimatedMemUsage(e.rows[firstRowIdx], globalRowIdx-firstRowIdx))
e.memTracker.Consume(types.EstimatedMemUsage(e.newRowsData[firstRowIdx], globalRowIdx-firstRowIdx))
chk = chunk.Renew(chk, e.maxChunkSize)
}
return nil
Expand Down Expand Up @@ -279,6 +287,9 @@ func (e *UpdateExec) Close() error {

// Open implements the Executor Open interface.
func (e *UpdateExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
}

Expand Down
11 changes: 8 additions & 3 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

Expand All @@ -48,13 +49,18 @@ var (
// 3. newHandle (int64) : if handleChanged == true, the newHandle means the new handle after update.
// 4. err (error) : error in the update.
func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData, newData []types.Datum, modified []bool, t table.Table,
onDup bool) (bool, bool, int64, error) {
onDup bool, memTracker *memory.Tracker) (bool, bool, int64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

txn, err := sctx.Txn(false)
if err != nil {
return false, false, 0, err
}
memUsageOfTxnState := txn.Size()
defer memTracker.Consume(int64(txn.Size() - memUsageOfTxnState))
sc := sctx.GetSessionVars().StmtCtx
changed, handleChanged := false, false
// onUpdateSpecified is for "UPDATE SET ts_field = old_value", the
Expand Down Expand Up @@ -150,7 +156,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData
}

// 5. If handle changed, remove the old then add the new record, otherwise update the record.
var err error
if handleChanged {
if sc.DupKeyAsWarning {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
Expand Down

0 comments on commit e00887e

Please sign in to comment.