-
Notifications
You must be signed in to change notification settings - Fork 293
sink: implement an async kafka sink #344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
/run-kafka-tests |
/run-kafka-test |
/run-kafka-tests |
5 similar comments
/run-kafka-tests |
/run-kafka-tests |
/run-kafka-tests |
/run-kafka-tests |
/run-kafka-tests |
# Conflicts: # tests/_utils/run_kafka_consumer # tests/cdc/run.sh # tests/multi_capture/run.sh # tests/row_format/run.sh # tests/simple/run.sh # tests/split_region/run.sh
/run-kafka-tests |
/run-kafka-tests |
/run-kafka-tests |
/run-kafka-tests |
/run-kafka-tests |
/run-all-tests |
/run-kafka-tests |
/run-all-tests |
/run-all-tests |
/run-all-tests |
/run-integration-tests |
return wg.Wait() | ||
} | ||
|
||
func (k *kafkaSaramaProducer) MaxSuccessesIndex() uint64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the return value of this function have two meanings:
- If all partitions have confirmed success with all sent messages, then the return value means the max success index of all partitions.
- If not all partitions have confirmed success messages with all sent messages, the return value means the min success index of not confirmed partitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, All in all, this function returns an index which less than any unsent index.
/run-all-tests |
/run-integration-tests |
Codecov Report
@@ Coverage Diff @@
## master #344 +/- ##
================================================
- Coverage 29.2417% 28.5587% -0.6831%
================================================
Files 59 59
Lines 5328 5613 +285
================================================
+ Hits 1558 1603 +45
- Misses 3629 3867 +238
- Partials 141 143 +2 |
@@ -212,8 +219,14 @@ func (k *mqSink) run(ctx context.Context) error { | |||
} | |||
|
|||
// wait mq producer send message successfully | |||
for sinkCheckpoint.index > k.mqProducer.MaxSuccessesIndex() { | |||
time.Sleep(20 * time.Millisecond) | |||
err := retry.Run(func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the DefaultInitialInterval
for retry is 500ms. 🤔️
Seems our capsulation for retry shadows the retry config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me expose this config in next pr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see #378
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What problem does this PR solve?
implement an async kafka sink
What is changed and how it works?
Check List
Tests