Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Transaction buffer stable position and lowWaterMark implementation. #9195

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ enum IndividualDeletedEntries {
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
*/
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx);
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);

/**
* Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions.
Expand Down Expand Up @@ -174,8 +177,11 @@ List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
*/
void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx);
void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
Expand All @@ -192,13 +198,16 @@ List<Entry> readEntriesOrWait(int maxEntries, long maxSizeBytes)
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
*/
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx);
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);

/**
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
*
* @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object)
* @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object, PositionImpl)
* @return true if the read operation was canceled or false if there was no pending operation
*/
boolean cancelPendingReadRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ public interface ReadOnlyCursor {
* @param numberOfEntriesToRead maximum number of entries to return
* @param callback callback object
* @param ctx opaque context
* @param maxPosition max position can read
* @see #readEntries(int)
*/
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx);
void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);

/**
* Get the read position. This points to the next message to be read from the cursor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.countDown();
}

}, null);
}, null, PositionImpl.latest);

counter.await();

Expand All @@ -560,15 +560,16 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
final Object ctx) {
final Object ctx, PositionImpl maxPosition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
return;
}

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);
OpReadEntry op = OpReadEntry.create(this, readPosition,
numberOfEntriesToRead, callback, ctx, maxPosition);
ledger.asyncReadEntries(op);
}

Expand Down Expand Up @@ -669,7 +670,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.countDown();
}

}, null);
}, null, PositionImpl.latest);

counter.await();

Expand All @@ -681,12 +682,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) {
asyncReadEntriesOrWait(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx);
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
asyncReadEntriesOrWait(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
}

@Override
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx) {
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
checkArgument(maxEntries > 0);
if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
Expand All @@ -700,10 +703,10 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
}
asyncReadEntries(numberOfEntriesToRead, callback, ctx);
asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition);
} else {
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx);
ctx, maxPosition);

if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,11 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
}

private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) {

if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maxPostion of the OpReadEntry might be null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it not be null

opReadEntry.checkReadCompletion();
return;
}
// Perform the read
long firstEntry = opReadEntry.readPosition.getEntryId();
long lastEntryInLedger;
Expand All @@ -1789,6 +1794,11 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
lastEntryInLedger = ledger.getLastAddConfirmed();
}

// can read max position entryId
if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the above comment

lastEntryInLedger = min(opReadEntry.maxPosition.getEntryId(), lastEntryInLedger);
}

if (firstEntry > lastEntryInLedger) {
if (log.isDebugEnabled()) {
log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,20 @@ class OpReadEntry implements ReadEntriesCallback {
// Results
private List<Entry> entries;
private PositionImpl nextReadPosition;
PositionImpl maxPosition;

public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx) {
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
op.cursor = cursor;
op.count = count;
op.callback = callback;
op.entries = Lists.newArrayList();
if (maxPosition == null) {
maxPosition = PositionImpl.latest;
}
op.maxPosition = maxPosition;
op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
Expand Down Expand Up @@ -131,7 +136,8 @@ void updateReadPosition(Position newReadPosition) {
}

void checkReadCompletion() {
if (entries.size() < count && cursor.hasMoreEntries()) {
if (entries.size() < count && cursor.hasMoreEntries() &&
((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
// We still have more entries to read from the next ledger, schedule a new async operation
if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
Expand Down Expand Up @@ -184,6 +190,7 @@ public void recycle() {
ctx = null;
entries = null;
nextReadPosition = null;
maxPosition = null;
recyclerHandle.recycle(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public List<Entry> readEntries(int numberOfEntriesToRead) throws ManagedLedgerEx
}

@Override
public void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) {
public void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
callback.readEntriesComplete(null, ctx);
}

Expand Down Expand Up @@ -254,7 +255,14 @@ public List<Entry> readEntriesOrWait(int numberOfEntriesToRead)
}

@Override
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) {
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
}

@Override
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -379,7 +380,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}

}, null);
}, null, PositionImpl.latest);

counter.await();
}
Expand Down Expand Up @@ -407,7 +408,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
fail("async-call should not have failed");
}

}, null);
}, null, PositionImpl.latest);

counter.await();

Expand All @@ -429,7 +430,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter2.countDown();
}

}, null);
}, null, PositionImpl.latest);

counter2.await();
}
Expand All @@ -456,7 +457,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.countDown();
}

}, null);
}, null, PositionImpl.latest);

counter.await();
}
Expand Down Expand Up @@ -1673,7 +1674,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("Error reading", exception);
}
}, null);
}, null, PositionImpl.latest);
}

ledger.addEntry("test".getBytes());
Expand Down Expand Up @@ -2549,7 +2550,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.countDown();
}
}, null);
}, null, PositionImpl.latest);

assertTrue(c1.cancelPendingReadRequest());

Expand All @@ -2565,7 +2566,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter2.countDown();
}
}, null);
}, null, PositionImpl.latest);

ledger.addEntry("entry-1".getBytes(Encoding));

Expand Down Expand Up @@ -3253,6 +3254,56 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
entries.forEach(e -> e.release());
}

@Test
public void testReadEntriesOrWaitWithMaxPosition() throws Exception {
int readMaxNumber = 10;
int sendNumber = 20;
ManagedLedger ledger = factory.open("testReadEntriesOrWaitWithMaxPosition");
ManagedCursor c = ledger.openCursor("c");
Position position = PositionImpl.earliest;
Position maxCanReadPosition = PositionImpl.earliest;
for (int i = 0; i < sendNumber; i++) {
if (i == readMaxNumber - 1) {
position = ledger.addEntry(new byte[1024]);
} else if (i == sendNumber - 1) {
maxCanReadPosition = ledger.addEntry(new byte[1024]);
} else {
ledger.addEntry(new byte[1024]);
}

}
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
completableFuture.complete(entries.size());
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
completableFuture.completeExceptionally(exception);
}
}, null, (PositionImpl) position);

int number = completableFuture.get();
assertEquals(number, readMaxNumber);

c.asyncReadEntriesOrWait(sendNumber, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
completableFuture.complete(entries.size());
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
completableFuture.completeExceptionally(exception);
}
}, null, (PositionImpl) maxCanReadPosition);

assertEquals(number, sendNumber - readMaxNumber);

}

@Test
public void testFlushCursorAfterInactivity() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, cursor);
}, cursor, PositionImpl.latest);
}

@Override
Expand Down Expand Up @@ -2557,7 +2557,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
responseException2.set(exception);
}

}, null);
}, null, PositionImpl.latest);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
false, opReadEntry, ctxStr);
retryStrategically((test) -> {
Expand Down
Loading