
[fix] [broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication #23697

Open · wants to merge 20 commits into base: master from fix/repl_sequence_id

Conversation

@poorbarcode (Contributor) commented Dec 9, 2024

Motivation

Background

How does deduplication work?

  • The producer sends messages and keeps the sequence IDs that are still awaiting a response in an in-memory queue, {pendingMessages}.
  • The broker responds with the invalid position -1:-1 if the published sequence ID is not higher than the previous one.
  • On receiving such a rejection, the producer checks whether the next sequence ID in {pendingMessages} is larger than the rejected one (a minimal sketch follows this list):
    • {next} > {rejected}: ignore the error and continue working.
    • {next} < {rejected}: close the connection and reconnect.
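
The following is a minimal, self-contained sketch of that client-side check. It is not the actual ProducerImpl code; the class and method names (DedupAckCheckSketch, onRejectedReceipt, closeAndReconnect) are illustrative only.

import java.util.ArrayDeque;
import java.util.Deque;

class DedupAckCheckSketch {

    // Sequence IDs of messages that were sent but not yet acknowledged.
    final Deque<Long> pendingMessages = new ArrayDeque<>();

    void onSend(long sequenceId) {
        pendingMessages.addLast(sequenceId);
    }

    // Called when the broker answers a publish with the invalid position -1:-1.
    void onRejectedReceipt(long rejectedSequenceId) {
        Long next = pendingMessages.peekFirst();
        if (next == null || next > rejectedSequenceId) {
            // The rejection refers to something older than everything still pending:
            // ignore the error and keep working.
            return;
        }
        // next <= rejected: the in-memory state no longer matches the broker,
        // so the client closes the connection and reconnects.
        closeAndReconnect();
    }

    void closeAndReconnect() { /* omitted in this sketch */ }
}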

Conditions under which the issue happens

  • A replicator sends all messages of a topic, regardless of which original producer published each message.
    • (Highlight) It puts messages into {pendingMessages} keyed by message.sequenceId but ignores message.original-producer-name, so the sequence IDs in {pendingMessages} may not be increasing.
  • As a result, the check the producer performs on the sequence ID after receiving a -1:-1 send response can fail.

Issue-1: lost messages

  • Publishing in the source cluster
    • Producer-1 sends 2 messages: M1(seq: 0), M2(seq: 1)
    • Producer-2 sends 2 messages: M3(seq: 0), M4(seq: 1)
  • Replicating messages to the remote cluster
    • Copies M1 and M2
      • {pendingMessages}: [0, 1]
    • Copies M1 and M2 again, then copies M3 and M4
      • {pendingMessages}: [0, 1, 0, 1]
      • The network broke right after M1 and M2 were copied again.
      • Receives send receipts:
        • seq 0, position 0:0
        • seq 1, position 0:1
        • seq 0, position -1:-1
        • seq 1, position -1:-1
      • {pendingMessages}: [empty]
  • After a topic unload:
    • The replicator starts again after the topic is loaded (the backlog is 0 at this point).
    • The client creates a new connection.
  • Result:
    • Deduplication disabled: copied [M1, M2, M1, M2]
    • Deduplication enabled: copied [M1, M2]

You can reproduce this issue with the test testDeduplicationNotLostMessage; a simplified simulation follows.
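
The simulation below illustrates the broker-side behaviour in this scenario. It is not Pulsar's MessageDeduplication implementation; the single map keyed by producer name and the method names are assumptions made for illustration. Because the replicator publishes everything under one producer name while reusing the original sequence IDs, M3 and M4 are rejected as duplicates and lost.

import java.util.HashMap;
import java.util.Map;

class BrokerDedupSketch {

    // Highest sequence ID accepted so far, per publishing producer name.
    private final Map<String, Long> highestSequencePerProducer = new HashMap<>();

    /** Returns true if the message is persisted, false if it is rejected with -1:-1. */
    boolean publish(String producerName, long sequenceId) {
        long highest = highestSequencePerProducer.getOrDefault(producerName, -1L);
        if (sequenceId <= highest) {
            return false; // treated as a duplicate
        }
        highestSequencePerProducer.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        BrokerDedupSketch broker = new BrokerDedupSketch();
        // The replicator forwards every message under a single producer name,
        // reusing the original sequence IDs: M1(0), M2(1) from Producer-1,
        // then M3(0), M4(1) from Producer-2.
        System.out.println(broker.publish("replicator", 0)); // M1 -> true
        System.out.println(broker.publish("replicator", 1)); // M2 -> true
        System.out.println(broker.publish("replicator", 0)); // M3 -> false, lost
        System.out.println(broker.publish("replicator", 1)); // M4 -> false, lost
    }
}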


Issue-2: frequently fails

  • Original-producer-2: sent msg 3:0 with sequence-id 10
  • Original-producer-1: sent msg 3:1 with sequence-id 1
  • Original-producer-1: sent msg 3:2 with sequence-id 2
    • The replicator copies the messages
    • {pendingMessages}: [10, 1, 2]
  • 3:0 is sent successfully
    • {pendingMessages}: [1, 2]
  • 3:0 is re-sent (a duplicated publish)
    • The latest send response is -1:-1 (the new position relates to the latest publish).
    • Failed sequence ID (10) > pendingMessages[0].sequenceId (1), so the error cannot be ignored.
    • The replicator producer reconnects and resets the cursor, which leads to more duplicated publishes.
    • Loop back to step 1.

There is no test that reproduces this issue.
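
A small self-contained calculation with the Issue-2 numbers (illustrative code, not the real client) shows why the replicator producer keeps reconnecting:

import java.util.ArrayDeque;
import java.util.List;

class Issue2LoopSketch {
    public static void main(String[] args) {
        // Pending sequence IDs after the replicator copied 3:0, 3:1, 3:2
        // using the original producers' sequence IDs: [10, 1, 2].
        ArrayDeque<Long> pendingMessages = new ArrayDeque<>(List.of(10L, 1L, 2L));

        pendingMessages.pollFirst(); // 3:0 (sequence ID 10) is persisted -> [1, 2]

        long rejected = 10; // the duplicated re-publish of 3:0 is answered with -1:-1
        long next = pendingMessages.peekFirst(); // 1
        if (next < rejected) {
            // The client treats its state as inconsistent: it reconnects, resets the
            // cursor, re-publishes the same duplicates, and loops as described above.
            System.out.println("next " + next + " < rejected " + rejected + " -> reconnect");
        }
    }
}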

Modifications

Solution: replicators use a specified sequence ID (the ledgerId:entryId of the message in the original topic) instead of the original producers' sequence IDs.

  • Original-producer-2: sent msg 3:0 with sequence-id 10
  • Original-producer-1: sent msg 3:1 with sequence-id 1
  • Original-producer-1: sent msg 3:2 with sequence-id 2
  • The replicator copies the messages with the specified sequence IDs (the source positions 3:0, 3:1, 3:2)
    • {pendingMessages}: [3:0, 3:1, 3:2]
  • 3:0 is sent successfully
    • {pendingMessages}: [3:1, 3:2]
  • 3:0 is re-sent (a duplicated publish)
    • The latest send response is -1:-1 (the new position relates to the latest publish).
    • The error is ignored since the failed sequence ID (3:0) < pendingMessages[0].sequenceId (3:1); see the sketch below.
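
A minimal sketch of why a position-based sequence ID avoids both issues. This is not the PR's actual encoding of ledgerId:entryId into the protocol; the Position record and the comparison are illustrative only. Since the positions the replicator reads from the source topic are strictly increasing regardless of the original producer, a rejected duplicate always compares lower than the head of {pendingMessages}, so the error can simply be ignored.

import java.util.ArrayDeque;
import java.util.Deque;

class PositionSequenceSketch {

    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            int cmp = Long.compare(ledgerId, other.ledgerId);
            return cmp != 0 ? cmp : Long.compare(entryId, other.entryId);
        }
        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    public static void main(String[] args) {
        Deque<Position> pendingMessages = new ArrayDeque<>();
        pendingMessages.addLast(new Position(3, 0)); // original sequence-id was 10
        pendingMessages.addLast(new Position(3, 1)); // original sequence-id was 1
        pendingMessages.addLast(new Position(3, 2)); // original sequence-id was 2

        pendingMessages.pollFirst(); // 3:0 is persisted and leaves the queue

        // A duplicated re-publish of 3:0 is rejected with -1:-1.
        Position rejected = new Position(3, 0);
        Position next = pendingMessages.peekFirst(); // 3:1
        if (next.compareTo(rejected) > 0) {
            System.out.println("rejected " + rejected + " < next pending " + next
                    + " -> ignore the error, no reconnect");
        }
    }
}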

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode self-assigned this Dec 9, 2024
@github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) Dec 9, 2024
@poorbarcode changed the title from "[draft][fix][broker] GEO replication fails due to deduplication is not appropriate for Geo-Replication" to "[draft][fix][broker] Geo Replication fails due to Deduplication is not appropriate for Geo-Replication" Dec 9, 2024
@poorbarcode force-pushed the fix/repl_sequence_id branch 3 times, most recently from b9ae34a to 0d2e235 on December 17, 2024 at 11:56
@poorbarcode changed the title from "[draft][fix][broker] Geo Replication fails due to Deduplication is not appropriate for Geo-Replication" to "[fix][broker] Geo Replication fails due to Deduplication is not appropriate for Geo-Replication" Dec 17, 2024
@poorbarcode changed the title from "[fix][broker] Geo Replication fails due to Deduplication is not appropriate for Geo-Replication" to "[fix][broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication" Dec 17, 2024
@poorbarcode

/pulsarbot rerun-failure-checks

@poorbarcode changed the title from "[fix][broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication" to "[fix] [broker] Geo Replication lost messages or frequently fails due to Deduplication is not appropriate for Geo-Replication" Dec 18, 2024
@codecov-commenter commented Dec 23, 2024

Codecov Report

Attention: Patch coverage is 63.57388% with 106 lines in your changes missing coverage. Please review.

Project coverage is 74.08%. Comparing base (bbc6224) to head (5340223).
Report is 813 commits behind head on master.

Files with missing lines Patch % Lines
...pulsar/client/impl/GeoReplicationProducerImpl.java 50.00% 36 Missing and 18 partials ⚠️
...roker/service/persistent/MessageDeduplication.java 67.54% 24 Missing and 13 partials ⚠️
...ava/org/apache/pulsar/broker/service/Producer.java 78.12% 6 Missing and 1 partial ⚠️
.../java/org/apache/pulsar/client/impl/ClientCnx.java 63.63% 3 Missing and 1 partial ⚠️
...er/service/persistent/GeoPersistentReplicator.java 50.00% 1 Missing and 1 partial ⚠️
...n/java/org/apache/pulsar/broker/service/Topic.java 0.00% 1 Missing ⚠️
...org/apache/pulsar/broker/service/TransportCnx.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #23697      +/-   ##
============================================
+ Coverage     73.57%   74.08%   +0.51%     
+ Complexity    32624    32215     -409     
============================================
  Files          1877     1855      -22     
  Lines        139502   143636    +4134     
  Branches      15299    16334    +1035     
============================================
+ Hits         102638   106419    +3781     
+ Misses        28908    28818      -90     
- Partials       7956     8399     +443     
Flag Coverage Δ
inttests 26.66% <27.83%> (+2.08%) ⬆️
systests 23.10% <8.24%> (-1.22%) ⬇️
unittests 73.62% <63.57%> (+0.77%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...ache/pulsar/broker/service/AbstractReplicator.java 65.80% <100.00%> (-19.20%) ⬇️
...va/org/apache/pulsar/broker/service/ServerCnx.java 72.95% <ø> (+0.80%) ⬆️
...ar/broker/service/persistent/ShadowReplicator.java 62.22% <100.00%> (+3.68%) ⬆️
...va/org/apache/pulsar/client/impl/ProducerImpl.java 84.01% <100.00%> (+0.41%) ⬆️
...rg/apache/pulsar/client/impl/PulsarClientImpl.java 75.16% <100.00%> (+0.86%) ⬆️
...ar/client/impl/conf/ProducerConfigurationData.java 85.29% <ø> (-6.48%) ⬇️
...va/org/apache/pulsar/common/protocol/Commands.java 90.95% <100.00%> (+0.35%) ⬆️
...ava/org/apache/pulsar/common/protocol/Markers.java 91.45% <100.00%> (+1.10%) ⬆️
...n/java/org/apache/pulsar/broker/service/Topic.java 40.00% <0.00%> (+3.63%) ⬆️
...org/apache/pulsar/broker/service/TransportCnx.java 50.00% <50.00%> (ø)
... and 5 more

... and 1009 files with indirect coverage changes

@codelipenghui

@poorbarcode It's a good idea to just use the ledger ID and entry ID for message deduplication. In this case, we can also remove the deduplication state after a ledger gets fully replicated. For example (a rough sketch follows this list):

  • We are replicating data from ledger 1.
  • The last entry ID of ledger 1 is 10000.
  • After the remote cluster gets replicated data with a ledger ID greater than 1,
  • we can then delete the deduplication state for ledger 1.
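
A rough sketch of that cleanup idea, assuming replicated messages arrive in source order; the class and field names are hypothetical, and this is not the actual MessageDeduplication code. Once a replicated message carries a source ledger ID greater than some ledger L, the state kept for L can be dropped.

import java.util.Map;
import java.util.TreeMap;

class ReplicatedDedupStateSketch {

    // Highest replicated source entry ID seen per source ledger ID.
    private final TreeMap<Long, Long> highestEntryPerLedger = new TreeMap<>();

    void onReplicatedMessage(long sourceLedgerId, long sourceEntryId) {
        highestEntryPerLedger.merge(sourceLedgerId, sourceEntryId, Math::max);
        // Assuming replication preserves the source order, every ledger strictly
        // smaller than the current one is fully replicated, so its state can go.
        highestEntryPerLedger.headMap(sourceLedgerId, false).clear();
    }

    Map<Long, Long> state() {
        return highestEntryPerLedger;
    }

    public static void main(String[] args) {
        ReplicatedDedupStateSketch sketch = new ReplicatedDedupStateSketch();
        sketch.onReplicatedMessage(1, 9999);
        sketch.onReplicatedMessage(1, 10000); // last entry of ledger 1
        sketch.onReplicatedMessage(2, 0);     // ledger 2 started -> ledger 1 state dropped
        System.out.println(sketch.state());   // {2=0}
    }
}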

Review thread on pulsar-common/src/main/proto/PulsarApi.proto (outdated, resolved)