Skip to content

Commit

Permalink
ttl: add table mysql.ttl_job_history to store ttl job histories (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Jan 29, 2023
1 parent 26ca040 commit 4686338
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 29 deletions.
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 45
result := 46
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
26 changes: 25 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,27 @@ const (
created_time timestamp NOT NULL,
primary key(job_id, scan_id),
key(created_time));`

// CreateTTLJobHistory is the DDL that creates mysql.tidb_ttl_job_history, the table that
// stores the history of TTL jobs.
//
// Each row is keyed by job_id and records the table identity (table_id, parent_table_id,
// table_schema, table_name and an optional partition_name), the create/finish/expire
// timestamps of the job, its summary text, the expired/deleted/error row counters and the
// job status. Secondary indexes are declared on (table_schema, table_name, create_time),
// (parent_table_id, create_time) and (create_time).
CreateTTLJobHistory = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_job_history (
job_id varchar(64) PRIMARY KEY,
table_id bigint(64) NOT NULL,
parent_table_id bigint(64) NOT NULL,
table_schema varchar(64) NOT NULL,
table_name varchar(64) NOT NULL,
partition_name varchar(64) DEFAULT NULL,
create_time timestamp NOT NULL,
finish_time timestamp NOT NULL,
ttl_expire timestamp NOT NULL,
summary_text text,
expired_rows bigint(64) DEFAULT NULL,
deleted_rows bigint(64) DEFAULT NULL,
error_delete_rows bigint(64) DEFAULT NULL,
status varchar(64) NOT NULL,
key(table_schema, table_name, create_time),
key(parent_table_id, create_time),
key(create_time)
);`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -757,7 +778,7 @@ const (
version109 = 109
// version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0.
version110 = 110
// version111 adds the table tidb_ttl_task
// version111 adds the tables tidb_ttl_task and tidb_ttl_job_history
version111 = 111
)

Expand Down Expand Up @@ -2239,6 +2260,7 @@ func upgradeToVer111(s Session, ver int64) {
return
}
doReentrantDDL(s, CreateTTLTask)
doReentrantDDL(s, CreateTTLJobHistory)
}

func writeOOMAction(s Session) {
Expand Down Expand Up @@ -2349,6 +2371,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateTTLTableStatus)
// Create tidb_ttl_task table
mustExecute(s, CreateTTLTask)
// Create tidb_ttl_job_history table
mustExecute(s, CreateTTLJobHistory)
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
Expand Down
10 changes: 6 additions & 4 deletions ttl/cache/ttlstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ const (
// JobStatusWaiting means the job hasn't started
JobStatusWaiting JobStatus = "waiting"
// JobStatusRunning means this job is running
JobStatusRunning = "running"
JobStatusRunning JobStatus = "running"
// JobStatusCancelling means this job is being canceled, but not canceled yet
JobStatusCancelling = "cancelling"
JobStatusCancelling JobStatus = "cancelling"
// JobStatusCancelled means this job has been canceled successfully
JobStatusCancelled = "cancelled"
JobStatusCancelled JobStatus = "cancelled"
// JobStatusTimeout means this job has timed out
JobStatusTimeout = "timeout"
JobStatusTimeout JobStatus = "timeout"
// JobStatusFinished means job has been finished
JobStatusFinished JobStatus = "finished"
)

const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"
Expand Down
61 changes: 58 additions & 3 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
current_job_status_update_time = NULL
WHERE table_id = %? AND current_job_id = %?`
const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?"
// addJobHistoryTemplate is the parameterized INSERT statement that writes one row into
// mysql.tidb_ttl_job_history. It carries 14 `%?` placeholders, one per listed column and
// in the same order, so the argument slice supplied with it must follow the column order.
const addJobHistoryTemplate = `INSERT INTO
mysql.tidb_ttl_job_history (
job_id,
table_id,
parent_table_id,
table_schema,
table_name,
partition_name,
create_time,
finish_time,
ttl_expire,
summary_text,
expired_rows,
deleted_rows,
error_delete_rows,
status
)
VALUES
(%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)`

func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) {
return updateJobCurrentStatusTemplate, []interface{}{string(newStatus), tableID, string(oldStatus), jobID}
Expand All @@ -56,11 +75,41 @@ func removeTaskForJob(jobID string) (string, []interface{}) {
return removeTaskForJobTemplate, []interface{}{jobID}
}

// addJobHistorySQL builds the statement and arguments that record a TTL job in
// mysql.tidb_ttl_job_history.
//
// job supplies the job id, the table identity and the create/expire times; finishTime is
// bound to finish_time; summary supplies the summary text and the row counters. The
// returned arguments follow the column order of addJobHistoryTemplate.
func addJobHistorySQL(job *ttlJob, finishTime time.Time, summary *TTLSummary) (string, []interface{}) {
	// Timeout and cancelled are recorded as they are; any other job status is stored as finished.
	var finalStatus cache.JobStatus
	switch job.status {
	case cache.JobStatusTimeout, cache.JobStatusCancelled:
		finalStatus = job.status
	default:
		finalStatus = cache.JobStatusFinished
	}

	tbl := job.tbl

	// Stays a nil interface value unless the table carries a partition name.
	var partition interface{}
	if name := tbl.Partition.O; name != "" {
		partition = name
	}

	args := []interface{}{
		job.id,                               // job_id
		tbl.ID,                               // table_id
		tbl.TableInfo.ID,                     // parent_table_id
		tbl.Schema.O,                         // table_schema
		tbl.Name.O,                           // table_name
		partition,                            // partition_name
		job.createTime.Format(timeFormat),    // create_time
		finishTime.Format(timeFormat),        // finish_time
		job.ttlExpireTime.Format(timeFormat), // ttl_expire
		summary.SummaryText,                  // summary_text
		summary.TotalRows,                    // expired_rows
		summary.SuccessRows,                  // deleted_rows
		summary.ErrorRows,                    // error_delete_rows
		string(finalStatus),                  // status
	}
	return addJobHistoryTemplate, args
}

type ttlJob struct {
id string
ownerID string

createTime time.Time
createTime time.Time
ttlExpireTime time.Time

tbl *cache.PhysicalTable

Expand All @@ -71,11 +120,11 @@ type ttlJob struct {
}

// finish turns the current job into the last job, and updates the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time, summary string) {
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err := se.RunInTxn(context.TODO(), func() error {
sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id)
sql, args := finishJobSQL(job.tbl.ID, now, summary.SummaryText, job.id)
_, err := se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
Expand All @@ -87,6 +136,12 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary string) {
return errors.Wrapf(err, "execute sql: %s", sql)
}

sql, args = addJobHistorySQL(job, now, summary)
_, err = se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}, session.TxnModeOptimistic)

Expand Down
29 changes: 17 additions & 12 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return nil, err
}

job := m.createNewJob(now, table)
job := m.createNewJob(expireTime, now, table)

// job is created, notify every scan manager to fetch new tasks
err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
Expand All @@ -639,14 +639,15 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
return job, nil
}

func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) *ttlJob {
func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob {
id := m.tableStatusCache.Tables[table.ID].CurrentJobID

return &ttlJob{
id: id,
ownerID: m.id,

createTime: now,
createTime: now,
ttlExpireTime: expireTime,
// at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table
// information from schema cache directly
tbl: table,
Expand Down Expand Up @@ -717,7 +718,8 @@ func (m *JobManager) GetNotificationCli() client.NotificationClient {
return m.notificationCli
}

type ttlSummary struct {
// TTLSummary is the summary for TTL job
type TTLSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`
Expand All @@ -727,22 +729,24 @@ type ttlSummary struct {
FinishedScanTask int `json:"finished_scan_task"`

ScanTaskErr string `json:"scan_task_err,omitempty"`
SummaryText string `json:"-"`
}

