Skip to content

Commit

Permalink
Make bucket merge operate asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Mar 22, 2023
1 parent 4ab4463 commit f854c59
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;

private static final Long INVALID_BUCKET_ID = -1L;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -246,12 +248,12 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
immutableBucket);

immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> {
CompletableFuture<Long> future = createFuture.whenComplete((__, ex) -> {
CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());
return;
return bucketId;
}

//TODO Record create snapshot failed
Expand All @@ -277,6 +279,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getTimestamp());
}
return INVALID_BUCKET_ID;
});
immutableBucket.setSnapshotCreateFuture(future);
});
Expand Down Expand Up @@ -308,12 +311,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
lastMutableBucket.resetLastMutableBucketRange();

if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
try {
asyncMergeBucketSnapshot().get(2 * AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore exception to merge bucket on the next schedule.
log.error("[{}] An exception occurs when merge bucket snapshot.", dispatcher.getName(), e);
}
asyncMergeBucketSnapshot();
}
}

Expand Down Expand Up @@ -341,18 +339,24 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
long minNumberMessages = Long.MAX_VALUE;
long minScheduleTimestampSum = Long.MAX_VALUE;
int minIndex = -1;
for (int i = 0; i + 1 < values.size(); i++) {
ImmutableBucket bucketL = values.get(i);
ImmutableBucket bucketR = values.get(i + 1);
long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
if (numberMessages < minNumberMessages) {
minNumberMessages = (int) numberMessages;
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
&& bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId()
&& bucketL.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()
&& bucketR.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) {
minIndex = i;
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
&& bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId()
&& !bucketL.merging && !bucketR.merging){
long scheduleTimestampSum =
Math.min(bucketL.firstScheduleTimestamps.get(bucketL.currentSegmentEntryId + 1),
bucketR.firstScheduleTimestamps.get(bucketR.currentSegmentEntryId + 1));
long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
if (scheduleTimestampSum <= minScheduleTimestampSum) {
minScheduleTimestampSum = scheduleTimestampSum;
if (numberMessages < minNumberMessages) {
minNumberMessages = numberMessages;
minIndex = i;
}
}
}
}
Expand All @@ -369,7 +373,14 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(),
immutableBucketA.bucketKey(), immutableBucketB.bucketKey());
}

immutableBucketA.merging = true;
immutableBucketB.merging = true;
return asyncMergeBucketSnapshot(immutableBucketA, immutableBucketB).whenComplete((__, ex) -> {
synchronized (this) {
immutableBucketA.merging = false;
immutableBucketB.merging = false;
}
if (ex != null) {
log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}",
dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey(), ex);
Expand All @@ -382,46 +393,58 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {

private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableBucket bucketA,
ImmutableBucket bucketB) {
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
bucketA.getRemainSnapshotSegment();
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
bucketB.getRemainSnapshotSegment();
return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMapB = bucketB.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA);
delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> {
if (bitMapA == null) {
return bitMapB;
}
CompletableFuture<Long> createAFuture = bucketA.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
CompletableFuture<Long> createBFuture = bucketB.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);

bitMapA.or(bitMapB);
return bitMapA;
});
});
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);

afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
return CompletableFuture.allOf(createAFuture, createBFuture).thenCompose(bucketId -> {
if (INVALID_BUCKET_ID.equals(createAFuture.join()) || INVALID_BUCKET_ID.equals(createBFuture.join())) {
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}

immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();
return CompletableFuture.allOf(removeAFuture, removeBFuture);
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
bucketA.getRemainSnapshotSegment();
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
bucketB.getRemainSnapshotSegment();
return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
.thenAccept(combinedDelayedIndexQueue -> {
synchronized (BucketDelayedDeliveryTracker.this) {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMapB = bucketB.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA);
delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> {
if (bitMapA == null) {
return bitMapB;
}

bitMapA.or(bitMapB);
return bitMapA;
});
});
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);

afterCreateImmutableBucket(immutableBucketDelayedIndexPair);

immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();
return CompletableFuture.allOf(removeAFuture, removeBFuture);
});

immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
}
});

immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
});
});
}

@Override
Expand Down Expand Up @@ -477,6 +500,12 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa

ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
if (bucket.merging) {
log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}

final int preSegmentEntryId = bucket.currentSegmentEntryId;
if (log.isDebugEnabled()) {
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
Expand Down Expand Up @@ -525,14 +554,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId);
}
}).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
} catch (Exception e) {
// Ignore exception to reload this segment on the next schedule.
log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey(), e);
break;
}
}
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);

positions.add(new PositionImpl(ledgerId, entryId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -44,7 +45,12 @@
class ImmutableBucket extends Bucket {

@Setter
private volatile List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;

boolean merging = false;

@Setter
List<Long> firstScheduleTimestamps = new ArrayList<>();

ImmutableBucket(String dispatcherName, ManagedCursor cursor,
BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
Expand Down Expand Up @@ -92,6 +98,9 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b

this.setLastSegmentEntryId(metadataList.size());
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
List<Long> firstScheduleTimestamps = metadataList.stream().map(
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
this.setFirstScheduleTimestamps(firstScheduleTimestamps);

return nextSnapshotEntryIndex + 1;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();

List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
DelayedIndex delayedIndex = delayedIndexQueue.peek();
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
firstScheduleTimestamps.add(currentFirstTimestamp);
currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1;
}

Expand All @@ -104,6 +108,7 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;

Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator();
Expand Down Expand Up @@ -134,6 +139,7 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
bucket.setCurrentSegmentEntryId(1);
bucket.setNumberBucketDelayedMessages(numMessages);
bucket.setLastSegmentEntryId(lastSegmentEntryId);
bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);

// Skip first segment, because it has already been loaded
List<SnapshotSegment> snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ message DelayedIndex {
message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
required uint64 max_schedule_timestamp = 2;
required uint64 min_schedule_timestamp = 3;
}

message SnapshotSegment {
Expand Down
Loading

0 comments on commit f854c59

Please sign in to comment.