diff --git a/executor/infoschema_cluster_table_test.go b/executor/infoschema_cluster_table_test.go index be2f04cb5c6ac..b1a6d4c57f4f8 100644 --- a/executor/infoschema_cluster_table_test.go +++ b/executor/infoschema_cluster_table_test.go @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) { "test 2", )) rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows() - result := 45 + result := 46 require.Len(t, rows, result) // More tests about the privileges. diff --git a/session/bootstrap.go b/session/bootstrap.go index 74eecb28a68a4..ed65bb0720cf0 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -516,6 +516,27 @@ const ( created_time timestamp NOT NULL, primary key(job_id, scan_id), key(created_time));` + + // CreateTTLJobHistory is a table that stores ttl job's history + CreateTTLJobHistory = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_job_history ( + job_id varchar(64) PRIMARY KEY, + table_id bigint(64) NOT NULL, + parent_table_id bigint(64) NOT NULL, + table_schema varchar(64) NOT NULL, + table_name varchar(64) NOT NULL, + partition_name varchar(64) DEFAULT NULL, + create_time timestamp NOT NULL, + finish_time timestamp NOT NULL, + ttl_expire timestamp NOT NULL, + summary_text text, + expired_rows bigint(64) DEFAULT NULL, + deleted_rows bigint(64) DEFAULT NULL, + error_delete_rows bigint(64) DEFAULT NULL, + status varchar(64) NOT NULL, + key(table_schema, table_name, create_time), + key(parent_table_id, create_time), + key(create_time) + );` ) // bootstrap initiates system DB for a store. @@ -757,7 +778,7 @@ const ( version109 = 109 // version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0. version110 = 110 - // version111 adds the table tidb_ttl_task + // version111 adds the table tidb_ttl_task and tidb_ttl_job_history version111 = 111 ) @@ -2239,6 +2260,7 @@ func upgradeToVer111(s Session, ver int64) { return } doReentrantDDL(s, CreateTTLTask) + doReentrantDDL(s, CreateTTLJobHistory) } func writeOOMAction(s Session) { @@ -2349,6 +2371,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateTTLTableStatus) // Create tidb_ttl_task table mustExecute(s, CreateTTLTask) + // Create tidb_ttl_job_history table + mustExecute(s, CreateTTLJobHistory) } // doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap. diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go index d28bafa5a76c8..b21a50a161f79 100644 --- a/ttl/cache/ttlstatus.go +++ b/ttl/cache/ttlstatus.go @@ -30,13 +30,15 @@ const ( // JobStatusWaiting means the job hasn't started JobStatusWaiting JobStatus = "waiting" // JobStatusRunning means this job is running - JobStatusRunning = "running" + JobStatusRunning JobStatus = "running" // JobStatusCancelling means this job is being canceled, but not canceled yet - JobStatusCancelling = "cancelling" + JobStatusCancelling JobStatus = "cancelling" // JobStatusCancelled means this job has been canceled successfully - JobStatusCancelled = "cancelled" + JobStatusCancelled JobStatus = "cancelled" // JobStatusTimeout means this job has timeout - JobStatusTimeout = "timeout" + JobStatusTimeout JobStatus = "timeout" + // JobStatusFinished means job has been finished + JobStatusFinished JobStatus = "finished" ) const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index f2a78e7ef0270..5ba91dcc375a0 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -43,6 +43,25 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status current_job_status_update_time = NULL WHERE table_id = %? AND current_job_id = %?` const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?" +const addJobHistoryTemplate = `INSERT INTO + mysql.tidb_ttl_job_history ( + job_id, + table_id, + parent_table_id, + table_schema, + table_name, + partition_name, + create_time, + finish_time, + ttl_expire, + summary_text, + expired_rows, + deleted_rows, + error_delete_rows, + status + ) +VALUES + (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)` func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) { return updateJobCurrentStatusTemplate, []interface{}{string(newStatus), tableID, string(oldStatus), jobID} @@ -56,11 +75,41 @@ func removeTaskForJob(jobID string) (string, []interface{}) { return removeTaskForJobTemplate, []interface{}{jobID} } +func addJobHistorySQL(job *ttlJob, finishTime time.Time, summary *TTLSummary) (string, []interface{}) { + status := cache.JobStatusFinished + if job.status == cache.JobStatusTimeout || job.status == cache.JobStatusCancelled { + status = job.status + } + + var partitionName interface{} + if job.tbl.Partition.O != "" { + partitionName = job.tbl.Partition.O + } + + return addJobHistoryTemplate, []interface{}{ + job.id, + job.tbl.ID, + job.tbl.TableInfo.ID, + job.tbl.Schema.O, + job.tbl.Name.O, + partitionName, + job.createTime.Format(timeFormat), + finishTime.Format(timeFormat), + job.ttlExpireTime.Format(timeFormat), + summary.SummaryText, + summary.TotalRows, + summary.SuccessRows, + summary.ErrorRows, + string(status), + } +} + type ttlJob struct { id string ownerID string - createTime time.Time + createTime time.Time + ttlExpireTime time.Time tbl *cache.PhysicalTable @@ -71,11 +120,11 @@ type ttlJob struct { } // finish turns current job into last job, and update the error message and statistics summary -func (job *ttlJob) finish(se session.Session, now time.Time, summary string) { +func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) { // at this time, the job.ctx may have been canceled (to cancel this job) // even when it's canceled, we'll need to update the states, so use another context err := se.RunInTxn(context.TODO(), func() error { - sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id) + sql, args := finishJobSQL(job.tbl.ID, now, summary.SummaryText, job.id) _, err := se.ExecuteSQL(context.TODO(), sql, args...) if err != nil { return errors.Wrapf(err, "execute sql: %s", sql) @@ -87,6 +136,12 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary string) { return errors.Wrapf(err, "execute sql: %s", sql) } + sql, args = addJobHistorySQL(job, now, summary) + _, err = se.ExecuteSQL(context.TODO(), sql, args...) + if err != nil { + return errors.Wrapf(err, "execute sql: %s", sql) + } + return nil }, session.TxnModeOptimistic) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 77a66d7e3f761..132be2e626cde 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -629,7 +629,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return nil, err } - job := m.createNewJob(now, table) + job := m.createNewJob(expireTime, now, table) // job is created, notify every scan managers to fetch new tasks err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id) @@ -639,14 +639,15 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return job, nil } -func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) *ttlJob { +func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { id := m.tableStatusCache.Tables[table.ID].CurrentJobID return &ttlJob{ id: id, ownerID: m.id, - createTime: now, + createTime: now, + ttlExpireTime: expireTime, // at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table // information from schema cache directly tbl: table, @@ -717,7 +718,8 @@ func (m *JobManager) GetNotificationCli() client.NotificationClient { return m.notificationCli } -type ttlSummary struct { +// TTLSummary is the summary for TTL job +type TTLSummary struct { TotalRows uint64 `json:"total_rows"` SuccessRows uint64 `json:"success_rows"` ErrorRows uint64 `json:"error_rows"` @@ -727,22 +729,24 @@ type ttlSummary struct { FinishedScanTask int `json:"finished_scan_task"` ScanTaskErr string `json:"scan_task_err,omitempty"` + SummaryText string `json:"-"` } -func summarizeErr(err error) (string, error) { - summary := &ttlSummary{ +func summarizeErr(err error) (*TTLSummary, error) { + summary := &TTLSummary{ ScanTaskErr: err.Error(), } buf, err := json.Marshal(summary) if err != nil { - return "", err + return nil, err } - return string(buf), nil + summary.SummaryText = string(buf) + return summary, nil } -func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) { - summary := &ttlSummary{} +func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) { + summary := &TTLSummary{} var allErr error for _, t := range tasks { if t.State != nil { @@ -768,7 +772,8 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) { buf, err := json.Marshal(summary) if err != nil { - return "", err + return nil, err } - return string(buf), nil + summary.SummaryText = string(buf) + return summary, nil } diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index 313b9257249a6..11b4a1b6e8dee 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -16,8 +16,10 @@ package ttlworker_test import ( "context" + "encoding/json" "fmt" "strconv" + "strings" "sync" "testing" "time" @@ -69,7 +71,7 @@ func TestParallelLockNewJob(t *testing.T) { se := sessionFactory() job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) require.NoError(t, err) - job.Finish(se, time.Now(), "") + job.Finish(se, time.Now(), &ttlworker.TTLSummary{}) // lock one table in parallel, only one of them should lock successfully testTimes := 100 @@ -103,18 +105,19 @@ func TestParallelLockNewJob(t *testing.T) { wg.Wait() require.Equal(t, uint64(1), successCounter.Load()) - successJob.Finish(se, time.Now(), "") + successJob.Finish(se, time.Now(), &ttlworker.TTLSummary{}) } } func TestFinishJob(t *testing.T) { + timeFormat := "2006-01-02 15:04:05" store, dom := testkit.CreateMockStoreAndDomain(t) waitAndStopTTLManager(t, dom) tk := testkit.NewTestKit(t, store) sessionFactory := sessionFactory(t, store) - testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}} + testTable := &cache.PhysicalTable{ID: 2, Schema: model.NewCIStr("db1"), TableInfo: &model.TableInfo{ID: 1, Name: model.NewCIStr("t1"), TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}} tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)") @@ -122,13 +125,33 @@ func TestFinishJob(t *testing.T) { m := ttlworker.NewJobManager("test-id", nil, store, nil) m.InfoSchemaCache().Tables[testTable.ID] = testTable se := sessionFactory() - job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) + startTime := time.Now() + job, err := m.LockNewJob(context.Background(), se, testTable, startTime, false) + require.NoError(t, err) + + expireTime, err := testTable.EvalExpireTime(context.Background(), se, startTime) require.NoError(t, err) - summary := `{"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}` - job.Finish(se, time.Now(), summary) - tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows(`2 {"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}`)) + summary := &ttlworker.TTLSummary{ + ScanTaskErr: "\"'an error message contains both single and double quote'\"", + TotalRows: 128, + SuccessRows: 120, + ErrorRows: 8, + } + summaryBytes, err := json.Marshal(summary) + summary.SummaryText = string(summaryBytes) + + require.NoError(t, err) + endTime := time.Now() + job.Finish(se, endTime, summary) + tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 " + summary.SummaryText)) tk.MustQuery("select * from mysql.tidb_ttl_task").Check(testkit.Rows()) + expectedRow := []string{ + job.ID(), "2", "1", "db1", "t1", "", + startTime.Format(timeFormat), endTime.Format(timeFormat), expireTime.Format(timeFormat), + summary.SummaryText, "128", "120", "8", "finished", + } + tk.MustQuery("select * from mysql.tidb_ttl_job_history").Check(testkit.Rows(strings.Join(expectedRow, " "))) } func TestTTLAutoAnalyze(t *testing.T) { diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 9e0211410591b..bf7837fc2ee64 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -171,7 +171,7 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no return m.updateHeartBeat(ctx, se, now) } -func (j *ttlJob) Finish(se session.Session, now time.Time, summary string) { +func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) { j.finish(se, now, summary) }