Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack #17751

Open
wants to merge 12 commits into
base: master
Choose a base branch
from

Conversation

lordcheng10
Copy link
Contributor

@lordcheng10 lordcheng10 commented Sep 20, 2022

Motivation

When isAutoSkipNonRecoverableData=true and individual ack, the markdelete position does not move forward.

} else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
readPosition, exception.getMessage());
// try to find and move to next valid ledger
final Position nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
// fail callback if it couldn't find next valid ledger
if (nexReadPosition == null) {
callback.readEntriesFailed(exception, ctx);
cursor.ledger.mbean.recordReadEntriesError();
recycle();
return;
}
updateReadPosition(nexReadPosition);
checkReadCompletion();
} else {

Modifications

  1. Add a skipped variable in EntryImpl to record whether the entry is automatically skipped;
  2. When pushing entrys to the consumer, automatically ack the skipped entrys;

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Matching PR in forked repository

PR in forked repository: lordcheng10#19

@lordcheng10 lordcheng10 changed the title When isAutoSkipNonRecoverableData=true, fix the problem that the mark… [fix][broker] Fix the problem that the markdelete position does not move forward Sep 20, 2022
@lordcheng10 lordcheng10 changed the title [fix][broker] Fix the problem that the markdelete position does not move forward [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true Sep 20, 2022
@lordcheng10 lordcheng10 changed the title [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack Sep 20, 2022
@lordcheng10
Copy link
Contributor Author

@eolivelli @codelipenghui @Technoboy- PTAL,thanks!

*/
public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
List<Position> skippedPositions = new ArrayList<>();
List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can move this into filterEntriesForConsumer? This way we save some allocations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed!
@eolivelli PTAL,thanks!

entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.skipped = skipped;
entry.data = Unpooled.wrappedBuffer(new byte[0]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to allocate this ? what about using a constant ?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entry.data = Unpooled.EMPTY_BUFFER ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! PTAL,thanks! @eolivelli

@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
return entry;
}

public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about "createSkippedEntry" ?
otherwise people may want to use this factory method for other purposes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed! PTAL,thanks! @eolivelli

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lordcheng10
Copy link
Contributor Author

@codelipenghui @Technoboy- @Jason918 PTAL,thanks!

if (entriesToFiltered == null) {
entriesToFiltered = new ArrayList<>();
}
entriesToFiltered.add(entryImpl.getPosition());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need entries.set(i, null); and entry.release();?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK , I will fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, PTAL, thanks! @Jason918

PositionImpl endPosition = (PositionImpl) nexReadPosition;
while (startPosition.compareTo(endPosition) < 0) {
skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
startPosition = ledger.getNextValidPosition(startPosition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems, normally, it will just goes from readPosition to nexReadPosition?
Will you miss other entries to be acked?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Sep 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems, normally, it will just goes from readPosition to nexReadPosition?

YES

Will you miss other entries to be acked?

IMO, won't miss other entries to be acked

@lordcheng10 lordcheng10 requested review from Jason918 and removed request for AnonHxy September 27, 2022 13:43
@mattisonchao
Copy link
Member

mattisonchao commented Oct 8, 2022

+1 Agree with @codelipenghui concern. individual ack directly is better than introducing new stuff.

@lordcheng10
Copy link
Contributor Author

+1 Agree with @codelipenghui concern. individual ack directly is better than introducing new stuff.

OK , I will fixed!

break;
}
}
List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just drop the data with the instruction cursor.delete(positions)? This saves the memory of entries and is easier to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK , I will fix

skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
startPosition = ledger.getNextValidPosition(startPosition);
toAckEntryNum++;
if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the state of the cursor is like this:

read position {1:0}
individual deleted messages [ {1:0}, {1:50001} ]

After the entry filter, the nexReadPosition will be {1,10000} and filteredEntries will be {1,0}, then maxAckEntryNumForAutoSkipNonRecoverableData could not work perfect. Can we let cursor do the filtering?

Copy link
Contributor Author

@lordcheng10 lordcheng10 Oct 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix

@github-actions
Copy link

github-actions bot commented Nov 9, 2022

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Nov 9, 2022
@congbobo184
Copy link
Contributor

@lordcheng10 hi, I move this PR to release/2.9.5, if you have any questions, please ping me. thanks.

@lordcheng10
Copy link
Contributor Author

OpReadEntry.java

OK

@github-actions
Copy link

The pr had no activity for 30 days, mark with Stale label.

@michaeljmarshall
Copy link
Member

As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label

@github-actions github-actions bot removed the Stale label Jun 28, 2023
@Technoboy- Technoboy- added this to the 3.2.0 milestone Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@coderzc coderzc modified the milestones: 3.3.0, 3.4.0 May 8, 2024
@lhotari
Copy link
Member

lhotari commented Oct 14, 2024

It seems that #21210 addresses the same issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker doc-not-needed Your PR changes do not impact docs release/2.8.5 release/2.10.4 release/2.11.1 release/4.0.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.