Skip to content

Commit

Permalink
KYLIN-2902 minor refine
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongsjtu committed Dec 20, 2017
1 parent 7aef88a commit 8690fd2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.QueryRequestUtil;
import org.apache.kylin.rest.util.QueryRequestLimits;
import org.apache.kylin.rest.util.TableauInterceptor;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.slf4j.Logger;
Expand Down Expand Up @@ -412,25 +412,28 @@ public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean secureEnabled
final boolean isSelect = QueryUtil.isSelectStatement(sql);
final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled()
&& kylinConfig.isPushDownUpdateEnabled();
final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();

if (!isSelect && !isPushDownUpdateEnabled) {
logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
}

int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
logger.warn("Directly return exception as too many concurrent query requests for project:" + project);
throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
}
SQLResponse sqlResponse = null;

try {
// Check project level query request concurrency limitation per query server
if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
logger.warn(
"Directly return exception as too many concurrent query requests for project:" + project);
throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
}

long startTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis();

// force clear the query context before a new query
OLAPContext.clearThreadLocalContexts();
// force clear the query context before a new query
OLAPContext.clearThreadLocalContexts();

SQLResponse sqlResponse = null;
try { // to deal with the case that cache searching throws exception
boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
"query cache disabled in KylinConfig") && //
checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
Expand Down Expand Up @@ -498,7 +501,7 @@ && checkCondition(
}
}
} finally {
QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
}

logQuery(sqlRequest, sqlResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class QueryRequestUtil {
private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class);
public class QueryRequestLimits {
private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class);

private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder()
private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder()
.removalListener(new RemovalListener<String, AtomicInteger>() {
@Override
public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
Expand All @@ -53,7 +53,7 @@ public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
return true;
}
try {
AtomicInteger nRunningQueries = queryRequestMap.get(project);
AtomicInteger nRunningQueries = runningStats.get(project);
for (;;) {
int nRunning = nRunningQueries.get();
if (nRunning < maxConcurrentQuery) {
Expand All @@ -73,14 +73,14 @@ public static void closeQueryRequest(String project, int maxConcurrentQuery) {
if (maxConcurrentQuery == 0) {
return;
}
AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
if (nRunningQueries != null) {
nRunningQueries.decrementAndGet();
}
}

public static Integer getCurrentRunningQuery(String project) {
AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
if (nRunningQueries != null) {
return nRunningQueries.get();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.junit.Assert;
import org.junit.Test;

public class QueryRequestUtilTest {
public class QueryRequestLimitsTest {

@Test
public void testOpenAndCloseQueryRequest() {
Expand All @@ -43,11 +43,11 @@ public void testOpenAndCloseQueryRequest() {
@Override
public void run() {
try {
boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery);
boolean ifOpen = QueryRequestLimits.openQueryRequest(project, maxConcurrentQuery);
lock.countDown();
if (ifOpen) {
lock.await();
QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery);
QueryRequestLimits.closeQueryRequest(project, maxConcurrentQuery);
} else {
nQueryFailed.incrementAndGet();
}
Expand All @@ -63,7 +63,7 @@ public void run() {
} catch (InterruptedException e) {
}
}
Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project));
Assert.assertEquals(new Integer(0), QueryRequestLimits.getCurrentRunningQuery(project));
Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get());
}
}

0 comments on commit 8690fd2

Please sign in to comment.