Skip to content

Commit

Permalink
[improve][broker] Create the cursor ledger lazily to improve the subs…
Browse files Browse the repository at this point in the history
…cribe performance (apache#16389)

* [feature][broker] Support create ledger cursor lazily

### Motivation

Provide a way to create the ledger cursor lazily. It can reduce the subscription creation time-consuming.
In the case of millions of topics, consumers can complete subscriptions more quickly.
After enabling this feature, the cursor ledger will create will be created during the message acknowledgment.
If there are no message acknowledgments happened on a subscription, the cursor ledger will not be created.

### Modification

Added new configuration to enable the cursor ledger lazy creation

```
# Whether to create the cursor ledger lazily when recovering a managed cursor backing a durable subscription.
# It can reduce the subscription creation time-consuming. In the case of millions of topics, consumers can complete
# subscriptions more quickly.
#
# After enabling this option, the cursor ledger will create will be created during the message acknowledgment.
# If there are no message acknowledgments happened on a subscription, the cursor ledger will not be created.

# Default is false.
managedLedgerLazyCursorLedgerCreationEnabled=false
```

* add doc

* Remove configuraiton

* Address comment.

* address comments

* address comments

* Fix test

* Fix test
  • Loading branch information
codelipenghui authored Jul 19, 2022
1 parent 47959b4 commit cccf252
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,19 +646,18 @@ void initialize(PositionImpl position, Map<String, Long> properties, Map<String,
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
}

createNewMetadataLedger(new VoidCallback() {
@Override
public void operationComplete() {
STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
callback.operationComplete();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
callback.operationFailed(exception);
}
});
persistPositionMetaStore(cursorLedger != null ? cursorLedger.getId() : -1L, position, properties,
new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
callback.operationComplete();
}
@Override
public void operationFailed(MetaStoreException e) {
callback.operationFailed(e);
}
}, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,11 @@ void markDeleteWithErrors() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
List<Entry> entries = cursor.readEntries(100);

stopBookKeeper();
assertEquals(entries.size(), 1);
cursor.markDelete(entries.get(0).getPosition());
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
entries = cursor.readEntries(100);
stopBookKeeper();

// Mark-delete should succeed if BK is down
cursor.markDelete(entries.get(0).getPosition());
Expand Down Expand Up @@ -1338,9 +1340,11 @@ void errorRecoveringCursor2() throws Exception {
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);

bkc.failAfter(4, BKException.Code.MetadataVersionException);

ledger = factory2.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");
Position position = ledger.addEntry("test".getBytes());
try {
ledger = factory2.open("my_test_ledger");
cursor.markDelete(position);
fail("should have failed");
} catch (ManagedLedgerException e) {
// ok
Expand Down Expand Up @@ -3262,6 +3266,11 @@ public void testBatchIndexesDeletionPersistAndRecover() throws ManagedLedgerExce
deleteBatchIndex(cursor, positions[3], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));
deleteBatchIndex(cursor, positions[4], 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(9)));

ManagedCursor finalCursor = cursor;
Awaitility.await().untilAsserted(() -> {
assertEquals(finalCursor.getMarkDeletedPosition(), positions[4]);
});

cursor.close();
ledger.close();
ledger = factory.open("test_batch_indexes_deletion_persistent", managedLedgerConfig);
Expand Down Expand Up @@ -3533,10 +3542,11 @@ public void testFlushCursorAfterError() throws Exception {
positions.add(ledger1.addEntry(new byte[1024]));
}

c1.markDelete(positions.get(0));
Thread.sleep(3000);
// Simulate BK write error
bkc.failNow(BKException.Code.NotEnoughBookiesException);
metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException(""));

try {
c1.markDelete(positions.get(positions.size() - 1));
fail("should have failed");
Expand Down Expand Up @@ -3694,6 +3704,10 @@ public void testCursorNoRolloverIfNoMetadataSession() throws Exception {

cursor.delete(positions.get(0));

Awaitility.await().untilAsserted(() -> {
assertEquals(cursor.getMarkDeletedPosition(), positions.get(0));
});

long initialLedgerId = cursor.getCursorLedger();

metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
Expand All @@ -3702,6 +3716,10 @@ public void testCursorNoRolloverIfNoMetadataSession() throws Exception {
cursor.delete(positions.get(i));
}

Awaitility.await().untilAsserted(() -> {
assertEquals(cursor.getMarkDeletedPosition(), positions.get(positions.size() - 1));
});

assertEquals(cursor.getCursorLedger(), initialLedgerId);

// After the session gets reestablished, the rollover should restart
Expand Down Expand Up @@ -3812,5 +3830,65 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
Arrays.asList(1, 1, 1));
}

@Test
public void testLazyCursorLedgerCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor.getState(), "NoLedger");
assertEquals(cursor.getMarkDeletedPosition(), ledger.getLastPosition());
Position lastPosition = null;
for (int i = 0; i < 10; i++) {
lastPosition = ledger.addEntry("test".getBytes(Encoding));
}
cursor.markDelete(lastPosition);
Position finalLastPosition = lastPosition;
Awaitility.await().untilAsserted(() -> {
assertEquals(cursor.getState(), "Open");
assertEquals(cursor.getMarkDeletedPosition(), finalLastPosition);
assertEquals(cursor.getPersistentMarkDeletedPosition(), finalLastPosition);
});

// Make sure the recovered mark delete position is correct.
cursor.close();
ledger.close();
ledger = (ManagedLedgerImpl) factory
.open("testLazyCursorLedgerCreation", managedLedgerConfig);
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor1.getState(), "NoLedger");
assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition);

