Skip to content

Commit

Permalink
KYLIN-2894 Query cache expiration strategy switches from manual inval…
Browse files Browse the repository at this point in the history
…idation to signature checking
  • Loading branch information
mingmwang authored and shaofengshi committed Oct 29, 2018
1 parent 3bcbaa8 commit f120886
Show file tree
Hide file tree
Showing 12 changed files with 545 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,11 @@ public String getQueryRealizationFilter() {
return getOptional("kylin.query.realization-filter", null);
}

public String getSQLResponseSignatureClass() {
return this.getOptional("kylin.query.signature-class",
"org.apache.kylin.rest.signature.RealizationSetCalculator");
}

// ============================================================================
// SERVER
// ============================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class SQLResponse implements Serializable {

protected String traceUrl = null;

// it's sql response signature for cache checking, no need to return and should be JsonIgnore
protected String signature;

public SQLResponse() {
}

Expand Down Expand Up @@ -205,7 +208,16 @@ public String getTraceUrl() {
public void setTraceUrl(String traceUrl) {
this.traceUrl = traceUrl;
}


@JsonIgnore
public String getSignature() {
return signature;
}

public void setSignature(String signature) {
this.signature = signature;
}

@JsonIgnore
public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public void notifyMetadataChange(String entity, Event event, String cacheKey) th

public void cleanDataCache(String project) {
if (cacheManager != null) {
logger.info("cleaning cache for project " + project + " (currently remove all entries)");
cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
logger.info("cleaning cache for project " + project + " (currently remove exception entries)");
// cacheManager.getCache(QueryService.SUCCESS_QUERY_CACHE).removeAll();
cacheManager.getCache(QueryService.EXCEPTION_QUERY_CACHE).removeAll();
} else {
logger.warn("skip cleaning cache for project " + project);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,25 @@
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclPermissionUtil;
import org.apache.kylin.rest.util.QueryRequestLimits;
import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
import org.apache.kylin.rest.util.TableauInterceptor;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.apache.kylin.storage.hybrid.HybridManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

/**
* @author xduo
*/
Expand Down Expand Up @@ -226,7 +226,7 @@ public SQLResponse update(SQLRequest sqlRequest) throws Exception {
columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, false, Integer.MAX_VALUE, "c0", "c0",
null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, false, false));

return buildSqlResponse(true, r.getFirst(), columnMetas);
return buildSqlResponse(sqlRequest.getProject(), true, r.getFirst(), columnMetas);

} catch (Exception e) {
logger.info("pushdown engine failed to finish current non-select query");
Expand Down Expand Up @@ -313,6 +313,12 @@ public void logQuery(final String queryId, final SQLRequest request, final SQLRe
}
}

if (realizationNames.isEmpty()) {
if (!Strings.isNullOrEmpty(response.getCube())) {
realizationNames.addAll(Lists.newArrayList(response.getCube().split(",")));
}
}

int resultRowCount = 0;
if (!response.getIsException() && response.getResults() != null) {
resultRowCount = response.getResults().size();
Expand Down Expand Up @@ -458,6 +464,8 @@ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCach
String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
String.valueOf(sqlResponse.getTotalScanCount()));
if (checkCondition(queryCacheEnabled, "query cache is disabled") //
&& checkCondition(!Strings.isNullOrEmpty(sqlResponse.getCube()),
"query does not hit cube nor hybrid") //
&& checkCondition(!sqlResponse.getIsException(), "query has exception") //
&& checkCondition(
!(sqlResponse.isPushDown()
Expand All @@ -473,7 +481,7 @@ && checkCondition(
&& checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
"query response is too large: {} ({})", sqlResponse.getResults().size(),
kylinConfig.getLargeQueryThreshold())) {
cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest.getCacheKey(), sqlResponse));
cacheManager.getCache(SUCCESS_QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse);
}

} catch (Throwable e) { // calcite may throw AssertError
Expand All @@ -482,15 +490,13 @@ && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThr
logger.error("Exception while executing query", e);
String errMsg = makeErrorMsgUserFriendly(e);

sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
sqlResponse.setTotalScanCount(queryContext.getScannedRows());
sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
sqlResponse = buildSqlResponse(sqlRequest.getProject(), false, null, null, true, errMsg);
sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e));

if (queryCacheEnabled && e.getCause() != null
&& ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
exceptionCache.put(sqlRequest.getCacheKey(), sqlResponse);
}
}
return sqlResponse;
Expand All @@ -515,22 +521,36 @@ private String getUserName() {
}

