-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack #17751
base: master
Are you sure you want to change the base?
Conversation
@eolivelli @codelipenghui @Technoboy- PTAL,thanks! |
d1ee2ee
to
2f52eb1
Compare
2f52eb1
to
253273c
Compare
*/ | ||
public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) { | ||
List<Position> skippedPositions = new ArrayList<>(); | ||
List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can move this into filterEntriesForConsumer? This way we save some allocations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed!
@eolivelli PTAL,thanks!
entry.ledgerId = ledgerId; | ||
entry.entryId = entryId; | ||
entry.skipped = skipped; | ||
entry.data = Unpooled.wrappedBuffer(new byte[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need to allocate this ? what about using a constant ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
entry.data = Unpooled.EMPTY_BUFFER ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed! PTAL,thanks! @eolivelli
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) { | |||
return entry; | |||
} | |||
|
|||
public static EntryImpl create(long ledgerId, long entryId, boolean skipped) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about "createSkippedEntry" ?
otherwise people may want to use this factory method for other purposes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed! PTAL,thanks! @eolivelli
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
5b0c3a2
to
e835c9f
Compare
@codelipenghui @Technoboy- @Jason918 PTAL,thanks! |
if (entriesToFiltered == null) { | ||
entriesToFiltered = new ArrayList<>(); | ||
} | ||
entriesToFiltered.add(entryImpl.getPosition()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need entries.set(i, null);
and entry.release();
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK , I will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, PTAL, thanks! @Jason918
PositionImpl endPosition = (PositionImpl) nexReadPosition; | ||
while (startPosition.compareTo(endPosition) < 0) { | ||
skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); | ||
startPosition = ledger.getNextValidPosition(startPosition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems, normally, it will just goes from readPosition
to nexReadPosition
?
Will you miss other entries to be acked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems, normally, it will just goes from readPosition to nexReadPosition?
YES
Will you miss other entries to be acked?
IMO, won't miss other entries to be acked
+1 Agree with @codelipenghui concern. individual ack directly is better than introducing new stuff. |
OK , I will fixed! |
break; | ||
} | ||
} | ||
List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just drop the data with the instruction cursor.delete(positions)
? This saves the memory of entries
and is easier to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK , I will fix
skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId)); | ||
startPosition = ledger.getNextValidPosition(startPosition); | ||
toAckEntryNum++; | ||
if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the state of the cursor is like this:
read position {1:0}
individual deleted messages [ {1:0}, {1:50001} ]
After the entry filter, the nexReadPosition
will be {1,10000} and filteredEntries
will be {1,0}, then maxAckEntryNumForAutoSkipNonRecoverableData
could not work perfect. Can we let cursor
do the filtering?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix
The pr had no activity for 30 days, mark with Stale label. |
@lordcheng10 hi, I move this PR to release/2.9.5, if you have any questions, please ping me. thanks. |
OK |
The pr had no activity for 30 days, mark with Stale label. |
As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label |
It seems that #21210 addresses the same issue. |
Motivation
When isAutoSkipNonRecoverableData=true and individual ack, the markdelete position does not move forward.
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
Lines 100 to 114 in 72dd01b
Modifications
Documentation
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)
Matching PR in forked repository
PR in forked repository: lordcheng10#19