Skip to content

Commit e5f6a0b

Browse files
committed
Merge branch 'master' into client-brokerentrymetadata
2 parents 8be37b1 + efe2d3c commit e5f6a0b

File tree

70 files changed

+3250
-536
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+3250
-536
lines changed

docker/pulsar-standalone/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ FROM apachepulsar/pulsar-dashboard:latest as dashboard
2626
# Restart from
2727
FROM ubuntu:20.04
2828

29+
ARG DEBIAN_FRONTEND=noninteractive
30+
2931
# Note that the libpq-dev package is needed here in order to install
3032
# the required python psycopg2 package (for postgresql) later
3133
RUN apt-get update \

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@ public interface ManagedCursorMXBean {
3939
String getLedgerName();
4040

4141
/**
42-
* persist cursor by ledger
42+
* persist cursor by ledger.
4343
* @param success
4444
*/
4545
void persistToLedger(boolean success);
4646

4747
/**
48-
* persist cursor by zookeeper
48+
* persist cursor by zookeeper.
4949
* @param success
5050
*/
5151
void persistToZookeeper(boolean success);
@@ -70,4 +70,34 @@ public interface ManagedCursorMXBean {
7070
*/
7171
long getPersistZookeeperErrors();
7272

73+
/**
74+
* Add write data to a ledger of a cursor (in bytes).
75+
* This will update writeCursorLedgerLogicalSize and writeCursorLedgerSize.
76+
*
77+
* @param size Size of data written to cursor (in bytes)
78+
*/
79+
void addWriteCursorLedgerSize(long size);
80+
81+
/**
82+
* Add read data from a ledger of a cursor (in bytes).
83+
*
84+
* @param size Size of data read from cursor (in bytes)
85+
*/
86+
void addReadCursorLedgerSize(long size);
87+
88+
/**
89+
* @return the size of data written to cursor (in bytes)
90+
*/
91+
long getWriteCursorLedgerSize();
92+
93+
/**
94+
* @return the size of data written to cursor without replicas (in bytes)
95+
*/
96+
long getWriteCursorLedgerLogicalSize();
97+
98+
/**
99+
* @return the size of data read from cursor (in bytes)
100+
*/
101+
long getReadCursorLedgerSize();
102+
73103
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger;
2020

21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.function.Supplier;
2223

2324
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
@@ -160,4 +161,14 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
160161
*/
161162
void shutdown() throws InterruptedException, ManagedLedgerException;
162163

164+
/**
165+
* Check managed ledger has been initialized before.
166+
*
167+
* @param ledgerName {@link String}
168+
* @return a future represents the result of the operation.
169+
* an instance of {@link Boolean} is returned
170+
* if the operation succeeds.
171+
*/
172+
CompletableFuture<Boolean> asyncExists(String ledgerName);
173+
163174
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
421421
}
422422

423423
LedgerEntry entry = seq.nextElement();
424+
mbean.addReadCursorLedgerSize(entry.getLength());
424425
PositionInfo positionInfo;
425426
try {
426427
positionInfo = PositionInfo.parseFrom(entry.getEntry());
@@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
25992600
}
26002601

26012602
checkNotNull(lh);
2602-
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
2603+
byte[] data = pi.toByteArray();
2604+
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
26032605
if (rc == BKException.Code.OK) {
26042606
if (log.isDebugEnabled()) {
26052607
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
@@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
26142616
}
26152617

26162618
mbean.persistToLedger(true);
2619+
mbean.addWriteCursorLedgerSize(data.length);
26172620
callback.operationComplete();
26182621
} else {
26192622
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
3030
private final LongAdder persistZookeeperSucceed = new LongAdder();
3131
private final LongAdder persistZookeeperFailed = new LongAdder();
3232

33+
private final LongAdder writeCursorLedgerSize = new LongAdder();
34+
private final LongAdder writeCursorLedgerLogicalSize = new LongAdder();
35+
private final LongAdder readCursorLedgerSize = new LongAdder();
36+
3337
private final ManagedCursor managedCursor;
3438

3539
public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
@@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
8387
public long getPersistZookeeperErrors() {
8488
return persistZookeeperFailed.longValue();
8589
}
90+
91+
@Override
92+
public void addWriteCursorLedgerSize(final long size) {
93+
writeCursorLedgerSize.add(size * ((ManagedCursorImpl) managedCursor).config.getWriteQuorumSize());
94+
writeCursorLedgerLogicalSize.add(size);
95+
}
96+
97+
@Override
98+
public void addReadCursorLedgerSize(final long size) {
99+
readCursorLedgerSize.add(size);
100+
}
101+
102+
@Override
103+
public long getWriteCursorLedgerSize() {
104+
return writeCursorLedgerSize.longValue();
105+
}
106+
107+
@Override
108+
public long getWriteCursorLedgerLogicalSize() {
109+
return writeCursorLedgerLogicalSize.longValue();
110+
}
111+
112+
@Override
113+
public long getReadCursorLedgerSize() {
114+
return readCursorLedgerSize.longValue();
115+
}
86116
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
522522
entryCacheManager.clear();
523523
}
524524

525+
@Override
526+
public CompletableFuture<Boolean> asyncExists(String ledgerName) {
527+
return store.asyncExists(ledgerName);
528+
}
529+
525530
@Override
526531
public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException {
527532
class Result {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Queue;
4949
import java.util.Random;
5050
import java.util.Set;
51+
import java.util.TreeMap;
5152
import java.util.UUID;
5253
import java.util.concurrent.CompletableFuture;
5354
import java.util.concurrent.CompletionException;
@@ -128,6 +129,7 @@
128129
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
129130
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
130131
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
132+
import org.apache.pulsar.common.policies.data.OffloadPolicies;
131133
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
132134
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
133135
import org.apache.pulsar.common.util.DateFormatter;
@@ -2304,19 +2306,11 @@ private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
23042306
&& TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
23052307
}
23062308

2307-
private boolean isOffloadedNeedsDelete(OffloadContext offload) {
2309+
boolean isOffloadedNeedsDelete(OffloadContext offload, Optional<OffloadPolicies> offloadPolicies) {
23082310
long elapsedMs = clock.millis() - offload.getTimestamp();
2309-
2310-
if (config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
2311-
&& config.getLedgerOffloader().getOffloadPolicies() != null
2312-
&& config.getLedgerOffloader().getOffloadPolicies()
2313-
.getManagedLedgerOffloadDeletionLagInMillis() != null) {
2314-
return offload.getComplete() && !offload.getBookkeeperDeleted()
2315-
&& elapsedMs > config.getLedgerOffloader()
2316-
.getOffloadPolicies().getManagedLedgerOffloadDeletionLagInMillis();
2317-
} else {
2318-
return false;
2319-
}
2311+
return offloadPolicies.filter(policies -> offload.getComplete() && !offload.getBookkeeperDeleted()
2312+
&& policies.getManagedLedgerOffloadDeletionLagInMillis() != null
2313+
&& elapsedMs > policies.getManagedLedgerOffloadDeletionLagInMillis()).isPresent();
23202314
}
23212315

23222316
/**
@@ -2337,6 +2331,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
23372331

23382332
List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
23392333
List<LedgerInfo> offloadedLedgersToDelete = Lists.newArrayList();
2334+
Optional<OffloadPolicies> optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null
2335+
&& config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE
2336+
? config.getLedgerOffloader().getOffloadPolicies()
2337+
: null);
23402338
synchronized (this) {
23412339
if (log.isDebugEnabled()) {
23422340
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
@@ -2420,7 +2418,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
24202418
}
24212419

24222420
for (LedgerInfo ls : ledgers.values()) {
2423-
if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) {
2421+
if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
2422+
&& !ledgersToDelete.contains(ls)) {
24242423
log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
24252424
ls.getLedgerId());
24262425
offloadedLedgersToDelete.add(ls);
@@ -3205,21 +3204,22 @@ public PositionImpl getPreviousPosition(PositionImpl position) {
32053204
// The previous position will be the last position of an earlier ledgers
32063205
NavigableMap<Long, LedgerInfo> headMap = ledgers.headMap(position.getLedgerId(), false);
32073206

3208-
if (headMap.isEmpty()) {
3207+
final Map.Entry<Long, LedgerInfo> firstEntry = headMap.firstEntry();
3208+
if (firstEntry == null) {
32093209
// There is no previous ledger, return an invalid position in the current ledger
32103210
return PositionImpl.get(position.getLedgerId(), -1);
32113211
}
32123212

32133213
// We need to find the most recent non-empty ledger
32143214
for (long ledgerId : headMap.descendingKeySet()) {
32153215
LedgerInfo li = headMap.get(ledgerId);
3216-
if (li.getEntries() > 0) {
3216+
if (li != null && li.getEntries() > 0) {
32173217
return PositionImpl.get(li.getLedgerId(), li.getEntries() - 1);
32183218
}
32193219
}
32203220

32213221
// in case there are only empty ledgers, we return a position in the first one
3222-
return PositionImpl.get(headMap.firstEntry().getKey(), -1);
3222+
return PositionImpl.get(firstEntry.getKey(), -1);
32233223
}
32243224

32253225
/**

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.bookkeeper.mledger.impl;
2020

2121
import java.util.List;
22+
import java.util.concurrent.CompletableFuture;
23+
2224
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
2325
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
2426
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -129,4 +131,14 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
129131
* @throws MetaStoreException
130132
*/
131133
Iterable<String> getManagedLedgers() throws MetaStoreException;
134+
135+
/**
136+
* Check ledger exists.
137+
*
138+
* @param ledgerName {@link String}
139+
* @return a future represents the result of the operation.
140+
* an instance of {@link Boolean} is returned
141+
* if the operation succeeds.
142+
*/
143+
CompletableFuture<Boolean> asyncExists(String ledgerName);
132144
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.CompletionException;
2728

2829
import io.netty.buffer.ByteBuf;
@@ -257,6 +258,11 @@ public Iterable<String> getManagedLedgers() throws MetaStoreException {
257258
}
258259
}
259260

261+
@Override
262+
public CompletableFuture<Boolean> asyncExists(String path) {
263+
return store.exists(PREFIX + path);
264+
}
265+
260266
//
261267
// update timestamp if missing or 0
262268
// 3 cases - timestamp does not exist for ledgers serialized before

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import static org.mockito.Mockito.doReturn;
2727
import static org.mockito.Mockito.mock;
2828
import static org.mockito.Mockito.spy;
29+
import static org.mockito.Mockito.times;
30+
import static org.mockito.Mockito.verify;
2931
import static org.mockito.Mockito.when;
3032
import static org.testng.Assert.assertEquals;
3133
import static org.testng.Assert.assertFalse;
@@ -87,6 +89,7 @@
8789
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
8890
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
8991
import org.apache.bookkeeper.mledger.Entry;
92+
import org.apache.bookkeeper.mledger.LedgerOffloader;
9093
import org.apache.bookkeeper.mledger.ManagedCursor;
9194
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
9295
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -105,6 +108,7 @@
105108
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
106109
import org.apache.bookkeeper.mledger.util.Futures;
107110
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
111+
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
108112
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
109113
import org.apache.commons.lang3.exception.ExceptionUtils;
110114
import org.apache.commons.lang3.mutable.MutableObject;
@@ -3173,4 +3177,34 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception {
31733177
cursor3.close();
31743178
ledger.close();
31753179
}
3180+
3181+
@Test
3182+
public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception {
3183+
ManagedLedgerConfig config = new ManagedLedgerConfig();
3184+
config.setMaxEntriesPerLedger(1);
3185+
config.setMaxSizePerLedgerMb(1);
3186+
LedgerOffloader ledgerOffloader = mock(NullLedgerOffloader.class);
3187+
OffloadPoliciesImpl offloadPolicies = mock(OffloadPoliciesImpl.class);
3188+
when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
3189+
when(ledgerOffloader.getOffloadDriverName()).thenReturn("s3");
3190+
config.setLedgerOffloader(ledgerOffloader);
3191+
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open(
3192+
"testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config);
3193+
3194+
// Retain the data.
3195+
ledger.openCursor("test-cursor");
3196+
final int entries = 10;
3197+
byte[] data = new byte[1024 * 1024];
3198+
for (int i = 0; i < entries; i++) {
3199+
ledger.addEntry(data);
3200+
}
3201+
assertEquals(ledger.ledgers.size(), 10);
3202+
3203+
// Set a new offloader to cleanup the execution times of getOffloadPolicies()
3204+
ledgerOffloader = mock(NullLedgerOffloader.class);
3205+
config.setLedgerOffloader(ledgerOffloader);
3206+
3207+
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
3208+
verify(ledgerOffloader, times(1)).getOffloadPolicies();
3209+
}
31763210
}

0 commit comments

Comments
 (0)