Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

public class PopReviveService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
public static final int MAX_REVIVE_SIZE = 1024 * 10;
private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };

private int queueId;
Expand Down Expand Up @@ -143,10 +144,15 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
POP_LOGGER.error("reviveQueueId={}, revive error, msg is: {}", queueId, msgInner);
return false;
}
this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
if (this.brokerController.getPopInflightMessageCounter() != null) {
this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
}
if (this.brokerController.getBrokerStatsManager() != null) {
this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
}
return true;
}

Expand Down Expand Up @@ -347,6 +353,13 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
break;
}
if (map.size() >= MAX_REVIVE_SIZE) {
break;
}
List<MessageExt> messageExts = getReviveMessage(offset, queueId);
if (messageExts == null || messageExts.isEmpty()) {
long old = endTime;
Expand Down Expand Up @@ -375,10 +388,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
} else {
noMsgCount = 0;
}
if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
break;
}

for (MessageExt messageExt : messageExts) {
if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
Expand Down Expand Up @@ -500,12 +510,17 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset());
}
long newOffset = consumeReviveObj.oldOffset;
long current = System.currentTimeMillis();
long maxEndTime = consumeReviveObj.endTime;
if (current > maxEndTime) {
maxEndTime = current;
}
for (PopCheckPoint popCheckPoint : sortList) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip ck process, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
if (maxEndTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -63,11 +64,12 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -104,25 +106,33 @@ public class PopReviveServiceTest {
private BrokerMetricsManager brokerMetricsManager;
@Mock
private PopMetricsManager popMetricsManager;
@Mock
private PopInflightMessageCounter popInflightMessageCounter;
private PopMessageProcessor popMessageProcessor;

private BrokerConfig brokerConfig;
private PopReviveService popReviveService;
private BrokerStatsManager brokerStatsManager;

@Before
public void before() {
brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(CLUSTER_NAME);
brokerStatsManager = new BrokerStatsManager(brokerConfig);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
when(brokerController.getPopInflightMessageCounter()).thenReturn(popInflightMessageCounter);
when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager);
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
when(timerMessageStore.getDequeueBehind()).thenReturn(0L);
when(timerMessageStore.getEnqueueBehind()).thenReturn(0L);

doNothing().when(popInflightMessageCounter).decrementInFlightMessageNum(any());

when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new TopicConfig());
when(subscriptionGroupManager.findSubscriptionGroupConfig(anyString())).thenReturn(new SubscriptionGroupConfig());

Expand Down Expand Up @@ -241,6 +251,88 @@ public void testSkipLongWaiteAckWithSameAck() throws Throwable {
assertEquals(maxReviveOffset, commitOffsetCaptor.getValue().longValue());
}

@Test
public void testCheckCKWithLotsOfSameReviveTime() throws Throwable {
brokerConfig.setReviveAckWaitMs(TimeUnit.SECONDS.toMillis(2));
brokerConfig.setReviveScanTime(10);
long maxReviveOffset = 10;
StringBuilder actualRetryTopic = new StringBuilder();

when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false)));
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> {
MessageExtBrokerInner msg = invocation.getArgument(0);
actualRetryTopic.append(msg.getTopic());
return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK));
});

when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID))
.thenReturn(0L);
List<MessageExt> reviveMessageExtList = new ArrayList<>();
long basePopTime = System.currentTimeMillis() - brokerConfig.getReviveAckWaitMs() * 2;
{
for (int i = 1; i <= maxReviveOffset; i++) {
PopCheckPoint ck = buildPopCheckPoint(i, basePopTime + 1, i);
MessageExtBrokerInner msg = buildCkMsg(ck);
reviveMessageExtList.add(msg);
}
}
doReturn(reviveMessageExtList, new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt());

PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj();
popReviveService.consumeReviveMessage(consumeReviveObj);

assertEquals(maxReviveOffset, consumeReviveObj.map.size());

ArgumentCaptor<Long> commitOffsetCaptor = ArgumentCaptor.forClass(Long.class);
doNothing().when(consumerOffsetManager).commitOffset(anyString(), anyString(), anyString(), anyInt(), commitOffsetCaptor.capture());
Thread.sleep(PopAckConstants.ackTimeInterval * 2);
popReviveService.mergeAndRevive(consumeReviveObj);
assertEquals(maxReviveOffset, consumeReviveObj.newOffset);
assertTrue("newOffset should be greater than oldOffset", consumeReviveObj.newOffset > consumeReviveObj.oldOffset);
}

@Test
public void testCheckCKReviveScanTimeAndMaxReviveSize() throws Throwable {
brokerConfig.setReviveAckWaitMs(TimeUnit.SECONDS.toMillis(2));
brokerConfig.setReviveScanTime(1000);
StringBuilder actualRetryTopic = new StringBuilder();
when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false)));
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> {
MessageExtBrokerInner msg = invocation.getArgument(0);
actualRetryTopic.append(msg.getTopic());
return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK));
});
when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID))
.thenReturn(0L);

doReturn(new ArrayList<MessageExt>(), new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt());
long beginTs = System.currentTimeMillis();
PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj();
popReviveService.consumeReviveMessage(consumeReviveObj);
long endTs = System.currentTimeMillis();
assertTrue(endTs - beginTs >= brokerConfig.getReviveScanTime());
long maxReviveOffset = PopReviveService.MAX_REVIVE_SIZE;
List<MessageExt> reviveMessageExtList = new ArrayList<>();
long basePopTime = System.currentTimeMillis() - brokerConfig.getReviveAckWaitMs() * 2;
{
for (int i = 1; i <= maxReviveOffset; i++) {
PopCheckPoint ck = buildPopCheckPoint(i, basePopTime + 1, i);
MessageExtBrokerInner msg = buildCkMsg(ck);
reviveMessageExtList.add(msg);
}
}
doReturn(reviveMessageExtList, new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt());
brokerConfig.setReviveScanTime(10000);
consumeReviveObj = new PopReviveService.ConsumeReviveObj();
beginTs = System.currentTimeMillis();
popReviveService.consumeReviveMessage(consumeReviveObj);
endTs = System.currentTimeMillis();
assertTrue(endTs - beginTs < brokerConfig.getReviveScanTime());
assertEquals(PopReviveService.MAX_REVIVE_SIZE, consumeReviveObj.map.size());
}

@Test
public void testReviveMsgFromCk_messageFound_writeRetryOK() throws Throwable {
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
Expand Down
Loading