[C++] fix cpp client do AcknowledgeCumulative not clean up previous message#8606
[C++] fix cpp client do AcknowledgeCumulative not clean up previous message#8606sijie merged 1 commit intoapache:masterfrom
Conversation
|
Could you explain why the last message id should not be removed? See pulsar/pulsar-client-cpp/lib/ConsumerImpl.cc Lines 851 to 854 in 281163b After messages whose id is <= And see Java client's implementation: The behaviors of Java and C++ client are the same. |
|
@BewareMyPower Sorry,I did not describe clearly. see https://github.com/apache/pulsar/pull/8606/files, the origin implementation of cpp is different from that of Java client. cpp client just earse |
|
@saosir Can you add a unit test that reproduces the issue? That will also help people to understand the problem. |
|
@saosir OK, I see. |
a8336f7 to
e98394a
Compare
|
@BewareMyPower Can you review this pull request again? |
OK |
|
@BewareMyPower I have two questions here, can you help me answer them?
|
|
From my perspective, both you concerned are right. But the Java client has the same behavior, like |
|
@BewareMyPower |
Yeah, |
|
@BewareMyPower |
|
@saosir It's pleasure to have your contribution. By the way, I just looked at the When you created a consumer with unacked messages tracker enabled like: Consumer consumer;
ConsumerConfiguration consumerConf;
consumerConf.setUnAckedMessagesTimeoutMs(11000); // must >= 10000
consumerConf.setTickDurationInMs(11000);
Result result = client.subscribe("my-topic", "consumer-1", consumerConf, consumer);If pulsar/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc Lines 231 to 232 in 102fa9d And Therefore, the trackers of sub-consumers only do the redelivery but not add any message id, see pulsar/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc Lines 497 to 499 in 102fa9d
|
|
@BewareMyPower pulsar/pulsar-client-cpp/lib/ClientConnection.cc Lines 666 to 669 in 102fa9d and ConsumerImpl#internalListener will be called and notify PartitionedConsumerImpl receive messge by listener of ConsumerConfiguration(here is PartitionedConsumerImpl::messageReceived)
|
f846daa to
ed3704c
Compare
|
I reset branch at first commit |
|
Thanks. Please update the associated PR description too. Also I think the tests could still be added without the refactor, like what |
8ed9502 to
d3f76e5
Compare
All is done |
|
/pulsarbot run-failure-checks |
| long size() { return UnAckedMessageTrackerEnabled::size(); } | ||
| }; // class UnAckedMessageTrackerEnabledMock | ||
|
|
||
| TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) { |
There was a problem hiding this comment.
A typo: testtUnAcked -> testUnAcked
There was a problem hiding this comment.
Do I need to resubmit the code to fix this problem?How about someone fix it when merge pull request?
There was a problem hiding this comment.
I think a committer could fix it when merge PR. So you needn't any changes. @jiazhai could you help take a look?
There was a problem hiding this comment.
@saosir Can you create a follow-up PR to fix the typo?
BewareMyPower
left a comment
There was a problem hiding this comment.
LGTM, just leave a typo comment.
|
/pulsarbot run-failure-checks |
2 similar comments
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
@BewareMyPower |
|
@saosir Yeah, your branch is far behind the master, it's better to rebase to latest master. |
|
/pulsarbot run-failure-checks |
|
LGTM. @jiazhai Could you help take a look? |
| long size() { return UnAckedMessageTrackerEnabled::size(); } | ||
| }; // class UnAckedMessageTrackerEnabledMock | ||
|
|
||
| TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) { |
There was a problem hiding this comment.
@saosir Can you create a follow-up PR to fix the typo?
…essage (#8606) ### Motivation pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up `msgId`, not <= `msgId` in `UnAckedMessageTrackerEnabled::removeMessagesTill` ### Modifications - When do AcknowledgeCumulative from application, earse <= `msgId` in UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to Broker - add unit test for `UnAckedMessageTrackerEnabled` (cherry picked from commit e75de48)
pulsar-client-cpp Consumer UnAckedMessageTrackerEnabled::removeMessagesTill should erase message whose id <=
msgIdin messageIdPartitionMapMotivation
pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up
msgId, not <=msgIdinUnAckedMessageTrackerEnabled::removeMessagesTillModifications
msgIdin UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged messages to BrokerUnAckedMessageTrackerEnabledDoes this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation