sink: implement an async kafka sink #344

Merged 15 commits on Mar 26, 2020

Conversation

@zier-one (Contributor) commented Mar 17, 2020

What problem does this PR solve?

Implement an async Kafka sink.

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test

@zier-one added the WIP, component/sink, and subject/performance labels Mar 17, 2020

@zier-one (Contributor Author): /run-kafka-tests

@zier-one (Contributor Author): /run-kafka-test

@zier-one (Contributor Author): /run-kafka-tests (six consecutive comments)

leoppro added 2 commits March 19, 2020 11:24
# Conflicts:
#	tests/_utils/run_kafka_consumer
#	tests/cdc/run.sh
#	tests/multi_capture/run.sh
#	tests/row_format/run.sh
#	tests/simple/run.sh
#	tests/split_region/run.sh

@zier-one (Contributor Author): /run-kafka-tests (five consecutive comments)

@zier-one (Contributor Author): /run-all-tests

@zier-one (Contributor Author): /run-kafka-tests

@zier-one (Contributor Author): /run-all-tests (two consecutive comments)

@zier-one added the status/ptal label and removed the WIP label Mar 19, 2020

@zier-one (Contributor Author): /run-all-tests

@zier-one (Contributor Author): /run-integration-tests

return wg.Wait()
}

func (k *kafkaSaramaProducer) MaxSuccessesIndex() uint64 {

Contributor:

Does the return value of this function have two meanings?

  • If every partition has confirmed all of its sent messages, the return value is the maximum success index across all partitions.
  • If some partitions have not yet confirmed all of their sent messages, the return value is the minimum success index among those unconfirmed partitions.

Contributor Author:

Yes. In short, this function returns an index that is less than any unsent index.
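
The two cases described in this thread can be sketched as follows. This is a minimal illustration under stated assumptions, not the code merged in this PR: the `partitionProgress` type, its `sent` and `succeeded` fields, and the free function `maxSuccessesIndex` are hypothetical names introduced only for the example, and indexes are assumed to increase monotonically.

```go
package main

import (
	"fmt"
	"math"
)

// partitionProgress is a hypothetical per-partition record (not from the PR).
type partitionProgress struct {
	sent      uint64 // highest index handed to the producer for this partition
	succeeded uint64 // highest index acknowledged for this partition
}

// maxSuccessesIndex returns an index that is safe to treat as fully delivered:
//   - if every partition has confirmed everything it was sent, the maximum
//     confirmed index across partitions;
//   - otherwise, the minimum confirmed index among partitions that still have
//     unconfirmed messages.
func maxSuccessesIndex(parts []partitionProgress) uint64 {
	var maxConfirmed uint64
	minPending := uint64(math.MaxUint64)
	pending := false
	for _, p := range parts {
		if p.succeeded < p.sent {
			// This partition still has messages in flight.
			pending = true
			if p.succeeded < minPending {
				minPending = p.succeeded
			}
		} else if p.succeeded > maxConfirmed {
			maxConfirmed = p.succeeded
		}
	}
	if pending {
		return minPending
	}
	return maxConfirmed
}

func main() {
	allConfirmed := []partitionProgress{{sent: 10, succeeded: 10}, {sent: 8, succeeded: 8}}
	fmt.Println(maxSuccessesIndex(allConfirmed)) // prints 10

	onePending := []partitionProgress{{sent: 10, succeeded: 10}, {sent: 8, succeeded: 5}}
	fmt.Println(maxSuccessesIndex(onePending)) // prints 5
}
```

In the second call, indexes 6 through 8 on the lagging partition are still unconfirmed, so the result of 5 stays below every unconfirmed index, which is the property the author describes.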

cdc/sink/mq.go (outdated, resolved)
cdc/sink/mqProducer/kafka.go (resolved)

@zier-one (Contributor Author): /run-all-tests

@zier-one (Contributor Author): /run-integration-tests

@codecov-io commented:

Codecov Report

Merging #344 into master will decrease coverage by 0.6830%.
The diff coverage is 3.7174%.

@@               Coverage Diff                @@
##             master       #344        +/-   ##
================================================
- Coverage   29.2417%   28.5587%   -0.6831%     
================================================
  Files            59         59                
  Lines          5328       5613       +285     
================================================
+ Hits           1558       1603        +45     
- Misses         3629       3867       +238     
- Partials        141        143         +2     

@@ -212,8 +219,14 @@ func (k *mqSink) run(ctx context.Context) error {
}

// wait mq producer send message successfully
for sinkCheckpoint.index > k.mqProducer.MaxSuccessesIndex() {
time.Sleep(20 * time.Millisecond)
err := retry.Run(func() error {

Contributor:

The DefaultInitialInterval for retry is 500ms. 🤔
It seems our encapsulation of retry shadows the retry config.

Contributor Author:

Let me expose this config in the next PR.

Contributor Author:

See #378.

Contributor:

Fine.

@amyangfei (Contributor) left a comment:

LGTM

@amyangfei added the LGT1 label and removed the status/ptal label Mar 26, 2020
@zier-one merged commit 3996321 into pingcap:master Mar 26, 2020
@zier-one deleted the kafka_batch branch April 10, 2020 05:13
5kbpers pushed a commit to 5kbpers/ticdc that referenced this pull request Aug 24, 2020
amyangfei pushed a commit to amyangfei/tiflow that referenced this pull request May 6, 2022
Labels: component/sink, subject/performance
3 participants