Skip to content

Commit

Permalink
*: add some logs to help debug error. (#2253) (#2357)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 27, 2021
1 parent 8afaa9e commit 5a4568b
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 13 deletions.
4 changes: 4 additions & 0 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"math"

"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/ticdc/pkg/errors"
"go.uber.org/zap"
)

// AdminJobType represents the admin job type; it is used in both owner and processor
Expand Down Expand Up @@ -238,6 +240,7 @@ func (ts *TaskStatus) RemoveTable(id TableID, boundaryTs Ts, isMoveTable bool) (
return nil, false
}
delete(ts.Tables, id)
log.Info("remove a table", zap.Int64("tableId", id), zap.Uint64("boundaryTs", boundaryTs), zap.Bool("isMoveTable", isMoveTable))
if ts.Operation == nil {
ts.Operation = make(map[TableID]*TableOperation)
}
Expand All @@ -262,6 +265,7 @@ func (ts *TaskStatus) AddTable(id TableID, table *TableReplicaInfo, boundaryTs T
return
}
ts.Tables[id] = table
log.Info("add a table", zap.Int64("tableId", id), zap.Uint64("boundaryTs", boundaryTs))
if ts.Operation == nil {
ts.Operation = make(map[TableID]*TableOperation)
}
Expand Down
5 changes: 3 additions & 2 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
}
m.shouldBeRunning = false
jobsPending = true
m.patchState(model.StateRemoved)
// remove changefeed info and status

// remove changefeedInfo
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, true, nil
})
// remove changefeedStatus
m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, true, nil
})
Expand Down
15 changes: 7 additions & 8 deletions cdc/owner/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (s *scheduler) handleMoveTableJob() (shouldUpdateState bool, err error) {
s.moveTableTargets[job.tableID] = job.target
job := job
shouldUpdateState = false
// for all move table job, this just remove the table from the source capture.
// and the removed table by this function will be added to target function by syncTablesWithCurrentTables in the next tick.
// for every move table job, this only removes the table from the source capture.
// the table removed by this function will be added to the target capture by syncTablesWithCurrentTables in the next tick.
s.state.PatchTaskStatus(source, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
if status == nil {
// the capture may be down, just skip removing this table
Expand All @@ -147,13 +147,13 @@ func (s *scheduler) table2CaptureIndex() (map[model.TableID]model.CaptureID, err
for captureID, taskStatus := range s.state.TaskStatuses {
for tableID := range taskStatus.Tables {
if preCaptureID, exist := table2CaptureIndex[tableID]; exist && preCaptureID != captureID {
return nil, cerror.ErrTableListenReplicated.GenWithStackByArgs(preCaptureID, captureID)
return nil, cerror.ErrTableListenReplicated.GenWithStackByArgs(tableID, preCaptureID, captureID)
}
table2CaptureIndex[tableID] = captureID
}
for tableID := range taskStatus.Operation {
if preCaptureID, exist := table2CaptureIndex[tableID]; exist && preCaptureID != captureID {
return nil, cerror.ErrTableListenReplicated.GenWithStackByArgs(preCaptureID, captureID)
return nil, cerror.ErrTableListenReplicated.GenWithStackByArgs(tableID, preCaptureID, captureID)
}
table2CaptureIndex[tableID] = captureID
}
Expand Down Expand Up @@ -224,8 +224,8 @@ func (s *scheduler) dispatchToTargetCaptures(pendingJobs []*schedulerJob) {
}
}

// syncTablesWithCurrentTables iterates all current tables and check whether all the table has been listened.
// if not, this function will return scheduler jobs to make sure all the table will be listened.
// syncTablesWithCurrentTables iterates all current tables to check whether each of them is already being listened.
// for the tables that are not, this function returns schedulerJobs to make sure all tables will be listened.
func (s *scheduler) syncTablesWithCurrentTables() ([]*schedulerJob, error) {
var pendingJob []*schedulerJob
allTableListeningNow, err := s.table2CaptureIndex()
Expand All @@ -239,11 +239,10 @@ func (s *scheduler) syncTablesWithCurrentTables() ([]*schedulerJob, error) {
continue
}
// For each table which should be listened but is not, add an adding-table job to the pending job list
boundaryTs := globalCheckpointTs
pendingJob = append(pendingJob, &schedulerJob{
Tp: schedulerJobTypeAddTable,
TableID: tableID,
BoundaryTs: boundaryTs,
BoundaryTs: globalCheckpointTs,
})
}
// The remaining tables are the tables which should not be listened
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ this api supports POST method only

["CDC:ErrTableListenReplicated"]
error = '''
A table is being replicated by at least two processors(%s, %s), please report a bug
A table(%d) is being replicated by at least two processors(%s, %s), please report a bug
'''

["CDC:ErrTableProcessorStoppedSafely"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ var (
ErrSnapshotLostByGC = errors.Normalize("fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrSnapshotLostByGC"))
ErrGCTTLExceeded = errors.Normalize("the checkpoint-ts(%d) lag of the changefeed(%s) has exceeded the GC TTL", errors.RFCCodeText("CDC:ErrGCTTLExceeded"))
ErrNotOwner = errors.Normalize("this capture is not a owner", errors.RFCCodeText("CDC:ErrNotOwner"))
ErrTableListenReplicated = errors.Normalize("A table is being replicated by at least two processors(%s, %s), please report a bug", errors.RFCCodeText("CDC:ErrTableListenReplicated"))
ErrTableListenReplicated = errors.Normalize("A table(%d) is being replicated by at least two processors(%s, %s), please report a bug", errors.RFCCodeText("CDC:ErrTableListenReplicated"))
// EtcdWorker related errors. Internal use only.
// ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort.
ErrEtcdTryAgain = errors.Normalize("the etcd txn should be aborted and retried immediately", errors.RFCCodeText("CDC:ErrEtcdTryAgain"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type EtcdWorker struct {
state ReactorState
// rawState is the local cache of the latest Etcd state.
rawState map[util.EtcdKey][]byte
// pendingUpdates stores updates initiated by the Reactor that have not yet been uploaded to Etcd.
// pendingUpdates stores Etcd updates that the Reactor has not been notified of.
pendingUpdates []*etcdUpdate
// revision is the Etcd revision of the latest event received from Etcd
// (which has not necessarily been applied to the ReactorState)
Expand Down

0 comments on commit 5a4568b

Please sign in to comment.