Skip to content

Commit

Permalink
processor(ticdc): Extract all closures to make code maintainable (#8337)
Browse files Browse the repository at this point in the history
ref #8132
  • Loading branch information
Rustin170506 authored Feb 24, 2023
1 parent 25110ef commit c9f207d
Show file tree
Hide file tree
Showing 6 changed files with 414 additions and 163 deletions.
12 changes: 6 additions & 6 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func (m *SinkManager) generateSinkTasks() error {
// Task upperbound is limited by barrierTs and schemaResolvedTs.
// But receivedSorterResolvedTs can be less than barrierTs, in which case
// the table is just scheduled to this node.
getUpperBound := func(tableSink *tableSinkWrapper) engine.Position {
upperBoundTs := tableSink.getReceivedSorterResolvedTs()
getUpperBound := func(tableSinkReceivedSorterResolvedTs model.Ts) engine.Position {
upperBoundTs := tableSinkReceivedSorterResolvedTs

barrierTs := m.lastBarrierTs.Load()
if upperBoundTs > barrierTs {
Expand Down Expand Up @@ -406,7 +406,7 @@ func (m *SinkManager) generateSinkTasks() error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink)
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
Expand Down Expand Up @@ -493,8 +493,8 @@ func (m *SinkManager) generateSinkTasks() error {

func (m *SinkManager) generateRedoTasks() error {
// We use the table's resolved ts as the upper bound to fetch events.
getUpperBound := func(tableSink *tableSinkWrapper) engine.Position {
upperBoundTs := tableSink.getReceivedSorterResolvedTs()
getUpperBound := func(tableSinkReceivedSorterResolvedTs model.Ts) engine.Position {
upperBoundTs := tableSinkReceivedSorterResolvedTs

// If a task carries events after schemaResolvedTs, mounter group threads
// can be blocked on waiting schemaResolvedTs get advanced.
Expand Down Expand Up @@ -552,7 +552,7 @@ func (m *SinkManager) generateRedoTasks() error {
tableSink := tables[i]
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := getUpperBound(tableSink)
upperBound := getUpperBound(tableSink.getReceivedSorterResolvedTs())

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask)

func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) {
lowerBound := task.lowerBound
upperBound := task.getUpperBound(task.tableSink)
upperBound := task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs())
lowerPhs := oracle.GetTimeFromTS(lowerBound.CommitTs)
upperPhs := oracle.GetTimeFromTS(upperBound.CommitTs)
if upperPhs.Sub(lowerPhs) > maxTaskRange {
upperCommitTs := oracle.GoTimeToTS(lowerPhs.Add(maxTaskRange))
if upperPhs.Sub(lowerPhs) > maxTaskTimeRange {
upperCommitTs := oracle.GoTimeToTS(lowerPhs.Add(maxTaskTimeRange))
upperBound = engine.Position{
StartTs: upperCommitTs - 1,
CommitTs: upperCommitTs,
Expand Down
Loading

0 comments on commit c9f207d

Please sign in to comment.