public SQLResponse searchQueryInCache(SQLRequest sqlRequest) {
SQLResponse response = null;
Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
Cache successCache = cacheManager.getCache(SUCCESS_QUERY_CACHE);

Element element = null;
if ((element = exceptionCache.get(sqlRequest.getCacheKey())) != null) {
logger.info("The sqlResponse is found in EXCEPTION_QUERY_CACHE");
response = (SQLResponse) element.getObjectValue();
response.setHitExceptionCache(true);
} else if ((element = successCache.get(sqlRequest.getCacheKey())) != null) {
logger.info("The sqlResponse is found in SUCCESS_QUERY_CACHE");
response = (SQLResponse) element.getObjectValue();
response.setStorageCacheUsed(true);
String[] cacheTypes = new String[] { EXCEPTION_QUERY_CACHE, SUCCESS_QUERY_CACHE };
for (String cacheType : cacheTypes) {
Cache cache = cacheManager.getCache(cacheType);
Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey());
if (wrapper == null) {
continue;
}
SQLResponse response = (SQLResponse) wrapper.get();
if (response == null) {
return null;
}
logger.info("The sqlResponse is found in " + cacheType);
if (!SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {
logger.info("The sql response signature is changed. Remove it from QUERY_CACHE.");
cache.evict(sqlRequest.getCacheKey());
return null;
} else {
switch (cacheType) {
case EXCEPTION_QUERY_CACHE:
response.setHitExceptionCache(true);
break;
case SUCCESS_QUERY_CACHE:
response.setStorageCacheUsed(true);
break;
default:
}
}
return response;
}

return response;
return null;
}

private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
Expand Down Expand Up @@ -579,7 +599,8 @@ private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception
List<List<String>> results = Lists.newArrayList();
List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
if (BackdoorToggles.getPrepareOnly()) {
return getPrepareOnlySqlResponse(correctedSql, conn, false, results, columnMetas);
return getPrepareOnlySqlResponse(sqlRequest.getProject(), correctedSql, conn, false, results,
columnMetas);
}
if (!isPrepareRequest) {
return executeRequest(correctedSql, sqlRequest, conn);
Expand Down Expand Up @@ -893,7 +914,7 @@ private SQLResponse executeRequest(String correctedSql, SQLRequest sqlRequest, C
close(resultSet, stat, null); //conn is passed in, not my duty to close
}

return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond());
}

private SQLResponse executePrepareRequest(String correctedSql, PrepareSqlRequest sqlRequest,
Expand Down Expand Up @@ -921,7 +942,7 @@ private SQLResponse executePrepareRequest(String correctedSql, PrepareSqlRequest
DBUtils.closeQuietly(resultSet);
}

return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond());
}

private Pair<List<List<String>>, List<SelectedColumnMeta>> pushDownQuery(SQLRequest sqlRequest, String correctedSql,
Expand Down Expand Up @@ -972,7 +993,8 @@ protected String makeErrorMsgUserFriendly(Throwable e) {
return QueryUtil.makeErrorMsgUserFriendly(e);
}

private SQLResponse getPrepareOnlySqlResponse(String correctedSql, Connection conn, Boolean isPushDown,
private SQLResponse getPrepareOnlySqlResponse(String projectName, String correctedSql, Connection conn,
Boolean isPushDown,
List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws SQLException {

CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true);
Expand Down Expand Up @@ -1018,7 +1040,7 @@ private SQLResponse getPrepareOnlySqlResponse(String correctedSql, Connection co
DBUtils.closeQuietly(preparedStatement);
}

return buildSqlResponse(isPushDown, results, columnMetas);
return buildSqlResponse(projectName, isPushDown, results, columnMetas);
}

private boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
Expand All @@ -1028,10 +1050,17 @@ private boolean isPrepareStatementWithParams(SQLRequest sqlRequest) {
return false;
}

private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> results,
private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results,
List<SelectedColumnMeta> columnMetas) {
return buildSqlResponse(projectName, isPushDown, results, columnMetas, false, null);
}

private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results,
List<SelectedColumnMeta> columnMetas, boolean isException, String exceptionMessage) {

boolean isPartialResult = false;

List<String> realizations = Lists.newLinkedList();
StringBuilder cubeSb = new StringBuilder();
StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
QueryContext queryContext = QueryContextFacade.current();
Expand All @@ -1049,17 +1078,20 @@ private SQLResponse buildSqlResponse(Boolean isPushDown, List<List<String>> resu

realizationName = ctx.realization.getName();
realizationType = ctx.realization.getStorageType();

realizations.add(realizationName);
}
queryContext.setContextRealization(ctx.id, realizationName, realizationType);
}
}
logger.info(logSb.toString());

SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, false, null, isPartialResult,
isPushDown);
SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
exceptionMessage, isPartialResult, isPushDown);
response.setTotalScanCount(queryContext.getScannedRows());
response.setTotalScanBytes(queryContext.getScannedBytes());
response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
return response;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.rest.signature;

import java.io.Serializable;

abstract class ComponentSignature<T extends ComponentSignature> implements Serializable, Comparable<T> {

public abstract String getKey();

@Override
public int compareTo(T o) {
return getKey().compareTo(o.getKey());
}
}
Loading

0 comments on commit f120886

Please sign in to comment.