// Verify the recovered cursor can work with new mark delete.
lastPosition = null;
for (int i = 0; i < 10; i++) {
lastPosition = ledger.addEntry("test".getBytes(Encoding));
}
cursor1.markDelete(lastPosition);
Position finalLastPosition2 = lastPosition;
Awaitility.await().untilAsserted(() -> {
assertEquals(cursor1.getState(), "Open");
assertEquals(cursor1.getMarkDeletedPosition(), finalLastPosition2);
assertEquals(cursor1.getPersistentMarkDeletedPosition(), finalLastPosition2);
});
cursor1.close();
ledger.close();
}

@Test
public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testLazyCursorLedgerCreation", managedLedgerConfig);
Position p1 = ledger.addEntry("test".getBytes());
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor.getMarkDeletedPosition(), p1);
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = (ManagedLedgerImpl) factory2.open("testLazyCursorLedgerCreation", managedLedgerConfig);
assertNotNull(ledger.getCursors().get("test"));
ManagedCursorImpl cursor1 = (ManagedCursorImpl) ledger.openCursor("test");
assertEquals(cursor1.getMarkDeletedPosition(), p1);
factory2.shutdown();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void removingCursor() throws Exception {
ledger.deleteCursor("c1");

assertFalse(metadataStore.exists("/managed-ledgers/my_test_ledger/c1").join());
assertEquals(bkc.getLedgers().size(), 2);
assertEquals(bkc.getLedgers().size(), 1);
}

@Test
Expand Down Expand Up @@ -491,15 +491,16 @@ public void recoverAfterMarkDeleteError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("my-cursor");
Position position = ledger.addEntry("entry".getBytes());

Position position1 = ledger.addEntry("entry".getBytes());
cursor.markDelete(position);
bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
metadataStore.failConditional(new MetadataStoreException("error"), (op, path) ->
path.equals("/managed-ledgers/my_test_ledger/my-cursor")
&& op == FaultInjectionMetadataStore.OperationType.PUT
);

try {
cursor.markDelete(position);
cursor.markDelete(position1);
fail("should fail");
} catch (ManagedLedgerException e) {
// ok
Expand All @@ -509,7 +510,7 @@ public void recoverAfterMarkDeleteError() throws Exception {
Thread.sleep(100);

// Next markDelete should succeed
cursor.markDelete(position);
cursor.markDelete(position1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public void testGetManagedLedgerInfoWithClose() throws Exception {
assertEquals(info.ledgers.size(), 4);

assertEquals(info.ledgers.get(0).ledgerId, 3);
assertEquals(info.ledgers.get(1).ledgerId, 5);
assertEquals(info.ledgers.get(2).ledgerId, 6);
assertEquals(info.ledgers.get(3).ledgerId, 7);
assertEquals(info.ledgers.get(1).ledgerId, 4);
assertEquals(info.ledgers.get(2).ledgerId, 5);
assertEquals(info.ledgers.get(3).ledgerId, 6);

assertEquals(info.cursors.size(), 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1632,7 +1632,7 @@ public void testCleanup() throws Exception {
ledger.openCursor("c1");

ledger.addEntry("data".getBytes(Encoding));
assertEquals(bkc.getLedgers().size(), 2);
assertEquals(bkc.getLedgers().size(), 1);

ledger.delete();
assertEquals(bkc.getLedgers().size(), 0);
Expand All @@ -1644,7 +1644,7 @@ public void testAsyncCleanup() throws Exception {
ledger.openCursor("c1");

ledger.addEntry("data".getBytes(Encoding));
assertEquals(bkc.getLedgers().size(), 2);
assertEquals(bkc.getLedgers().size(), 1);

final CountDownLatch latch = new CountDownLatch(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,15 @@ void markDeleteWithErrors() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");
ledger.addEntry("dummy-entry-1".getBytes(Encoding));
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
List<Entry> entries = cursor.readEntries(100);
assertEquals(entries.size(), 2);
cursor.markDelete(entries.get(0).getPosition());

stopBookKeeper();
assertEquals(entries.size(), 1);

// Mark-delete should succeed if BK is down
cursor.markDelete(entries.get(0).getPosition());
cursor.markDelete(entries.get(1).getPosition());

entries.forEach(Entry::release);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,11 @@ public void testCheckPositionInPendingAckState() throws Exception {
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
transaction.commit().get();
result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
Awaitility.await().untilAsserted(() -> {
PositionInPendingAckStats r = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(r.state, PositionInPendingAckStats.State.MarkDelete);
});
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,16 @@ public void testManagedCursorMetrics() throws Exception {
.isAckReceiptEnabled(true)
.subscribe();


@Cleanup
Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topicName)
.create();

producer.send("trigger-cursor-ledger-creation".getBytes());
// Trigger the cursor ledger creation
consumer.acknowledge(consumer.receive().getMessageId());

for(PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}
Expand Down Expand Up @@ -160,12 +165,12 @@ public void testCursorReadWriteMetrics() throws Exception {
}
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 2);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L);
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1053,4 +1055,24 @@ public void testActiveConsumerCleanup() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testManagedLedgerLazyCursorLedgerCreation() throws Exception {
String topic = "persistent://my-property/my-ns/testManagedLedgerLazyCursorLedgerCreationEnabled";
String sub = "my-subscriber-name";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(sub).subscribe();
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
assertEquals(stats.cursors.get(sub).state, "NoLedger");
producer.send("test".getBytes(UTF_8));
consumer.acknowledgeCumulative(consumer.receive());
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats1 = admin.topics().getInternalStats(topic);
assertEquals(stats1.cursors.get(sub).state, "Open");
assertEquals(stats1.lastConfirmedEntry, stats1.cursors.get(sub).markDeletePosition);
});
}

}

0 comments on commit cccf252

Please sign in to comment.