Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add memTracker for UpdateExec #14299

Merged
merged 3 commits into from
Dec 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4449,6 +4449,15 @@ func (s *testSuite) TestOOMPanicAction(c *C) {
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
_, err = tk.Exec("replace into t select a from t1 order by a desc;")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")

tk.MustExec("set @@tidb_mem_quota_query=100000;")
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(1),(2),(3)")
// set the memory to quota to make the SQL panic during UpdateExec instead
// of TableReader.
tk.MustExec("set @@tidb_mem_quota_query=244;")
_, err = tk.Exec("update t set a = 4")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
}

func setOOMAction(action string) {
Expand Down
2 changes: 1 addition & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []
}

newData := e.row4Update[:len(oldRow)]
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker)
if err != nil {
return nil, false, 0, err
}
Expand Down
14 changes: 14 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,20 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) {
tk.Se.GetSessionVars().MemQuotaIndexLookupReader = -1
}

func (s *testOOMSuite) TestMemTracker4UpdateExec(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t_MemTracker4UpdateExec (id int, a int, b int, index idx_a(`a`))")

log.SetLevel(zap.InfoLevel)
s.oom.tracker = ""
tk.MustExec("insert into t_MemTracker4UpdateExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 244
tk.MustExec("update t_MemTracker4UpdateExec set a = 4")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
}

func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) {
//log.SetLevel(zap.FatalLevel)
tk := testkit.NewTestKit(c, s.store)
Expand Down
17 changes: 14 additions & 3 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
)

// UpdateExec represents a new update executor.
Expand All @@ -49,8 +50,8 @@ type UpdateExec struct {
tblColPosInfos plannercore.TblColPosInfoSlice
evalBuffer chunk.MutRow
allAssignmentsAreConstant bool

fetched bool
fetched bool
memTracker *memory.Tracker
}

func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) ([]types.Datum, error) {
Expand Down Expand Up @@ -101,7 +102,7 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) ([]typ
}

// Update row
changed, _, _, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false)
changed, _, _, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker)
if err1 == nil {
e.updatedRowKeys[content.TblID][handle] = changed
continue
Expand Down Expand Up @@ -174,7 +175,9 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
if !e.allAssignmentsAreConstant {
composeFunc = e.composeNewRow
}
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
err := Next(ctx, e.children[0], chk)
if err != nil {
return err
Expand All @@ -183,6 +186,9 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
if chk.NumRows() == 0 {
break
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
firstRowIdx := globalRowIdx
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
chunkRow := chk.GetRow(rowIdx)
datumRow := chunkRow.GetDatumRow(fields)
Expand All @@ -194,6 +200,8 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error {
e.newRowsData = append(e.newRowsData, newRow)
globalRowIdx++
}
e.memTracker.Consume(types.EstimatedMemUsage(e.rows[firstRowIdx], globalRowIdx-firstRowIdx))
e.memTracker.Consume(types.EstimatedMemUsage(e.newRowsData[firstRowIdx], globalRowIdx-firstRowIdx))
chk = chunk.Renew(chk, e.maxChunkSize)
}
return nil
Expand Down Expand Up @@ -279,6 +287,9 @@ func (e *UpdateExec) Close() error {

// Open implements the Executor Open interface.
func (e *UpdateExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
}

Expand Down
11 changes: 8 additions & 3 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

Expand All @@ -48,13 +49,18 @@ var (
// 3. newHandle (int64) : if handleChanged == true, the newHandle means the new handle after update.
// 4. err (error) : error in the update.
func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData, newData []types.Datum, modified []bool, t table.Table,
onDup bool) (bool, bool, int64, error) {
onDup bool, memTracker *memory.Tracker) (bool, bool, int64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

txn, err := sctx.Txn(false)
if err != nil {
return false, false, 0, err
}
memUsageOfTxnState := txn.Size()
defer memTracker.Consume(int64(txn.Size() - memUsageOfTxnState))
sc := sctx.GetSessionVars().StmtCtx
changed, handleChanged := false, false
// onUpdateSpecified is for "UPDATE SET ts_field = old_value", the
Expand Down Expand Up @@ -150,7 +156,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData
}

// 5. If handle changed, remove the old then add the new record, otherwise update the record.
var err error
if handleChanged {
if sc.DupKeyAsWarning {
// For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`
Expand Down