func summarizeErr(err error) (string, error) {
summary := &ttlSummary{
func summarizeErr(err error) (*TTLSummary, error) {
summary := &TTLSummary{
ScanTaskErr: err.Error(),
}

buf, err := json.Marshal(summary)
if err != nil {
return "", err
return nil, err
}
return string(buf), nil
summary.SummaryText = string(buf)
return summary, nil
}

func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) {
summary := &ttlSummary{}
func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {
summary := &TTLSummary{}
var allErr error
for _, t := range tasks {
if t.State != nil {
Expand All @@ -768,7 +772,8 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) {

buf, err := json.Marshal(summary)
if err != nil {
return "", err
return nil, err
}
return string(buf), nil
summary.SummaryText = string(buf)
return summary, nil
}
37 changes: 30 additions & 7 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package ttlworker_test

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -69,7 +71,7 @@ func TestParallelLockNewJob(t *testing.T) {
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
require.NoError(t, err)
job.Finish(se, time.Now(), "")
job.Finish(se, time.Now(), &ttlworker.TTLSummary{})

// lock one table in parallel, only one of them should lock successfully
testTimes := 100
Expand Down Expand Up @@ -103,32 +105,53 @@ func TestParallelLockNewJob(t *testing.T) {
wg.Wait()

require.Equal(t, uint64(1), successCounter.Load())
successJob.Finish(se, time.Now(), "")
successJob.Finish(se, time.Now(), &ttlworker.TTLSummary{})
}
}

func TestFinishJob(t *testing.T) {
timeFormat := "2006-01-02 15:04:05"
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
testTable := &cache.PhysicalTable{ID: 2, Schema: model.NewCIStr("db1"), TableInfo: &model.TableInfo{ID: 1, Name: model.NewCIStr("t1"), TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}

tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)")

// finish with error
m := ttlworker.NewJobManager("test-id", nil, store, nil)
m.InfoSchemaCache().Tables[testTable.ID] = testTable
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
startTime := time.Now()
job, err := m.LockNewJob(context.Background(), se, testTable, startTime, false)
require.NoError(t, err)

expireTime, err := testTable.EvalExpireTime(context.Background(), se, startTime)
require.NoError(t, err)

summary := `{"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}`
job.Finish(se, time.Now(), summary)
tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows(`2 {"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}`))
summary := &ttlworker.TTLSummary{
ScanTaskErr: "\"'an error message contains both single and double quote'\"",
TotalRows: 128,
SuccessRows: 120,
ErrorRows: 8,
}
summaryBytes, err := json.Marshal(summary)
summary.SummaryText = string(summaryBytes)

require.NoError(t, err)
endTime := time.Now()
job.Finish(se, endTime, summary)
tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 " + summary.SummaryText))
tk.MustQuery("select * from mysql.tidb_ttl_task").Check(testkit.Rows())
expectedRow := []string{
job.ID(), "2", "1", "db1", "t1", "<nil>",
startTime.Format(timeFormat), endTime.Format(timeFormat), expireTime.Format(timeFormat),
summary.SummaryText, "128", "120", "8", "finished",
}
tk.MustQuery("select * from mysql.tidb_ttl_job_history").Check(testkit.Rows(strings.Join(expectedRow, " ")))
}

func TestTTLAutoAnalyze(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no
return m.updateHeartBeat(ctx, se, now)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary string) {
func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
}

Expand Down

0 comments on commit 4686338

Please sign in to comment.