Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist individually deleted messages #276

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
persist and recover individual deleted messages
  • Loading branch information
sschepens authored and merlimat committed Mar 5, 2017
commit 3402257d425515d46dc9590a7b819fb6254e5d35
2 changes: 2 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ managedLedgerCursorMaxEntriesPerLedger=50000
# Max time before triggering a rollover on a cursor ledger
managedLedgerCursorRolloverTimeInSeconds=14400

managedLedgerMaxUnackedRangesToPersist=1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add the documentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should




### --- Load balancer --- ###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.List;
import java.util.Set;

import com.google.common.annotations.Beta;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
Expand All @@ -26,8 +24,10 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;

import com.google.common.annotations.Beta;
import com.google.common.base.Predicate;

import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/**
* A ManangedCursor is a persisted cursor inside a ManagedLedger.
Expand Down Expand Up @@ -326,7 +326,7 @@ public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries dele
* opaque context
*/
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx);
FindEntryCallback callback, Object ctx);

/**
* reset the cursor to specified position to enable replay of messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
@Beta
public class ManagedLedgerConfig {

private int maxUnackedRangesToPersist = 1000;
private int maxEntriesPerLedger = 50000;
private int maxSizePerLedgerMb = 100;
private int minimumRolloverTimeMs = 0;
Expand Down Expand Up @@ -347,4 +348,21 @@ public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
public long getRetentionSizeInMB() {
return retentionSizeInMB;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
*/
public int getMaxUnackedRangesToPersist() {
return maxUnackedRangesToPersist;
}

/**
* @param maxUnackedRangesToPersist
* max unacked message ranges that will be persisted and receverd.
*/
public ManagedLedgerConfig setMaxUnackedRangesToPersist(int maxUnackedRangesToPersist) {
this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
Expand All @@ -56,14 +56,14 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Version;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
Expand Down Expand Up @@ -128,7 +128,7 @@ public PendingMarkDeleteEntry(PositionImpl newPosition, MarkDeleteCallback callb
}
}

private final ArrayDeque<PendingMarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<PendingMarkDeleteEntry>();
private final ArrayDeque<PendingMarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
private volatile int pendingMarkDeletedSubmittedCount = 0;
Expand All @@ -140,16 +140,16 @@ enum State {
Open, // Metadata ledger is ready
SwitchingLedger, // The metadata ledger is being switched
Closed // The managed cursor has been closed
};
}

private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state");
private volatile State state = null;

public interface VoidCallback {
public void operationComplete();
void operationComplete();

public void operationFailed(ManagedLedgerException exception);
void operationFailed(ManagedLedgerException exception);
}

ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
Expand Down Expand Up @@ -190,6 +190,9 @@ public void operationComplete(ManagedCursorInfo info, Version version) {
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.s
PositionImpl recoveredPosition = new PositionImpl(info.getMarkDeleteLedgerId(),
info.getMarkDeleteEntryId());
if (info.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesList());
}
recoveredCursor(recoveredPosition);
callback.operationComplete();
} else {
Expand Down Expand Up @@ -259,12 +262,31 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

PositionImpl position = new PositionImpl(positionInfo);
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
recoveredCursor(position);
callback.operationComplete();
}, null);
}, null);
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
lock.writeLock().lock();
try {
individualDeletedMessages.clear();
individualDeletedMessagesList
.forEach(messageRange -> individualDeletedMessages.add(
Range.openClosed(
new PositionImpl(messageRange.getLowerEndpoint()),
new PositionImpl(messageRange.getUpperEndpoint())
)
));
} finally {
lock.writeLock().unlock();
}
}

private void recoveredCursor(PositionImpl position) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
Expand Down Expand Up @@ -615,7 +637,7 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx) {
FindEntryCallback callback, Object ctx) {
OpFindNewest op;
PositionImpl startPosition = null;
long max = 0;
Expand Down Expand Up @@ -859,10 +881,8 @@ public Set<? extends Position> asyncReplayEntries(final Set<? extends Position>
Set<Position> alreadyAcknowledgedPositions = Sets.newHashSet();
lock.readLock().lock();
try {
positions.stream().filter(position -> {
return individualDeletedMessages.contains((PositionImpl) position)
|| ((PositionImpl) position).compareTo(markDeletePosition) < 0;
}).forEach(pos -> alreadyAcknowledgedPositions.add(pos));
positions.stream().filter(position -> individualDeletedMessages.contains((PositionImpl) position)
|| ((PositionImpl) position).compareTo(markDeletePosition) < 0).forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -894,17 +914,17 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx)
log.warn("[{}][{}] Error while replaying entries", ledger.getName(), name, mle);
if (exception.compareAndSet(null, mle)) {
// release the entries just once, any further read success will release the entry straight away
entries.forEach(e -> e.release());
entries.forEach(Entry::release);
}
if (--pendingCallbacks == 0) {
callback.readEntriesFailed(exception.get(), ctx);
}
}
};

positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)).forEach(p -> {
ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
});
positions.stream()
.filter(position -> !alreadyAcknowledgedPositions.contains(position))
.forEach(p -> ledger.asyncReadEntry((PositionImpl) p, cb, ctx));

return alreadyAcknowledgedPositions;
}
Expand Down Expand Up @@ -1646,10 +1666,17 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, MetaStoreCallback<Void> callback) {
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(cursorsLedgerId)
.setMarkDeleteLedgerId(position.getLedgerId())
.setMarkDeleteEntryId(position.getEntryId()).build();
ManagedCursorInfo.Builder builder = ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(cursorsLedgerId)
.setMarkDeleteLedgerId(position.getLedgerId())
.setMarkDeleteEntryId(position.getEntryId());
if (!individualDeletedMessages.isEmpty()) {
builder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}
ManagedCursorInfo info = builder.build();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, markDeletePosition);
}

ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info, cursorLedgerVersion,
new MetaStoreCallback<Void>() {
Expand Down Expand Up @@ -1825,39 +1852,66 @@ public void deleteComplete(int rc, Object ctx) {
}, null);
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
lock.readLock().lock();
try {
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo.newBuilder();
MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
return individualDeletedMessages.asRanges().stream()
.limit(config.getMaxUnackedRangesToPersist())
.map(positionRange -> {
PositionImpl p = positionRange.lowerEndpoint();
nestedPositionBuilder.setLedgerId(p.getLedgerId());
nestedPositionBuilder.setEntryId(p.getEntryId());
messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());
p = positionRange.upperEndpoint();
nestedPositionBuilder.setLedgerId(p.getLedgerId());
nestedPositionBuilder.setEntryId(p.getEntryId());
messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
return messageRangeBuilder.build();
})
.collect(Collectors.toList());
} finally {
lock.readLock().unlock();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we recycle below builders?

nestedPositionBuilder.recycle();
messageRangeBuilder.recycle();

and also should we document on the method that return List<MLDataFormats.MessageRange> should be recycled by client once client is done with it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ML protobuf classes are not generated with the custom protobuf, so they're not recyclable. I'd leave that for later, once we have completely phased out the text format.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's correct.

}

void persistPosition(final LedgerHandle lh, final PositionImpl position, final VoidCallback callback) {
PositionInfo pi = position.getPositionInfo();
PositionInfo.Builder builder = PositionInfo.newBuilder()
.setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId());
if (!individualDeletedMessages.isEmpty()) {
builder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}
PositionInfo pi = builder.build();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),
position);
}

lh.asyncAddEntry(pi.toByteArray(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name,
position, lh.getId());
}
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name,
position, lh1.getId());
}

if (shouldCloseLedger(lh)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for consumer {}", ledger.getName(),
name);
}
startCreatingNewMetadataLedger();
if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for consumer {}", ledger.getName(),
name);
}

callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
position, lh.getId(), BKException.getMessage(rc));
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
startCreatingNewMetadataLedger();
}

callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
position, lh1.getId(), BKException.getMessage(rc));
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
}
}, null);
}
Expand Down Expand Up @@ -2025,9 +2079,9 @@ void asyncDeleteCursorLedger() {
*/
private static boolean isBkErrorNotRecoverable(int rc) {
switch (rc) {
case Code.NoSuchLedgerExistsException:
case Code.ReadException:
case Code.LedgerRecoveryException:
case BKException.Code.NoSuchLedgerExistsException:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason changing it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason in this PR, will fix it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it's good to disambiguate between ZK & BK Code classes

case BKException.Code.ReadException:
case BKException.Code.LedgerRecoveryException:
return true;

default:
Expand Down
Loading