-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pipeline: set table status to stopped after sink is closed #2716
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-all-tests |
Codecov Report
@@ Coverage Diff @@
## master #2716 +/- ##
================================================
+ Coverage 55.8716% 61.5431% +5.6715%
================================================
Files 169 161 -8
Lines 20667 17911 -2756
================================================
- Hits 11547 11023 -524
+ Misses 8012 5873 -2139
+ Partials 1108 1015 -93 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
cdc/processor/processor.go
Outdated
@@ -367,7 +367,7 @@ func (p *processor) handleTableOperation(ctx cdcContext.Context) error { | |||
}) | |||
case model.OperProcessed: | |||
if table.Status() != tablepipeline.TableStatusStopped { | |||
log.Debug("the table is still not stopped", zap.Uint64("checkpointTs", table.CheckpointTs()), zap.Int64("tableID", tableID)) | |||
log.Info("the table is still not stopped", zap.Uint64("checkpointTs", table.CheckpointTs()), zap.Int64("tableID", tableID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also change Debug to Info for the log at line 734?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid there are too many logs if table count is large, such as 10k tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also revert this log level change, since
- we can deduce lazy stop from sink barrier block log
- It is better to refine scheduling logs in a specific PR
cdc/processor/pipeline/table.go
Outdated
@@ -100,8 +100,7 @@ func (t *tablePipelineImpl) UpdateBarrierTs(ts model.Ts) { | |||
// AsyncStop tells the pipeline to stop, and returns true is the pipeline is already stopped. | |||
func (t *tablePipelineImpl) AsyncStop(targetTs model.Ts) bool { | |||
err := t.p.SendToFirstNode(pipeline.CommandMessage(&pipeline.Command{ | |||
Tp: pipeline.CommandTypeStopAtTs, | |||
StoppedTs: targetTs, | |||
Tp: pipeline.CommandTypeStopAtTs, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tp: pipeline.CommandTypeStopAtTs, | |
Tp: pipeline.CommandTypeStopAtTs, |
Cloud you rename pipeline.CommandTypeStopAtTs
to pipeline.CommandTypeStop
?
/run-all-tests |
1 similar comment
/run-all-tests |
/test ? |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 4d40ea6
|
/run-unit-tests |
/run-integration-tests |
2 similar comments
/run-integration-tests |
/run-integration-tests |
/merge |
/run-integration-tests |
In response to a cherrypick label: new pull request created: #2727. |
In response to a cherrypick label: new pull request created: #2728. |
In response to a cherrypick label: new pull request created: #2729. |
In response to a cherrypick label: new pull request created: #2730. |
What problem does this PR solve?
Fix a data inconsistency issue caused by conflict write from two captures, it was a remaining bug after #2417
What is changed and how it works?
StoppedTs
parameter in pipeline command.Check List
Tests
Release note