Skip to content

Commit

Permalink
[fix][broker]Fix memoryLimitController currentUsage and MaxQueueSize …
Browse files Browse the repository at this point in the history
…semaphore leak when batchMessageContainer add message exception (#17276)

* fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message

* fix unit test

Co-authored-by: nicklixinyang <nicklixinyang@didiglobal.com>
  • Loading branch information
Nicklee007 and nicklixinyang authored Sep 5, 2022
1 parent d139d88 commit e1b6187
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import lombok.Cleanup;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -125,6 +130,41 @@ public void testProducerBatchSendTimeoutMemoryRelease() throws Exception {
}
}

@Test(timeOut = 10_000)
public void testBatchMessageOOMMemoryRelease() throws Exception {
initClientWithMemoryLimit();
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerMemoryLimit")
.sendTimeout(5, TimeUnit.SECONDS)
.maxPendingMessages(0)
.enableBatching(true)
.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
.batchingMaxBytes(12)
.create();
this.stopBroker();

try {
ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
doAnswer((ignore) -> {
throw new OutOfMemoryError("memory-test");
}).when(mockAllocator).buffer(anyInt());

final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
batchMessageContainerField.setAccessible(true);
batchMessageContainerField.set(spyProducer, batchMessageContainer);

spyProducer.send("memory-test".getBytes(StandardCharsets.UTF_8));
Assert.fail("can not reach here");
} catch (PulsarClientException ex) {
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
Assert.assertEquals(memoryLimitController.currentUsage(), 0);
}
}

@Test(timeOut = 10_000)
public void testProducerCloseMemoryRelease() throws Exception {
initClientWithMemoryLimit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.pulsar.client.impl;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -274,4 +278,38 @@ public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Excepti
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
}
}

@Test(timeOut = 10_000)
public void testBatchMessageOOMProducerSemaphoreRelease() throws Exception {
final int pendingQueueSize = 10;
@Cleanup
ProducerImpl<byte[]> producer =
(ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testProducerSemaphoreRelease")
.sendTimeout(5, TimeUnit.SECONDS)
.maxPendingMessages(pendingQueueSize)
.enableBatching(true)
.batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
.batchingMaxBytes(12)
.create();
this.stopBroker();

try {
ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
doAnswer((ignore) -> {
throw new OutOfMemoryError("semaphore-test");
}).when(mockAllocator).buffer(anyInt());

final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
batchMessageContainerField.setAccessible(true);
batchMessageContainerField.set(spyProducer, batchMessageContainer);

spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
Assert.fail("can not reach here");
} catch (PulsarClientException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
producer.semaphoreRelease(getNumMessagesInBatch());
producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize());
discard(new PulsarClientException(e));
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.junit.Assert;
import org.testng.annotations.Test;

public class BatchMessageContainerImplTest {
Expand All @@ -41,6 +43,17 @@ public void recoveryAfterOom() {
final ProducerImpl<?> producer = mock(ProducerImpl.class);
final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
producerConfigurationData.setCompressionType(CompressionType.NONE);
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
MemoryLimitController memoryLimitController = mock(MemoryLimitController.class);
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
try {
Field clientFiled = HandlerState.class.getDeclaredField("client");
clientFiled.setAccessible(true);
clientFiled.set(producer, pulsarClient);
} catch (Exception e){
Assert.fail(e.getMessage());
}

when(producer.getConfiguration()).thenReturn(producerConfigurationData);
final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class);
doAnswer((ignore) -> {
Expand Down

0 comments on commit e1b6187

Please sign in to comment.