Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Nov 18, 2022
1 parent 475d108 commit 3c17124
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,17 @@ public final class LedgerMetadataUtils {
"compacted-ledger".getBytes(StandardCharsets.UTF_8);
private static final byte[] METADATA_PROPERTY_COMPONENT_SCHEMA = "schema".getBytes(StandardCharsets.UTF_8);

private static final byte[] METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET =
"delayed-index-bucket".getBytes(StandardCharsets.UTF_8);

private static final String METADATA_PROPERTY_MANAGED_LEDGER_NAME = "pulsar/managed-ledger";
private static final String METADATA_PROPERTY_CURSOR_NAME = "pulsar/cursor";
private static final String METADATA_PROPERTY_COMPACTEDTOPIC = "pulsar/compactedTopic";
private static final String METADATA_PROPERTY_COMPACTEDTO = "pulsar/compactedTo";
private static final String METADATA_PROPERTY_SCHEMAID = "pulsar/schemaId";

private static final String METADATA_PROPERTY_BUCKETID = "pulsar/bucketId";

/**
* Build base metadata for every ManagedLedger.
*
Expand Down Expand Up @@ -100,6 +105,20 @@ public static Map<String, byte[]> buildMetadataForSchema(String schemaId) {
);
}

/**
* Build additional metadata for a delayed message index bucket.
*
* @param bucketKey key of the delayed message bucket
* @return an immutable map which describes the schema
*/
public static Map<String, byte[]> buildMetadataForDelayedIndexBucket(String bucketKey) {
return Map.of(
METADATA_PROPERTY_APPLICATION, METADATA_PROPERTY_APPLICATION_PULSAR,
METADATA_PROPERTY_COMPONENT, METADATA_PROPERTY_COMPONENT_DELAYED_INDEX_BUCKET,
METADATA_PROPERTY_BUCKETID, bucketKey.getBytes(StandardCharsets.UTF_8)
);
}

/**
* Build additional metadata for the placement policy config.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,30 @@
*/
package org.apache.pulsar.broker.delayed.bucket;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {

private static final Logger log = LoggerFactory.getLogger(BookkeeperBucketSnapshotStorage.class);
private static final byte[] LedgerPassword = "".getBytes();

private final PulsarService pulsar;
Expand All @@ -53,30 +55,28 @@ public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {

@Override
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments) {
return createLedger()
List<SnapshotSegment> bucketSnapshotSegments,
String bucketKey) {
return createLedger(bucketKey)
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
.thenApply(__ -> ledgerHandle))
.thenCompose(ledgerHandle -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments)
.thenApply(__ -> ledgerHandle))
.thenCompose(ledgerHandle -> closeLedger(ledgerHandle)
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
.thenCompose(__ -> closeLedger(ledgerHandle))
.thenApply(__ -> ledgerHandle.getId()));
}

@Override
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
return openLedger(bucketId).thenCompose(
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
thenCompose(
entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
}

@Override
public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
return openLedger(bucketId).thenCompose(
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId).thenCompose(
this::parseSnapshotSegmentEntries));
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
}

@Override
Expand Down Expand Up @@ -122,35 +122,31 @@ private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
return FutureUtil.waitForAll(addFutures);
}

private CompletableFuture<SnapshotMetadata> parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
CompletableFuture<SnapshotMetadata> result = new CompletableFuture<>();
private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
try {
result.complete(SnapshotMetadata.parseFrom(ledgerEntry.getEntry()));
} catch (IOException e) {
result.completeExceptionally(e);
return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
return result;
}

private CompletableFuture<List<SnapshotSegment>> parseSnapshotSegmentEntries(
Enumeration<LedgerEntry> entryEnumeration) {
CompletableFuture<List<SnapshotSegment>> result = new CompletableFuture<>();
private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
try {
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
}
result.complete(snapshotMetadataList);
return snapshotMetadataList;
} catch (IOException e) {
result.completeExceptionally(e);
throw new RuntimeException(e);
}
return result;
}

@NotNull
private CompletableFuture<LedgerHandle> createLedger() {
private CompletableFuture<LedgerHandle> createLedger(String bucketKey) {
CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
Map<String, byte[]> metadata = LedgerMetadataUtils.buildMetadataForDelayedIndexBucket(bucketKey);
bookKeeper.asyncCreateLedger(
config.getManagedLedgerDefaultEnsembleSize(),
config.getManagedLedgerDefaultWriteQuorum(),
Expand All @@ -163,7 +159,7 @@ private CompletableFuture<LedgerHandle> createLedger() {
} else {
future.complete(handle);
}
}, null, null);
}, null, metadata);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ long getAndUpdateBucketId() {
}

CompletableFuture<Long> asyncSaveBucketSnapshot(
ImmutableBucket bucketState, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {

return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments)
final String bucketKey = bucket.bucketKey();
return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey)
.thenCompose(newBucketId -> {
bucketState.setBucketId(newBucketId);
String bucketKey = bucketState.bucketKey();
bucket.setBucketId(newBucketId);

return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> {
log.warn("Failed to record bucketId to cursor property, bucketKey: {}", bucketKey);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ public interface BucketSnapshotStorage {
*
* @param snapshotMetadata the metadata of snapshot
* @param bucketSnapshotSegments the list of snapshot segments
* @param bucketKey the key of bucket is used to generate custom storage metadata
* @return the future with bucketId(ledgerId).
*/
CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments);
List<SnapshotSegment> bucketSnapshotSegments,
String bucketKey);

/**
* Get delayed message index bucket snapshot metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -59,7 +60,8 @@ public void testCreateSnapshot() throws ExecutionException, InterruptedException
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments);
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
Long bucketId = future.get();
Assert.assertNotNull(bucketId);
}
Expand Down Expand Up @@ -87,7 +89,8 @@ public void testGetSnapshot() throws ExecutionException, InterruptedException {
bucketSnapshotSegments.add(snapshotSegment);

CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments);
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand Down Expand Up @@ -125,7 +128,8 @@ public void testGetSnapshotMetadata() throws ExecutionException, InterruptedExce
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();

CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments);
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand All @@ -146,7 +150,8 @@ public void testDeleteSnapshot() throws ExecutionException, InterruptedException
DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata.newBuilder().build();
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments);
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand Down Expand Up @@ -183,7 +188,8 @@ public void testGetBucketSnapshotLength() throws ExecutionException, Interrupted
bucketSnapshotSegments.add(snapshotSegment);

CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments);
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
bucketSnapshotSegments, UUID.randomUUID().toString());
Long bucketId = future.get();
Assert.assertNotNull(bucketId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public MockBucketSnapshotStorage() {

@Override
public CompletableFuture<Long> createBucketSnapshot(
SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments) {
SnapshotMetadata snapshotMetadata, List<SnapshotSegment> bucketSnapshotSegments, String bucketKey) {
return CompletableFuture.supplyAsync(() -> {
long bucketId = maxBucketId.getAndIncrement();
List<ByteBuf> entries = new ArrayList<>();
Expand Down

0 comments on commit 3c17124

Please sign in to comment.