Skip to content

Commit

Permalink
[fix][broker] Avoid splitting one batch message into two entries in S…
Browse files Browse the repository at this point in the history
…trategicTwoPhaseCompactor (#21091)
  • Loading branch information
Demogorgon314 authored Sep 4, 2023
1 parent 59a8e72 commit e59c850
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand All @@ -44,17 +45,17 @@
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
*/
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
MessageCrypto msgCrypto;
Set<String> encryptionKeys;
CryptoKeyReader cryptoKeyReader;
private MessageCrypto<MessageMetadata, MessageMetadata> msgCrypto;
private Set<String> encryptionKeys;
private CryptoKeyReader cryptoKeyReader;
private MessageIdAdv lastAddedMessageId;

public RawBatchMessageContainerImpl(int maxNumMessagesInBatch, int maxBytesInBatch) {
public RawBatchMessageContainerImpl() {
super();
this.compressionType = CompressionType.NONE;
this.compressor = new CompressionCodecNone();
this.maxNumMessagesInBatch = maxNumMessagesInBatch;
this.maxBytesInBatch = maxBytesInBatch;
}

private ByteBuf encrypt(ByteBuf compressedPayload) {
if (msgCrypto == null) {
return compressedPayload;
Expand Down Expand Up @@ -90,6 +91,28 @@ public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}

@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
this.lastAddedMessageId = (MessageIdAdv) msg.getMessageId();
return super.add(msg, callback);
}

@Override
protected boolean isBatchFull() {
return false;
}

@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
if (lastAddedMessageId == null) {
return true;
}
// Keep same batch compact to same batch.
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
return msgId.getLedgerId() == lastAddedMessageId.getLedgerId()
&& msgId.getEntryId() == lastAddedMessageId.getEntryId();
}

/**
* Serializes the batched messages and return the ByteBuf.
* It sets the CompressionType and Encryption Keys from the batched messages.
Expand Down Expand Up @@ -168,4 +191,10 @@ public ByteBuf toByteBuf() {
clear();
return buf;
}

@Override
public void clear() {
this.lastAddedMessageId = null;
super.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.compaction;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Iterator;
Expand Down Expand Up @@ -63,39 +62,19 @@
public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final int MAX_NUM_MESSAGES_IN_BATCH = 1000;
private static final int MAX_BYTES_IN_BATCH = 128 * 1024;
private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20 * 1000;
private final Duration phaseOneLoopReadTimeout;
private final RawBatchMessageContainerImpl batchMessageContainer;

@VisibleForTesting
public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler,
int maxNumMessagesInBatch) {
this(conf, pulsar, bk, scheduler, maxNumMessagesInBatch, MAX_BYTES_IN_BATCH);
}

private StrategicTwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler,
int maxNumMessagesInBatch,
int maxBytesInBatch) {
ScheduledExecutorService scheduler) {
super(conf, pulsar, bk, scheduler);
batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch, maxBytesInBatch);
batchMessageContainer = new RawBatchMessageContainerImpl();
phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
}

public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
BookKeeper bk,
ScheduledExecutorService scheduler) {
this(conf, pulsar, bk, scheduler, MAX_NUM_MESSAGES_IN_BATCH, MAX_BYTES_IN_BATCH);
}

public CompletableFuture<Long> compact(String topic) {
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -418,7 +397,6 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
.whenComplete((res, exception2) -> {
if (exception2 != null) {
promise.completeExceptionally(exception2);
return;
}
});
phaseTwoLoop(topic, reader, lh, outstanding, promise);
Expand All @@ -443,35 +421,45 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,

<T> CompletableFuture<Boolean> addToCompactedLedger(
LedgerHandle lh, Message<T> m, String topic, Semaphore outstanding) {
if (m == null) {
return flushBatchMessage(lh, topic, outstanding);
}
if (batchMessageContainer.haveEnoughSpace((MessageImpl<?>) m)) {
batchMessageContainer.add((MessageImpl<?>) m, null);
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> f = flushBatchMessage(lh, topic, outstanding);
batchMessageContainer.add((MessageImpl<?>) m, null);
return f;
}

private CompletableFuture<Boolean> flushBatchMessage(LedgerHandle lh, String topic,
Semaphore outstanding) {
if (batchMessageContainer.getNumMessagesInBatch() <= 0) {
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> bkf = new CompletableFuture<>();
if (m == null || batchMessageContainer.add((MessageImpl<?>) m, null)) {
if (batchMessageContainer.getNumMessagesInBatch() > 0) {
try {
ByteBuf serialized = batchMessageContainer.toByteBuf();
outstanding.acquire();
mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
long start = System.nanoTime();
lh.asyncAddEntry(serialized,
(rc, ledger, eid, ctx) -> {
outstanding.release();
mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(true);
}
}, null);
try {
ByteBuf serialized = batchMessageContainer.toByteBuf();
outstanding.acquire();
mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
long start = System.nanoTime();
lh.asyncAddEntry(serialized,
(rc, ledger, eid, ctx) -> {
outstanding.release();
mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (rc != BKException.Code.OK) {
bkf.completeExceptionally(BKException.create(rc));
} else {
bkf.complete(true);
}
}, null);

} catch (Throwable t) {
log.error("Failed to add entry", t);
batchMessageContainer.discard((Exception) t);
return FutureUtil.failedFuture(t);
}
} else {
bkf.complete(false);
}
} else {
bkf.complete(false);
} catch (Throwable t) {
log.error("Failed to add entry", t);
batchMessageContainer.discard((Exception) t);
bkf.completeExceptionally(t);
return bkf;
}
return bkf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.pulsar.compaction.CompactionTest;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class RawBatchMessageContainerImplTest {
Expand All @@ -56,8 +55,6 @@ public class RawBatchMessageContainerImplTest {
CryptoKeyReader cryptoKeyReader;
Map<String, EncryptionContext.EncryptionKey> encryptKeys;

int maxBytesInBatch = 5 * 1024 * 1024;

public void setEncryptionAndCompression(boolean encrypt, boolean compress) {
if (compress) {
compressionType = ZSTD;
Expand Down Expand Up @@ -107,22 +104,22 @@ public MessageImpl createMessage(String topic, String value, int entryId) {
public void setup() throws Exception {
setEncryptionAndCompression(false, true);
}
@DataProvider(name = "testBatchLimitByMessageCount")
public static Object[][] testBatchLimitByMessageCount() {
return new Object[][] {{true}, {false}};
}

@Test(timeOut = 20000, dataProvider = "testBatchLimitByMessageCount")
public void testToByteBufWithBatchLimit(boolean testBatchLimitByMessageCount) throws IOException {
RawBatchMessageContainerImpl container = testBatchLimitByMessageCount ?
new RawBatchMessageContainerImpl(2, Integer.MAX_VALUE) :
new RawBatchMessageContainerImpl(Integer.MAX_VALUE, 5);
@Test(timeOut = 20000)
public void testToByteBufWithBatchLimit()throws IOException {
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();

String topic = "my-topic";
var full1 = container.add(createMessage(topic, "hi-1", 0), null);
var full2 = container.add(createMessage(topic, "hi-2", 1), null);
MessageImpl message1 = createMessage(topic, "hi-1", 0);
boolean hasEnoughSpase1 = container.haveEnoughSpace(message1);
var full1 = container.add(message1, null);
assertFalse(full1);
assertTrue(full2);
assertTrue(hasEnoughSpase1);
MessageImpl message2 = createMessage(topic, "hi-2", 1);
boolean hasEnoughSpase2 = container.haveEnoughSpace(message2);
assertFalse(hasEnoughSpase2);
var full2 = container.add(message2, null);
assertFalse(full2);

ByteBuf buf = container.toByteBuf();


Expand Down Expand Up @@ -167,7 +164,7 @@ public void testToByteBufWithBatchLimit(boolean testBatchLimitByMessageCount) th
public void testToByteBufWithCompressionAndEncryption() throws IOException {
setEncryptionAndCompression(true, true);

RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2, maxBytesInBatch);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();
container.setCryptoKeyReader(cryptoKeyReader);
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
Expand Down Expand Up @@ -217,7 +214,7 @@ public void testToByteBufWithCompressionAndEncryption() throws IOException {

@Test
public void testToByteBufWithSingleMessage() throws IOException {
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2, maxBytesInBatch);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
ByteBuf buf = container.toByteBuf();
Expand Down Expand Up @@ -250,25 +247,31 @@ public void testToByteBufWithSingleMessage() throws IOException {
}

@Test
public void testMaxNumMessagesInBatch() {
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch);
public void testAddDifferentBatchMessage() {
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();
String topic = "my-topic";

boolean isFull = container.add(createMessage(topic, "hi", 0), null);
Assert.assertTrue(isFull);
Assert.assertTrue(container.isBatchFull());
Assert.assertFalse(isFull);
Assert.assertFalse(container.isBatchFull());
MessageImpl message = createMessage(topic, "hi-1", 0);
Assert.assertTrue(container.haveEnoughSpace(message));
isFull = container.add(message, null);
Assert.assertFalse(isFull);
message = createMessage(topic, "hi-2", 1);
Assert.assertFalse(container.haveEnoughSpace(message));
}

@Test(expectedExceptions = UnsupportedOperationException.class)
public void testCreateOpSendMsg() {
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();
container.createOpSendMsg();
}

@Test
public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
setEncryptionAndCompression(true, false);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();
String topic = "my-topic";
container.add(createMessage(topic, "hi-1", 0), null);
Assert.assertEquals(container.getNumMessagesInBatch(), 1);
Expand All @@ -286,7 +289,7 @@ public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
@Test
public void testToByteBufWithEncryptionWithInvalidEncryptKeys() {
setEncryptionAndCompression(true, false);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1, maxBytesInBatch);
RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl();
container.setCryptoKeyReader(cryptoKeyReader);
encryptKeys = new HashMap<>();
encryptKeys.put(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -542,14 +541,8 @@ public void testBatchMessageIdsDontChange() throws Exception {
Assert.assertEquals(message2.getKey(), "key2");
Assert.assertEquals(new String(message2.getData()), "my-message-3");
if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
MessageIdImpl id = (MessageIdImpl) messages.get(0).getMessageId();
MessageIdImpl id1 = new MessageIdImpl(
id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
Assert.assertEquals(message1.getMessageId(), id1);
id = (MessageIdImpl) messages.get(2).getMessageId();
MessageIdImpl id2 = new MessageIdImpl(
id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
Assert.assertEquals(message2.getMessageId(), id2);
Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
Assert.assertEquals(message2.getMessageId(), messages.get(1).getMessageId());
} else {
Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class StrategicCompactionRetentionTest extends CompactionRetentionTest {
@Override
public void setup() throws Exception {
super.setup();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
}

Expand Down
Loading

0 comments on commit e59c850

Please sign in to comment.