Skip to content

Commit

Permalink
[Enhancement] remove lock for QueryDetailQueue (#35645)
Browse files Browse the repository at this point in the history
Signed-off-by: kangkaisen <kangkaisen@apache.org>
  • Loading branch information
kangkaisen authored Nov 27, 2023
1 parent 9d77e62 commit edc7608
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 44 deletions.
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2111,6 +2111,14 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean enable_collect_query_detail_info = false;

/**
* StarRocks-manager pull queries every 1 second
* metrics calculate query latency every 15 second
* do not set cacheTime lower than these time
*/
@ConfField(mutable = true)
public static long query_detail_cache_time_nanosecond = 30000000000L;

/**
* Min lag of routine load job to show in metrics
* Only show the routine load job whose lag is larger than min_routine_load_lag_for_metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void logConnectionInfoToAuditLogAndQueryQueue(ConnectContext ctx,
queryDetail.setRemoteIP(ctx.getRemoteIP());
queryDetail.setDatabase(authPacket == null ? "null" : authPacket.getDb());
queryDetail.setErrorMessage(ctx.getState().getErrorMessage());
QueryDetailQueue.addAndRemoveTimeoutQueryDetail(queryDetail);
QueryDetailQueue.addQueryDetail(queryDetail);
}

public static List<String> getCurrentStackTraceToList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ protected void addFinishedQueryDetail() {
queryDetail.setSpillBytes(statistics.spillBytes == null ? -1 : statistics.spillBytes);
}

QueryDetailQueue.addAndRemoveTimeoutQueryDetail(queryDetail);
QueryDetailQueue.addQueryDetail(queryDetail);
}

protected void addRunningQueryDetail(StatementBase parsedStmt) {
Expand Down Expand Up @@ -318,7 +318,7 @@ protected void addRunningQueryDetail(StatementBase parsedStmt) {
Optional.ofNullable(ctx.getResourceGroup()).map(TWorkGroup::getName).orElse(""));
ctx.setQueryDetail(queryDetail);
// copy queryDetail, cause some properties can be changed in future
QueryDetailQueue.addAndRemoveTimeoutQueryDetail(queryDetail.copy());
QueryDetailQueue.addQueryDetail(queryDetail.copy());
}

// process COM_QUERY statement,
Expand Down
61 changes: 29 additions & 32 deletions fe/fe-core/src/main/java/com/starrocks/qe/QueryDetailQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,42 +35,42 @@
package com.starrocks.qe;

import com.google.common.collect.Lists;
import com.starrocks.common.Config;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Queue of QueryDetail.
// It's used to collect queries for monitor.
public class QueryDetailQueue {
private static final LinkedList<QueryDetail> TOTAL_QUERIES = new LinkedList<QueryDetail>();
private static final ConcurrentLinkedDeque<QueryDetail> TOTAL_QUERIES = new ConcurrentLinkedDeque<>();
private static final ScheduledExecutorService SCHEDULED = Executors.newSingleThreadScheduledExecutor();

//starrocks-manager pull queries every 1 second
//metrics calculate query latency every 15 second
//do not set cacheTime lower than these time
private static final long CACHE_TIME_NS = 30000000000L;
private static long latestMS;
private static long latestMSCnt;
private static final AtomicLong LATEST_MS = new AtomicLong();
private static final AtomicLong LATEST_MS_CNT = new AtomicLong();

public static synchronized void addAndRemoveTimeoutQueryDetail(QueryDetail queryDetail) {
//set event time here to guarantee order
long now = getCurrentTimeNS();
queryDetail.setEventTime(now);
TOTAL_QUERIES.add(queryDetail);
static {
SCHEDULED.scheduleAtFixedRate(QueryDetailQueue::removeExpiredQueryDetails, 0, 5, TimeUnit.SECONDS);
}

Iterator<QueryDetail> it = TOTAL_QUERIES.iterator();
long deleteTime = now - CACHE_TIME_NS;
while (it.hasNext()) {
QueryDetail detail = it.next();
if (detail.getEventTime() < deleteTime) {
it.remove();
} else {
break;
}
public static void addQueryDetail(QueryDetail queryDetail) {
queryDetail.setEventTime(getCurrentTimeNS());
TOTAL_QUERIES.addLast(queryDetail);
}

private static void removeExpiredQueryDetails() {
long deleteTime = getCurrentTimeNS() - Config.query_detail_cache_time_nanosecond;

while (!TOTAL_QUERIES.isEmpty() && TOTAL_QUERIES.peekFirst().getEventTime() < deleteTime) {
TOTAL_QUERIES.pollFirst();
}
}

public static synchronized List<QueryDetail> getQueryDetailsAfterTime(long eventTime) {
public static List<QueryDetail> getQueryDetailsAfterTime(long eventTime) {
List<QueryDetail> results = Lists.newArrayList();
for (QueryDetail queryDetail : TOTAL_QUERIES) {
if (queryDetail.getEventTime() > eventTime) {
Expand All @@ -80,20 +80,17 @@ public static synchronized List<QueryDetail> getQueryDetailsAfterTime(long event
return results;
}

public static synchronized long getTotalQueriesCount() {
public static long getTotalQueriesCount() {
return TOTAL_QUERIES.size();
}

//must get lock before call
//NOTICE: this is not precise nano seconds, but good enough to make eventTime in order and unique
private static long getCurrentTimeNS() {
long ms = System.currentTimeMillis();
if (ms == latestMS) {
latestMSCnt++;
return ms * 1000000 + latestMSCnt;
if (ms == LATEST_MS.get()) {
return ms * 1000000 + LATEST_MS_CNT.incrementAndGet();
} else {
latestMS = ms;
latestMSCnt = 0;
LATEST_MS.set(ms);
LATEST_MS_CNT.set(0);
return ms * 1000000;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testQueryDetailQueue() {
startQueryDetail.setReturnRows(1);
startQueryDetail.setCpuCostNs(1002);
startQueryDetail.setMemCostBytes(100003);
QueryDetailQueue.addAndRemoveTimeoutQueryDetail(startQueryDetail);
QueryDetailQueue.addQueryDetail(startQueryDetail);

List<QueryDetail> queryDetails = QueryDetailQueue.getQueryDetailsAfterTime(startQueryDetail.getEventTime() - 1);
Assert.assertEquals(1, queryDetails.size());
Expand Down Expand Up @@ -83,16 +83,9 @@ public void testQueryDetailQueue() {
QueryDetail endQueryDetail = startQueryDetail.copy();
endQueryDetail.setLatency(1);
endQueryDetail.setState(QueryDetail.QueryMemState.FINISHED);
QueryDetailQueue.addAndRemoveTimeoutQueryDetail(endQueryDetail);
QueryDetailQueue.addQueryDetail(endQueryDetail);

queryDetails = QueryDetailQueue.getQueryDetailsAfterTime(startQueryDetail.getEventTime() - 1);
Assert.assertEquals(2, queryDetails.size());

//set first element eventTime to 1min ago to simulate queryDetail timeout
startQueryDetail.setEventTime(startQueryDetail.getEventTime() - 60000000000L);
//add new queryDetail, this will trigger delete
QueryDetailQueue.addAndRemoveTimeoutQueryDetail(new QueryDetail());
queryDetails = QueryDetailQueue.getQueryDetailsAfterTime(startQueryDetail.getEventTime() - 1);
Assert.assertEquals(2, queryDetails.size());
}
}

0 comments on commit edc7608

Please sign in to comment.