Skip to content

Commit

Permalink
ttl: escape parameters for finish job sql (pingcap#40118)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored Dec 23, 2022
1 parent 014159d commit c61ef1d
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 58 deletions.
5 changes: 2 additions & 3 deletions ttl/cache/ttlstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package cache

import (
"context"
"fmt"
"time"

"github.com/pingcap/tidb/sessionctx"
Expand All @@ -43,8 +42,8 @@ const (
const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"

// SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id
func SelectFromTTLTableStatusWithID(tableID int64) string {
return selectFromTTLTableStatus + fmt.Sprintf(" WHERE table_id = %d", tableID)
func SelectFromTTLTableStatusWithID(tableID int64) (string, []interface{}) {
return selectFromTTLTableStatus + " WHERE table_id = %?", []interface{}{tableID}
}

// TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status`
Expand Down
1 change: 1 addition & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
flaky = True,
deps = [
"//infoschema",
"//kv",
"//parser/ast",
"//parser/model",
"//parser/mysql",
Expand Down
48 changes: 32 additions & 16 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package ttlworker
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

Expand All @@ -29,20 +28,34 @@ import (
"go.uber.org/zap"
)

const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = '%s' WHERE table_id = %d AND current_job_status = '%s' AND current_job_id = '%s'"
const finishJobTemplate = "UPDATE mysql.tidb_ttl_table_status SET last_job_id = current_job_id, last_job_start_time = current_job_start_time, last_job_finish_time = '%s', last_job_ttl_expire = current_job_ttl_expire, last_job_summary = '%s', current_job_id = NULL, current_job_owner_id = NULL, current_job_owner_hb_time = NULL, current_job_start_time = NULL, current_job_ttl_expire = NULL, current_job_state = NULL, current_job_status = NULL, current_job_status_update_time = NULL WHERE table_id = %d AND current_job_id = '%s'"
const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = '%s' WHERE table_id = %d AND current_job_id = '%s' AND current_job_owner_id = '%s'"
const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = %? WHERE table_id = %? AND current_job_status = %? AND current_job_id = %?"
const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
SET last_job_id = current_job_id,
last_job_start_time = current_job_start_time,
last_job_finish_time = %?,
last_job_ttl_expire = current_job_ttl_expire,
last_job_summary = %?,
current_job_id = NULL,
current_job_owner_id = NULL,
current_job_owner_hb_time = NULL,
current_job_start_time = NULL,
current_job_ttl_expire = NULL,
current_job_state = NULL,
current_job_status = NULL,
current_job_status_update_time = NULL
WHERE table_id = %? AND current_job_id = %?`
const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?"

func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) string {
return fmt.Sprintf(updateJobCurrentStatusTemplate, newStatus, tableID, oldStatus, jobID)
func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) {
return updateJobCurrentStatusTemplate, []interface{}{newStatus, tableID, oldStatus, jobID}
}

func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) string {
return fmt.Sprintf(finishJobTemplate, finishTime.Format(timeFormat), summary, tableID, jobID)
func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) (string, []interface{}) {
return finishJobTemplate, []interface{}{finishTime.Format(timeFormat), summary, tableID, jobID}
}

func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) string {
return fmt.Sprintf(updateJobStateTemplate, currentJobState, tableID, currentJobID, currentJobOwnerID)
func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) (string, []interface{}) {
return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID}
}

type ttlJob struct {
Expand Down Expand Up @@ -76,9 +89,10 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status
job.status = status
job.statusMutex.Unlock()

_, err := se.ExecuteSQL(ctx, updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id))
sql, args := updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
Expand All @@ -89,9 +103,10 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, summary, job.ownerID))
sql, args := updateJobState(job.tbl.ID, job.id, summary, job.ownerID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
Expand All @@ -115,9 +130,10 @@ func (job *ttlJob) finish(se session.Session, now time.Time) {
}
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
_, err = se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id))
sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id)
_, err = se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id), zap.String("sql", sql), zap.Any("arguments", args))
}
}

Expand Down
53 changes: 32 additions & 21 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ttlworker

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
Expand All @@ -31,22 +30,30 @@ import (
"go.uber.org/zap"
)

const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"
const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
SET current_job_id = UUID(),
current_job_owner_id = %?,
current_job_start_time = %?,
current_job_status = 'waiting',
current_job_status_update_time = %?,
current_job_ttl_expire = %?,
current_job_owner_hb_time = %?
WHERE table_id = %?`
const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = %? WHERE table_id = %? AND current_job_owner_id = %?"

const timeFormat = "2006-01-02 15:04:05"

func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) {
return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
}

func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) (string, []interface{}) {
return setTableStatusOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID}
}

func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
return fmt.Sprintf(updateHeartBeatTemplate, now.Format(timeFormat), tableID, id)
func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []interface{}) {
return updateHeartBeatTemplate, []interface{}{now.Format(timeFormat), tableID, id}
}

// JobManager schedules and manages the ttl jobs on this instance
Expand Down Expand Up @@ -503,19 +510,22 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
var expireTime time.Time

err := se.RunInTxn(ctx, func() error {
rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return err
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
// cannot find the row, insert the status row
_, err = se.ExecuteSQL(ctx, insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID))
sql, args := insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return err
return errors.Wrapf(err, "execute sql: %s", sql)
}
rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
sql, args = cache.SelectFromTTLTableStatusWithID(table.ID)
rows, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return err
return errors.Wrapf(err, "execute sql: %s", sql)
}
if len(rows) == 0 {
return errors.New("table status row still doesn't exist after insertion")
Expand All @@ -534,9 +544,9 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return err
}

_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))

return err
sql, args = setTableStatusOwnerSQL(table.ID, now, expireTime, m.id)
_, err = se.ExecuteSQL(ctx, sql, args...)
return errors.Wrapf(err, "execute sql: %s", sql)
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -599,9 +609,10 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) error {
now := se.Now()
for _, job := range m.localJobs() {
_, err := se.ExecuteSQL(ctx, updateHeartBeatSQL(job.tbl.ID, now, m.id))
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Trace(err)
return errors.Wrapf(err, "execute sql: %s", sql)
}
// also updates some internal state for this job
err = job.updateState(ctx, se)
Expand Down
35 changes: 31 additions & 4 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
Expand All @@ -35,10 +37,8 @@ import (
"go.uber.org/zap"
)

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := func() session.Session {
func sessionFactory(t *testing.T, store kv.Storage) func() session.Session {
return func() session.Session {
dbSession, err := dbsession.CreateSession4Test(store)
require.NoError(t, err)
se := session.NewSession(dbSession, dbSession, nil)
Expand All @@ -50,6 +50,12 @@ func TestParallelLockNewJob(t *testing.T) {

return se
}
}

func TestParallelLockNewJob(t *testing.T) {
store := testkit.CreateMockStore(t)

sessionFactory := sessionFactory(t, store)

storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
variable.TTLJobRunInterval.Store(0)
Expand Down Expand Up @@ -96,3 +102,24 @@ func TestParallelLockNewJob(t *testing.T) {
successJob.Finish(se, time.Now())
}
}

func TestFinishJob(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}

tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)")

// finish with error
m := ttlworker.NewJobManager("test-id", nil, store)
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
require.NoError(t, err)
job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`))
job.Finish(se, time.Now())

tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}"))
}
Loading

0 comments on commit c61ef1d

Please sign in to comment.