diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 84a184ecf2a..17f6b58ddfe 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,7 +98,7 @@ import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.rest.util.QueryRequestUtil; +import org.apache.kylin.rest.util.QueryRequestLimits; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -412,25 +412,28 @@ public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean secureEnabled final boolean isSelect = QueryUtil.isSelectStatement(sql); final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled(); + final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); if (!isSelect && !isPushDownUpdateEnabled) { logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } - int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); - if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { - logger.warn("Directly return exception as too many concurrent query requests for project:" + project); - throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); - } + SQLResponse sqlResponse = null; + + try { + // Check project level query request concurrency limitation per query server + if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { + logger.warn( + "Directly return exception as too many concurrent query requests for project:" + project); + throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); + } - long startTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); - // force clear the query context before a new query - OLAPContext.clearThreadLocalContexts(); + // force clear the query context before a new query + OLAPContext.clearThreadLocalContexts(); - SQLResponse sqlResponse = null; - try { // to deal with the case that cache searching throws exception boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), "query cache disabled in KylinConfig") && // checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); @@ -498,7 +501,7 @@ && checkCondition( } } } finally { - QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); + QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); } logQuery(sqlRequest, sqlResponse); diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java similarity index 89% rename from server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java rename to server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java index 3eb1670d72d..cddaa12f022 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java +++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java @@ -31,10 +31,10 @@ import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -public class QueryRequestUtil { - private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class); +public class QueryRequestLimits { + private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class); - private static LoadingCache queryRequestMap = CacheBuilder.newBuilder() + private static LoadingCache runningStats = CacheBuilder.newBuilder() .removalListener(new RemovalListener() { @Override public void onRemoval(RemovalNotification notification) { @@ -53,7 +53,7 @@ public static boolean openQueryRequest(String project, int maxConcurrentQuery) { return true; } try { - AtomicInteger nRunningQueries = queryRequestMap.get(project); + AtomicInteger nRunningQueries = runningStats.get(project); for (;;) { int nRunning = nRunningQueries.get(); if (nRunning < maxConcurrentQuery) { @@ -73,14 +73,14 @@ public static void closeQueryRequest(String project, int maxConcurrentQuery) { if (maxConcurrentQuery == 0) { return; } - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + AtomicInteger nRunningQueries = runningStats.getIfPresent(project); if (nRunningQueries != null) { nRunningQueries.decrementAndGet(); } } public static Integer getCurrentRunningQuery(String project) { - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + AtomicInteger nRunningQueries = runningStats.getIfPresent(project); if (nRunningQueries != null) { return nRunningQueries.get(); } else { diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java similarity index 86% rename from server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java rename to server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java index fb6d2ff5783..021c057807f 100644 --- a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java +++ b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java @@ -24,7 +24,7 @@ import org.junit.Assert; import org.junit.Test; -public class QueryRequestUtilTest { +public class QueryRequestLimitsTest { @Test public void testOpenAndCloseQueryRequest() { @@ -43,11 +43,11 @@ public void testOpenAndCloseQueryRequest() { @Override public void run() { try { - boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery); + boolean ifOpen = QueryRequestLimits.openQueryRequest(project, maxConcurrentQuery); lock.countDown(); if (ifOpen) { lock.await(); - QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery); + QueryRequestLimits.closeQueryRequest(project, maxConcurrentQuery); } else { nQueryFailed.incrementAndGet(); } @@ -63,7 +63,7 @@ public void run() { } catch (InterruptedException e) { } } - Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project)); + Assert.assertEquals(new Integer(0), QueryRequestLimits.getCurrentRunningQuery(project)); Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); } }