Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) {

@Override
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) {
ConsumeQueueInterface logic = findOrCreateConsumeQueue(topic, queueId);
ConsumeQueueInterface logic = getConsumeQueue(topic, queueId);
if (logic != null) {
long resultOffset = logic.getOffsetInQueueByTime(timestamp, boundaryType);
// Make sure the result offset is in valid range.
Expand All @@ -193,7 +193,14 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo
}

private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
return findOrCreateConsumeQueue(topic, queueId);
return getLifeCycle(topic, queueId, true);
}

private FileQueueLifeCycle getLifeCycle(String topic, int queueId, boolean create) {
if (create) {
return findOrCreateConsumeQueue(topic, queueId);
}
return getConsumeQueue(topic, queueId);
}

public boolean load(ConsumeQueueInterface consumeQueue) {
Expand Down Expand Up @@ -297,12 +304,12 @@ public long getMaxPhyOffsetInConsumeQueue() {

@Override
public long getMinOffsetInQueue(String topic, int queueId) {
ConsumeQueueInterface logic = findOrCreateConsumeQueue(topic, queueId);
ConsumeQueueInterface logic = getConsumeQueue(topic, queueId);
if (logic != null) {
return logic.getMinOffsetInQueue();
}

return -1;
return -1L;
}

public void checkSelf(ConsumeQueueInterface consumeQueue) {
Expand All @@ -320,7 +327,10 @@ public void checkSelf() {
}

public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false);
if (fileQueueLifeCycle == null) {
return false;
}
return fileQueueLifeCycle.flush(flushLeastPages);
}

Expand All @@ -334,17 +344,26 @@ public void flush() throws StoreException {

@Override
public void destroy(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false);
if (fileQueueLifeCycle == null) {
return;
}
fileQueueLifeCycle.destroy();
}

public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false);
if (fileQueueLifeCycle == null) {
return 0;
}
return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
}

public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false);
if (fileQueueLifeCycle == null) {
return;
}
fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset);
}

Expand All @@ -360,12 +379,19 @@ public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long forceCleanS
}

public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false);
if (fileQueueLifeCycle == null) {
return false;
}

return fileQueueLifeCycle.isFirstFileAvailable();
}

public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId(), false);
if (fileQueueLifeCycle == null) {
return false;
}
return fileQueueLifeCycle.isFirstFileExist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)

@Override
public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
// since rocksdb cq use lazy loading, we need to create it if not exist
return findOrCreateConsumeQueue(topic, queueId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ public void testIterator() throws Exception {

//The initial min max offset, before and after the creation of consume queue
Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, queueId));
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, queueId));
Assert.assertEquals(-1, messageStore.getMinOffsetInQueue(topic, queueId));

ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, queueId);
ConsumeQueueInterface consumeQueue = messageStore.findConsumeQueue(topic, queueId);
Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue());
Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
Expand All @@ -149,7 +149,7 @@ public void testIterator() throws Exception {
checkCQ(consumeQueue, msgNum, msgSize);

CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore();
ConsumeQueueInterface rocksDBConsumeQueue = combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(topic, queueId);
ConsumeQueueInterface rocksDBConsumeQueue = combineConsumeQueueStore.getRocksDBConsumeQueueStore().findOrCreateConsumeQueue(topic, queueId);
Assert.assertEquals(CQType.RocksDBCQ, rocksDBConsumeQueue.getCQType());
Assert.assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue());
checkCQ(rocksDBConsumeQueue, msgNum, msgSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ public void testIterator() throws Exception {
String topic = UUID.randomUUID().toString();
//The initial min max offset, before and after the creation of consume queue
Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0));
Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
Assert.assertEquals(-1, messageStore.getMinOffsetInQueue(topic, 0));

ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
ConsumeQueueInterface consumeQueue = messageStore.findConsumeQueue(topic, 0);
Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue());
Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ public void testBatchSend_SysOuterBatch() throws Exception {
Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
Assert.assertEquals(0, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(-1, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(-1, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(-1, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
Expand Down
Loading