Skip to content

Commit

Permalink
[issue #3975] Bugfix NPE on non durable consumer (#3988)
Browse files Browse the repository at this point in the history
*Motivation*

Trying to fix #3975

When a reset of a cursor is performed with some timestamp on a non-durable
consumer the message finder will fail with null pointer exception due to
`cursor.getName()` being null.

*Modifications*

  - Add method overloading for `newNonDurableCursor()` with subscription name.
  - Fix method getNonDurableSubscription to call `newNonDurableCursor()` with
    proper subscription name
  - Add test to assert issue.
  • Loading branch information
lovelle authored and merlimat committed May 19, 2019
1 parent 6a8d838 commit 5031142
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public interface ManagedLedger {
* @return the new NonDurableCursor
*/
ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,16 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws Ma
return new NonDurableCursorImpl(bookKeeper, config, this, null, (PositionImpl) startCursorPosition);
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName)
throws ManagedLedgerException {
checkManagedLedgerIsOpen();
checkFenced();

return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
(PositionImpl) startCursorPosition);
}

@Override
public Iterable<ManagedCursor> getCursors() {
return cursors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition);
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
} catch (ManagedLedgerException e) {
subscriptionFuture.completeExceptionally(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -479,7 +481,7 @@ public void testMessageAvailableAfterRestart() throws Exception {
assertTrue(reader.hasMessageAvailable());

String readOut = new String(reader.readNext().getData());
assertTrue(readOut.equals(content));
assertEquals(content, readOut);
assertFalse(reader.hasMessageAvailable());
}

Expand Down

0 comments on commit 5031142

Please sign in to comment.