[fix][test] fix flaky test testRecoverSequenceId #18437

Merged: 3 commits merged into apache:master on Nov 21, 2022

Conversation

@liangyepianzhou (Contributor) commented Nov 11, 2022

Fixes: #18396

Motivation

The logic we expected is as follows (a rough sketch of this scenario is included after the list):

  1. Set maxEntriesPerLedger = 3.
  2. Write three messages into the MLTransactionLog, such as [3:0], [3:1], [3:2].
  3. Call rollCurrentLedgerIfFull to close the old ledger and create a new one.
  4. ManagedLedger calls maybeUpdateCursorBeforeTrimmingConsumedLedger in its createComplete callback. The cursor position of the transaction log is then updated to the earliest position in the new ledger, such as [5:-1]:
     public void maybeUpdateCursorBeforeTrimmingConsumedLedger() {
         for (ManagedCursor cursor : cursors) {
             PositionImpl lastAckedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
             LedgerInfo currPointedLedger = ledgers.get(lastAckedPosition.getLedgerId());
             LedgerInfo nextPointedLedger = Optional.ofNullable(ledgers.higherEntry(lastAckedPosition.getLedgerId()))
                     .map(Map.Entry::getValue).orElse(null);

             if (currPointedLedger != null) {
                 if (nextPointedLedger != null) {
                     if (lastAckedPosition.getEntryId() != -1
                             && lastAckedPosition.getEntryId() + 1 >= currPointedLedger.getEntries()) {
                         lastAckedPosition = new PositionImpl(nextPointedLedger.getLedgerId(), -1);
                     }
                 } else {
                     log.debug("No need to reset cursor: {}, current ledger is the last ledger.", cursor);
                 }
             } else {
                 log.warn("Cursor: {} does not exist in the managed-ledger.", cursor);
             }

             if (!lastAckedPosition.equals(cursor.getMarkDeletedPosition())) {
                 try {
                     log.info("Reset cursor:{} to {} since ledger consumed completely", cursor, lastAckedPosition);
                     onCursorMarkDeletePositionUpdated((ManagedCursorImpl) cursor, lastAckedPosition);
                 } catch (Exception e) {
                     log.warn("Failed to reset cursor: {} from {} to {}. Trimming thread will retry next time.",
                             cursor, cursor.getMarkDeletedPosition(), lastAckedPosition);
                     log.warn("Caused by", e);
                 }
             }
         }
     }
  5. The expired ledger (ledgerId = 3) will then be deleted in internalTrimLedgers:
     long slowestReaderLedgerId = -1;
     if (!cursors.hasDurableCursors()) {
         // At this point the lastLedger will be pointing to the
         // ledger that has just been closed, therefore the +1 to
         // include lastLedger in the trimming.
         slowestReaderLedgerId = currentLedger.getId() + 1;
     } else {
         PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition();
         if (slowestReaderPosition != null) {
             slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
         } else {
             promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
             trimmerMutex.unlock();
             return;
         }
     }

     if (log.isDebugEnabled()) {
         log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
     }

     long totalSizeToDelete = 0;
     // skip ledger if retention constraint met
     for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
         // currentLedger can not be deleted
         if (ls.getLedgerId() == currentLedger.getId()) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                         ls.getLedgerId());
             }
             break;
         }
         // if truncate, all ledgers besides currentLedger are going to be deleted
         if (isTruncate) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Ledger {} will be truncated with ts {}",
                         name, ls.getLedgerId(), ls.getTimestamp());
             }
             ledgersToDelete.add(ls);
             continue;
         }

         totalSizeToDelete += ls.getSize();
         boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(totalSizeToDelete);
         boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
         if (log.isDebugEnabled()) {
             log.debug(
                     "[{}] Checking ledger {} -- time-old: {} sec -- "
                             + "expired: {} -- over-quota: {} -- current-ledger: {}",
                     name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
                     overRetentionQuota, currentLedger.getId());
         }

         if (expired || overRetentionQuota) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Ledger {} has expired or over quota, expired is: {}, ts: {}, "
                                 + "overRetentionQuota is: {}, ledge size: {}",
                         name, ls.getLedgerId(), expired, ls.getTimestamp(), overRetentionQuota, ls.getSize());
             }
             ledgersToDelete.add(ls);
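
As referenced above, here is a rough sketch of the setup described in steps 1-3. It is hypothetical and simplified: the ManagedLedgerFactory wiring, ledger name, and entry payloads are assumptions, and this is not the actual test code; only the method names come from the steps above.

import java.nio.charset.StandardCharsets;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;

class RollLedgerScenarioSketch {
    static void run(ManagedLedgerFactory factory) throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(3);                        // step 1: at most 3 entries per ledger
        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("tx-log-sketch", conf);

        for (int i = 0; i < 3; i++) {
            // step 2: fill the current ledger, e.g. positions [3:0], [3:1], [3:2]
            ml.addEntry(("entry-" + i).getBytes(StandardCharsets.UTF_8));
        }

        ml.rollCurrentLedgerIfFull();                          // step 3: close the full ledger, create a new one
        // Steps 4 and 5 then run inside the managed ledger: createComplete() calls
        // maybeUpdateCursorBeforeTrimmingConsumedLedger(), and internalTrimLedgers()
        // is expected to delete the fully consumed ledger afterwards.
    }
}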

However, there is a scheduled task that periodically flushes the cursors:

this.flushCursorsTask = scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
        config.getCursorPositionFlushSeconds(), config.getCursorPositionFlushSeconds(), TimeUnit.SECONDS);
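
The interval is driven by cursorPositionFlushSeconds on the managed ledger factory configuration (the getter is visible in the snippet above). A tiny illustration of that knob, assuming the setter mirrors the getter; this is not part of the PR:

import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;

class CursorFlushIntervalSketch {
    static ManagedLedgerFactoryConfig withShortFlushInterval() {
        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
        // Assumption: the setter mirrors the getCursorPositionFlushSeconds() getter used above.
        // A short interval makes the periodic cursor flush (and therefore this race) easy to hit.
        factoryConf.setCursorPositionFlushSeconds(1);
        return factoryConf;
    }
}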

This flush calls internalAsyncMarkDelete, which updates the cursor position back to [3:2]:

// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
    isDirty = true;
    updateLastMarkDeleteEntryToLatest(newPosition, properties);
    callback.markDeleteComplete(ctx);
    return;
}
internalAsyncMarkDelete(newPosition, properties, callback, ctx);

internalAsyncMarkDelete eventually invokes:

ledger.onCursorMarkDeletePositionUpdated(ManagedCursorImpl.this, mdEntry.newPosition);

As a result, the expired ledger (ledgerId = 3) cannot be deleted in internalTrimLedgers:

long slowestReaderLedgerId = -1;
if (!cursors.hasDurableCursors()) {
    // At this point the lastLedger will be pointing to the
    // ledger that has just been closed, therefore the +1 to
    // include lastLedger in the trimming.
    slowestReaderLedgerId = currentLedger.getId() + 1;
} else {
    PositionImpl slowestReaderPosition = cursors.getSlowestReaderPosition();
    if (slowestReaderPosition != null) {
        slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
    } else {
        promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
        trimmerMutex.unlock();
        return;
    }
}

if (log.isDebugEnabled()) {
    log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
}

long totalSizeToDelete = 0;
// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {

Modifications

markDeleteLimiter.tryAcquire() is called before internalAsyncMarkDelete, so we can mock it to always return false:

// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
    isDirty = true;
    updateLastMarkDeleteEntryToLatest(newPosition, properties);
    callback.markDeleteComplete(ctx);
    return;
}
internalAsyncMarkDelete(newPosition, properties, callback, ctx);
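
A minimal sketch of that idea (hypothetical, not necessarily the exact code merged in this PR): replace the cursor's markDeleteLimiter with a Mockito mock whose tryAcquire() always returns false, so the periodic flushCursors task can no longer persist the mark-delete position during the test. The field name comes from the snippet above; treating the limiter as a mockable Guava RateLimiter and swapping it via reflection are assumptions.

import java.lang.reflect.Field;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class MarkDeleteLimiterStub {
    /**
     * Force every markDeleteLimiter.tryAcquire() call on this cursor to return false,
     * so asyncMarkDelete() only marks the cursor dirty instead of persisting the position.
     */
    static void blockMarkDeletePersistence(ManagedCursorImpl cursor) throws Exception {
        RateLimiter alwaysBusy = mock(RateLimiter.class);
        when(alwaysBusy.tryAcquire()).thenReturn(false);

        // Assumption: the private field can be swapped through reflection in the test JVM.
        Field limiterField = ManagedCursorImpl.class.getDeclaredField("markDeleteLimiter");
        limiterField.setAccessible(true);
        limiterField.set(cursor, alwaysBusy);
    }
}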

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: liangyepianzhou#10

@github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) on Nov 11, 2022
@codecov-commenter commented Nov 11, 2022

Codecov Report

Merging #18437 (4edf80b) into master (2e878e8) will increase coverage by 4.95%.
The diff coverage is n/a.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #18437      +/-   ##
============================================
+ Coverage     40.04%   45.00%   +4.95%     
- Complexity     8625    10610    +1985     
============================================
  Files           687      757      +70     
  Lines         67436    72781    +5345     
  Branches       7221     7817     +596     
============================================
+ Hits          27007    32756    +5749     
+ Misses        37411    36346    -1065     
- Partials       3018     3679     +661     
Flag        Coverage Δ
unittests   45.00% <ø> (+4.95%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
.../transaction/buffer/metadata/AbortTxnMetadata.java 28.57% <0.00%> (-57.15%) ⬇️
...rvice/schema/KeyValueSchemaCompatibilityCheck.java 21.62% <0.00%> (-45.95%) ⬇️
...ion/buffer/metadata/TransactionBufferSnapshot.java 42.85% <0.00%> (-42.86%) ⬇️
...ker/service/schema/exceptions/SchemaException.java 60.00% <0.00%> (-40.00%) ⬇️
...saction/timeout/TransactionTimeoutTrackerImpl.java 50.87% <0.00%> (-36.85%) ⬇️
...er/impl/SingleSnapshotAbortedTxnProcessorImpl.java 51.19% <0.00%> (-29.77%) ⬇️
...nsaction/pendingack/impl/PendingAckHandleImpl.java 36.76% <0.00%> (-15.45%) ⬇️
...ar/broker/transaction/util/LogIndexLagBackoff.java 42.85% <0.00%> (-14.29%) ⬇️
...ransaction/buffer/impl/TopicTransactionBuffer.java 45.05% <0.00%> (-13.67%) ⬇️
...ction/buffer/impl/TransactionBufferClientImpl.java 65.11% <0.00%> (-11.63%) ⬇️
... and 295 more

@labuladong (Contributor) left a comment

LGTM

@poorbarcode (Contributor) left a comment

Suggestion: add a doc comment for this test case:

/**
 * Verify that MLTransactionSequenceIdGenerator can recover the latest transaction id after a restart.
 *
 * @param isUseManagedLedgerProperties if true, the last transaction id is recovered from `ManagedLedger.prop`;
 *                                     if false, it is recovered from the last transaction log entry in the ledger
 */
@Test(dataProvider = "isUseManagedLedgerProperties")
public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws Exception {

}

@Technoboy- merged commit 6e37d10 into apache:master on Nov 21, 2022
lifepuzzlefun pushed a commit to lifepuzzlefun/pulsar that referenced this pull request Dec 9, 2022
lifepuzzlefun pushed a commit to lifepuzzlefun/pulsar that referenced this pull request Jan 10, 2023
tisonkun pushed a commit to tisonkun/pulsar that referenced this pull request on Jul 12, 2023:
"…ting module to flink-connector-test-utils module. This closes apache#18437."

Labels
doc-not-needed (Your PR changes do not impact docs), ready-to-test, type/flaky-tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flaky-test: MLTransactionMetadataStoreTest#testRecoverSequenceId
5 participants