From 480d80b672d6c91429e56542f24681edf84f309f Mon Sep 17 00:00:00 2001 From: javalife0312 Date: Thu, 28 Mar 2019 17:02:47 +0800 Subject: [PATCH] KYLIN-3841 Build Global Dict by MR/Hive --- .../apache/kylin/common/KylinConfigBase.java | 21 +- .../org/apache/kylin/cube/model/CubeDesc.java | 17 + .../org/apache/kylin/job/JoinedFlatTable.java | 4 +- .../job/constant/ExecutableConstants.java | 6 + .../measure/bitmap/BitmapMeasureType.java | 3 + .../kylin/metadata/model/FunctionDesc.java | 9 + .../engine/mr/common/BaseCuboidBuilder.java | 32 ++ .../mr/steps/FactDistinctColumnsMapper.java | 12 + .../kylin/source/hive/BeelineHiveClient.java | 23 ++ .../kylin/source/hive/CLIHiveClient.java | 31 ++ .../source/hive/CreateMrHiveDictStep.java | 322 ++++++++++++++++++ .../kylin/source/hive/HiveInputBase.java | 183 +++++++++- .../apache/kylin/source/hive/IHiveClient.java | 2 + .../kylin/source/hive/MRHiveDictUtil.java | 110 ++++++ 14 files changed, 771 insertions(+), 4 deletions(-) create mode 100644 source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e700025d242..6f0578f2f46 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -26,6 +26,7 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Properties; import java.util.SortedSet; import java.util.TimeZone; @@ -1339,11 +1340,29 @@ public int getYarnStatusCheckIntervalSeconds() { return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10")); } - public boolean isUseLocalClasspathEnabled() { return 
Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE)); } + // ============================================================================ + // mr-hive dict + // ============================================================================ + public String[] getMrHiveDictColumns() { + String columnStr = getOptional("kylin.dictionary.mr-hive.columns", ""); + if (Objects.nonNull(columnStr) && columnStr.length()>0) { + return columnStr.split(","); + } + return null; + } + public String getMrHiveDictDB() { + return getOptional("kylin.dictionary.mr-hive.database", getHiveDatabaseForIntermediateTable()); + } + + public String getMrHiveDictTableSuffix() { + return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict"); + } + + // ============================================================================ // ENGINE.SPARK // ============================================================================ diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 392c91543a8..083c8979d1e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -39,6 +39,7 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; @@ -1339,6 +1340,22 @@ public Set getAllColumnsHaveDictionary() { } } + //mr - hive global dict + if (overrideKylinProps.containsKey("kylin.dictionary.mr-hive.columns")) { + String mrHiveDictColumns = overrideKylinProps.get("kylin.dictionary.mr-hive.columns"); + if (Objects.nonNull(mrHiveDictColumns) && StringUtils.isNotEmpty(mrHiveDictColumns)) { + String[] mrHiveDictColumnArr = mrHiveDictColumns.split(","); + for (String dictColumn : mrHiveDictColumnArr) { + for (TblColRef colRef : result) { + String aliasCol = colRef.getTableAlias() + "_" + 
colRef.getName(); + if (aliasCol.equalsIgnoreCase(dictColumn)) { + result.remove(colRef); + } + } + } + } + } + return result; } diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 4e7e92cd696..297f6691275 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -250,11 +250,11 @@ private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBu sql.append(whereBuilder.toString()); } - private static String colName(TblColRef col) { + public static String colName(TblColRef col) { return colName(col, true); } - private static String colName(TblColRef col, boolean useAlias) { + public static String colName(TblColRef col, boolean useAlias) { return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName(); } diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index a47ff182bb2..8c835159c0e 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -77,4 +77,10 @@ private ExecutableConstants() { public static final String STEP_NAME_STREAMING_CREATE_DICTIONARY = "Build Dimension Dictionaries For Steaming Job"; public static final String STEP_NAME_STREAMING_BUILD_BASE_CUBOID = "Build Base Cuboid Data For Streaming Job"; public static final String STEP_NAME_STREAMING_SAVE_DICTS = "Save Cube Dictionaries"; + + // MR - Hive Dict + public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Global Dict Mr/Hive extract dict_val from Data"; + public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Global Dict Mr/Hive build dict_val"; + public static final String 
STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Global Dict Mr/Hive replace dict_val to Data"; + } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index f724257de5e..679edc4e79c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -151,6 +151,9 @@ public List getColumnsNeedDictionary(FunctionDesc functionDesc) { // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary private boolean needDictionaryColumn(FunctionDesc functionDesc) { DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType(); + if (functionDesc.isMrDict()) { + return false; + } if (dataType.isIntegerFamily() && !dataType.isBigInt()) { return false; } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index 4ce0248188d..6fea6342f95 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@ -88,6 +88,15 @@ public static FunctionDesc newInstance(String expression, ParameterDesc param, S private DataType returnDataType; private MeasureType measureType; private boolean isDimensionAsMetric = false; + private boolean isMrDict = false; + + public boolean isMrDict() { + return isMrDict; + } + + public void setMrDict(boolean mrDict) { + isMrDict = mrDict; + } public void init(DataModelDesc model) { expression = expression.toUpperCase(Locale.ROOT); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index 
9322162c0ce..a7b17876058 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -21,7 +21,10 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import com.google.common.collect.Sets; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeSegment; @@ -30,8 +33,10 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.cube.util.KeyValueBuilder; +import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -100,10 +105,37 @@ public ByteBuffer buildValue(String[] flatRow) { public Object[] buildValueObjects(String[] flatRow) { Object[] measures = new Object[cubeDesc.getMeasures().size()]; + + Set mrDicts = null; + if (Objects.nonNull(kylinConfig.getMrHiveDictColumns())) { + mrDicts = Sets.newHashSet(kylinConfig.getMrHiveDictColumns()); + logger.info("mr-dict ===1===" + mrDicts.toString()); + } + for (int i = 0; i < measures.length; i++) { String[] colValues = kvBuilder.buildValueOf(i, flatRow); + MeasureDesc measure = measureDescList.get(i); + logger.info("mr-dict ===2===" + measure.toString()); + + //mr dict + if (measure.getFunction().getExpression().equalsIgnoreCase(FunctionDesc.FUNC_COUNT_DISTINCT)) { + FunctionDesc functionDesc = measure.getFunction(); + TblColRef colRef = functionDesc.getParameter().getColRefs().get(0); + + logger.info("mr-dict ===3===" + colRef.getName()); + + if (Objects.nonNull(mrDicts) && 
mrDicts.contains(JoinedFlatTable.colName(colRef, true))) { + functionDesc.setMrDict(true); + measure.setFunction(functionDesc); + } + } + logger.info("mr-dict ===4===" + measure.getFunction().isMrDict()); + logger.info("mr-dict ===5===" + aggrIngesters[i]); + measures[i] = aggrIngesters[i].valueOf(colValues, measure, dictionaryMap); + + logger.info("mr-dict ===6===" + measures[i].toString()); } return measures; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 7bffce7bcfc..b6bb44edc90 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -172,8 +172,20 @@ public void doMap(KEYIN key, Object record, Context context) throws IOException, Collection rowCollection = flatTableInputFormat.parseMapperInput(record); for (String[] row : rowCollection) { + if (rowCount < 10) { + StringBuilder builder = new StringBuilder(); + for (String item : row) { + builder.append(" | " + item + " | "); + } + logger.info("mr-dict ~ 1 : " + builder.toString()); + } + context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); for (int i = 0; i < allCols.size(); i++) { + if (rowCount < 10) { + logger.info("mr-dict ~ 2 | " + allCols.get(i)); + } + String fieldValue = row[columnIndex[i]]; if (fieldValue == null) continue; diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java index 6ad8593d8a9..619fbba06f0 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java @@ -25,6 +25,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; 
+import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -123,6 +124,28 @@ public long getHiveTableRows(String database, String tableName) throws Exception return count; } + @Override + public List getHiveResult(String hql) throws Exception { + ResultSet resultSet = null; + List datas = new ArrayList<>(); + try { + resultSet = stmt.executeQuery(hql); + int columnCtn = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { + Object[] data = new Object[columnCtn]; + for (int i = 0; i < columnCtn; i++) { + data[i] = resultSet.getObject(i + 1); + } + datas.add(data); + } + } catch (Exception e) { + throw new Exception(e); + } finally { + DBUtils.closeQuietly(resultSet); + } + return datas; + } + @Override public void executeHQL(String hql) throws IOException { throw new UnsupportedOperationException(); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java index bc9f17ed4ed..0592362e328 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java @@ -19,6 +19,7 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -129,6 +130,36 @@ public long getHiveTableRows(String database, String tableName) throws Exception return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.ROW_COUNT); } + @Override + public List getHiveResult(String hql) throws Exception { + List data = new ArrayList<>(); + + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(hql); + Pair response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor().execute(hiveCmdBuilder.toString()); + + String[] respData = response.getSecond().split("\n"); + + boolean isData = false; + + for 
(String item : respData) { + if (item.trim().equalsIgnoreCase("OK")) { + isData = true; + continue; + } + if (item.trim().startsWith("Time taken")) { + isData = false; + } + if (isData) { + Object[] arr = item.split("\t"); + data.add(arr); + } + + } + + return data; + } + private HiveMetaStoreClient getMetaStoreClient() throws Exception { if (metaStoreClient == null) { metaStoreClient = new HiveMetaStoreClient(hiveConf); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java new file mode 100644 index 00000000000..a69de5bb26c --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.hive; + +import com.google.common.base.Strings; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedLock; +import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * + */ +public class CreateMrHiveDictStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(CreateMrHiveDictStep.class); + private final PatternedLogger stepLogger = new PatternedLogger(logger); + private DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); + + protected void createMrHiveDict(KylinConfig config) throws Exception { + try { + if (getIsLock()) { + String pathName = getLockPathName(); + if (Strings.isNullOrEmpty(pathName)) { + throw new IllegalArgumentException("create Mr-Hive dict lock path name is null"); + } + String lockPath = getLockPath(pathName); + boolean isLocked = true; + long lockStartTime = System.currentTimeMillis(); + while (isLocked) { + isLocked = lock.isLocked(lockPath); + stepLogger.log("zookeeper lock path :" + lockPath + ", result is " + isLocked); + if (!isLocked) { + break; + } + // wait 1 min and try again + 
Thread.sleep(60000); + } + stepLogger.log("zookeeper get lock costTime : " + ((System.currentTimeMillis() - lockStartTime) / 1000) + " s"); + lock.lock(lockPath); + } + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride()); + hiveCmdBuilder.addStatement(getInitStatement()); + + String sql = getCreateTableStatement(); + if (sql != null && sql.length() > 0) { + hiveCmdBuilder.addStatement(sql); + } + Map maxDictValMap = deSerilizableForMap(getMaxDictStatementMap()); + Map dictSqlMap = deSerilizableForMap(getCreateTableStatementMap()); + + if (dictSqlMap != null && dictSqlMap.size() > 0) { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + if (maxDictValMap != null && maxDictValMap.size() > 0) { + if (maxDictValMap.size() == dictSqlMap.size()) { + maxDictValMap.forEach((columnName, maxDictValSql) -> { + int max = 0; + List datas = null; + try { + datas = hiveClient.getHiveResult(maxDictValSql); + if (Objects.nonNull(datas) && datas.size() > 0) { + max = Integer.valueOf(datas.get(0)[0] + ""); + stepLogger.log(columnName + " Get Max Dict Value Sql : " + maxDictValSql); + stepLogger.log(columnName + " Get Max Dict Value Of : " + max); + } else { + stepLogger.log(columnName + " Get Max Dict Value Sql : " + maxDictValSql); + stepLogger.log(columnName + " Get Max Dict Value Of ERROR: hive execute result is null."); + throw new IOException("execute get max dict result fail : " + maxDictValSql); + } + } catch (Exception e) { + stepLogger.log(columnName + " Get Max Dict Value Sql : " + maxDictValSql); + stepLogger.log(columnName + " Get Max Dict Value Of ERROR :" + e.getMessage()); + logger.error("execute get max dict result fail : " + maxDictValSql); + e.printStackTrace(); + } + String dictSql = dictSqlMap.get(columnName).replace("___maxDictVal___", max + ""); + hiveCmdBuilder.addStatement(dictSql); + }); + } else { + logger.error("Max Dict Value size is not equals Dict Sql size ! 
"); + } + } else { + dictSqlMap.forEach((columnName, dictSql) -> hiveCmdBuilder.addStatement(dictSql)); + } + } + + final String cmd = hiveCmdBuilder.toString(); + + stepLogger.log("MR-Hive dict, cmd: " + cmd); + + Pair response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to create mr hive dict, error code " + response.getFirst()); + } + if (getIsLock()) { + String pathName = getLockPathName(); + if (Strings.isNullOrEmpty(pathName)) { + throw new IllegalArgumentException(" create mr hive dict unlock path name is null"); + } + lock.unlock(getLockPath(pathName)); + stepLogger.log("zookeeper unlock path :" + getLockPathName()); + } + } catch (Exception e) { + if (getIsLock()) { + lock.unlock(getLockPath(getLockPathName())); + stepLogger.log("zookeeper unlock path :" + getLockPathName()); + } + e.printStackTrace(); + throw new Exception(e.getMessage()); + } + } + + private KylinConfig getCubeSpecificConfig() { + String cubeName = CubingExecutableUtil.getCubeName(getParams()); + CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = manager.getCube(cubeName); + return cube.getConfig(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = getCubeSpecificConfig(); + try { + + String preHdfsShell = getPreHdfsShell(); + if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) { + doRetry(preHdfsShell, config); + } + + createMrHiveDict(config); + + String postfixHdfsCmd = getPostfixHdfsShell(); + if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) { + doRetry(postfixHdfsCmd, config); + } + + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); 
+ return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + + private void doRetry(String cmd, KylinConfig config) throws Exception { + if (Objects.nonNull(cmd)) { + stepLogger.log("cmd : " + cmd); + int currTimes = 0; + int maxTimes = 1 * 60 * 6; //最长等待6个小时就失败 + boolean flag = true; + while (flag && currTimes <= maxTimes) { + try { + Pair result = config.getCliCommandExecutor().execute(cmd, stepLogger); + stepLogger.log(result.toString()); + flag = false; + } catch (Exception e) { + stepLogger.log("execute : " + cmd + " Failed && And errLog is " + e.getMessage()); + if (currTimes == maxTimes) { + throw new Exception(e); + } + Thread.sleep(1000 * 60); + } + } + } + } + + + public void setInitStatement(String sql) { + setParam("HiveInit", sql); + } + + public String getInitStatement() { + return getParam("HiveInit"); + } + + public void setCreateTableStatement(String sql) { + setParam("HiveRedistributeData", sql); + } + + public String getCreateTableStatement() { + return getParam("HiveRedistributeData"); + } + + public void setCreateTableStatementMap(Map dictSqlMap) { + setParam("HiveRedistributeDataMap", serilizableForMap(dictSqlMap)); + } + + public String getCreateTableStatementMap() { + return getParam("HiveRedistributeDataMap"); + } + + public void setMaxDictStatementMap(Map maxDictValMap) { + setParam("DictMaxMap", serilizableForMap(maxDictValMap)); + } + + public String getMaxDictStatementMap() { + return getParam("DictMaxMap"); + } + + public String getPreHdfsShell() { + return getParam("preHdfsCmd"); + } + + public void setPrefixHdfsShell(String cmd) { + setParam("preHdfsCmd", cmd); + } + + public String getPostfixHdfsShell() { + return getParam("postfixHdfsCmd"); + } + + public void setPostfixHdfsShell(String cmd) { + setParam("postfixHdfsCmd", cmd); + } + + public void setIsLock(Boolean isLock) { + setParam("isLock", String.valueOf(isLock)); + } + + public boolean getIsLock() { + String isLock = getParam("isLock"); + 
return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock); + } + + public void setIsUnLock(Boolean isUnLock) { + setParam("isUnLock", String.valueOf(isUnLock)); + } + + public boolean getIsUnlock() { + String isUnLock = getParam("isUnLock"); + return Strings.isNullOrEmpty(isUnLock) ? false : Boolean.parseBoolean(isUnLock); + } + + public void setLockPathName(String pathName) { + setParam("lockPathName", pathName); + } + + public String getLockPathName() { + return getParam("lockPathName"); + } + + private static String serilizableForMap(Map map) { + JSONArray result = new JSONArray(); + if (map != null && map.size() > 0) { + map.forEach((key, value) -> { + JSONObject jsonObject = new JSONObject(); + try { + jsonObject.put(key, value); + } catch (JSONException e) { + e.printStackTrace(); + } + result.put(jsonObject); + }); + } + return result.toString(); + } + + private static Map deSerilizableForMap(String mapStr) { + Map result = new HashMap<>(); + if (mapStr != null) { + try { + JSONArray jsonArray = new JSONArray(mapStr); + int size = jsonArray.length(); + for (int i = 0; i < size; i++) { + JSONObject jsonObject = jsonArray.getJSONObject(i); + Iterator iterator = jsonObject.keys(); + while (iterator.hasNext()) { + String key = iterator.next(); + String value = jsonObject.getString(key); + result.put(key, value); + } + } + } catch (JSONException e) { + e.printStackTrace(); + } + } + return result; + } + + private String getLockPath(String pathName) { + return MRHiveDictUtil.DictHiveType.MrDictLockPath.getName() + pathName; + } + + +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index bf674f52e3f..b839a6513ba 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -19,9 +19,13 @@ package org.apache.kylin.source.hive; 
import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Objects; import java.util.Set; import com.google.common.collect.Lists; @@ -48,6 +52,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +88,18 @@ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { // create flat table first addStepPhase1_DoCreateFlatTable(jobFlow); + // create global dict + KylinConfig dictConfig = (flatDesc.getSegment()).getConfig(); + String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns(); + if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) { + String globalDictDatabase = dictConfig.getMrHiveDictDB(); + if (null == globalDictDatabase) { + throw new IllegalArgumentException("Mr-Hive Global dict database is null."); + } + String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix(); + addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable); + } + // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); @@ -98,6 +115,158 @@ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { addStepPhase1_DoMaterializeLookupTable(jobFlow); } + protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, + String[] mrHiveDictColumns, + String globalDictDatabase, + String globalDictTable) { + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); + final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); + + jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, jobWorkingDir, cubeName, mrHiveDictColumns, globalDictDatabase, globalDictTable)); + jobFlow.addTask(createMrHIveGlobalDictBuildStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName, mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable)); + jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName, mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable)); + } + + protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc, + String hiveInitStatements, + String jobWorkingDir, + String cubeName, + String[] mrHiveDictColumns, + String globalDictDatabase, + String globalDictTable) { + // Firstly, determine if the global dict hive table of cube is exists. + String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "." 
+ globalDictTable + "\n"
+                    + "( dict_key STRING COMMENT '', \n"
+                    + "dict_val INT COMMENT '' \n"
+                    + ") \n"
+                    + "COMMENT '' \n"
+                    + "PARTITIONED BY (dict_column string) \n"
+                    + "STORED AS TEXTFILE; \n";
+
+            final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc);
+            final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc);
+
+            StringBuffer insertDataToDictIntermediateTableSql = new StringBuffer();
+            for (String dictColumn : mrHiveDictColumns) {
+                insertDataToDictIntermediateTableSql.append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn));
+            }
+
+            CreateMrHiveDictStep step = new CreateMrHiveDictStep();
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(createGlobalDictTableHql + dropDictIntermediateTableHql + createDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString());
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
+            return step;
+        }
+
+        /**
+         * Creates the job step that extends the Hive global dictionary table with the keys that are
+         * new in this build.
+         *
+         * <p>For every dictionary column two statements are prepared, both keyed by the column name:
+         * <ul>
+         * <li>a query returning the current maximum {@code dict_val} of the column's partition
+         * (0 when the partition is empty);</li>
+         * <li>an {@code INSERT OVERWRITE} of that partition with its existing rows plus every key of
+         * the group-by table that has no {@code dict_val} yet, numbered by {@code row_number()}
+         * offset by the literal token {@code ___maxDictVal___}.</li>
+         * </ul>
+         * NOTE(review): {@code ___maxDictVal___} is emitted verbatim here; presumably
+         * {@code CreateMrHiveDictStep} substitutes the result of the max-value query before running
+         * the statement - confirm in that class.
+         *
+         * @param flatDesc            flat table descriptor; used to derive the group-by table name
+         * @param hiveInitStatements  Hive statements executed before the step's own statements
+         * @param hdfsWorkingDir      not used by this method
+         * @param cubeName            cube name stored in the step params and used as lock path name
+         * @param mrHiveDictColumns   dictionary columns, one partition of the dict table each
+         * @param flatTableDatabase   database that holds the group-by table
+         * @param globalDictDatabase  database that holds the global dictionary table
+         * @param globalDictTable     name of the global dictionary table
+         * @return the configured step, flagged via {@code setIsLock(true)}
+         */
+        protected static AbstractExecutable createMrHIveGlobalDictBuildStep(IJoinedFlatTableDesc flatDesc,
+                                                                           String hiveInitStatements,
+                                                                           String hdfsWorkingDir,
+                                                                           String cubeName,
+                                                                           String[] mrHiveDictColumns,
+                                                                           String flatTableDatabase,
+                                                                           String globalDictDatabase,
+                                                                           String globalDictTable) {
+            final String flatTable = flatTableDatabase + "."
+                    + MRHiveDictUtil.getHiveTableName(flatDesc, MRHiveDictUtil.DictHiveType.GroupBy);
+            final String globalDict = globalDictDatabase + "." + globalDictTable;
+            final Map<String, String> maxDictValMap = new HashMap<>();
+            final Map<String, String> dictHqlMap = new HashMap<>();
+
+            for (String dictColumn : mrHiveDictColumns) {
+                // get dict max value
+                String maxDictValHql = "SELECT if(max(dict_val) is null,0,max(dict_val)) as max_dict_val \n"
+                        + " FROM " + globalDict + " \n"
+                        + " WHERE dict_column = '" + dictColumn + "' \n";
+                maxDictValMap.put(dictColumn, maxDictValHql);
+
+                // Plain string concatenation cannot fail, so no try/catch is needed around it.
+                String dictHql = "INSERT OVERWRITE TABLE " + globalDict + " \n"
+                        + "PARTITION (dict_column = '" + dictColumn + "') \n"
+                        + "SELECT dict_key, dict_val FROM " + globalDict + " \n"
+                        + "WHERE dict_column = '" + dictColumn + "' \n"
+                        + "UNION \n"
+                        + "SELECT a.dict_key as dict_key, (row_number() over(order by a.dict_key asc)) + (___maxDictVal___) as dict_val \n"
+                        + "FROM \n"
+                        + "( \n"
+                        + " SELECT dict_key FROM " + flatTable + " WHERE dict_column = '" + dictColumn + "' AND dict_key is not null \n"
+                        + ") a \n"
+                        + "LEFT JOIN \n"
+                        + "( \n"
+                        + "SELECT dict_key, dict_val FROM " + globalDict + " WHERE dict_column = '" + dictColumn + "' \n"
+                        + ") b \n"
+                        + "ON a.dict_key = b.dict_key \n"
+                        + "WHERE b.dict_val is null; \n";
+                dictHqlMap.put(dictColumn, dictHql);
+            }
+
+            // "nonstrict" is the documented value of hive.mapred.mode; the former "unstrict" only
+            // worked because Hive treats every value other than "strict" as non-strict.
+            String hiveInitStatementForUnstrict = "set hive.mapred.mode=nonstrict;";
+            CreateMrHiveDictStep step = new CreateMrHiveDictStep();
+            step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict);
+            step.setCreateTableStatementMap(dictHqlMap);
+            step.setMaxDictStatementMap(maxDictValMap);
+            step.setIsLock(true);
+            step.setLockPathName(cubeName);
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
+            return step;
+        }
+
+        /**
+         * Creates the job step that rewrites the flat table so that each dictionary column holds
+         * the {@code dict_val} of the global dictionary instead of the original value.
+         *
+         * <p>One {@code INSERT OVERWRITE} statement is generated per dictionary column: it selects
+         * all flat table columns unchanged except the dictionary column, which is taken from a
+         * {@code LEFT OUTER JOIN} with that column's partition of the global dictionary table.
+         *
+         * @param flatDesc            flat table descriptor supplying table name and column list
+         * @param hiveInitStatements  Hive statements executed before the step's own statements
+         * @param hdfsWorkingDir      not used by this method
+         * @param cubeName            cube name stored in the step params and used as lock path name
+         * @param mrHiveDictColumns   dictionary columns, compared case-insensitively with the flat
+         *                            table column names
+         * @param flatTableDatabase   database that holds the flat table
+         * @param globalDictDatabase  database that holds the global dictionary table
+         * @param globalDictTable     name of the global dictionary table
+         * @return the configured step, flagged via {@code setIsUnLock(true)}
+         * @throws IllegalArgumentException if a dictionary column is not a column of the flat table;
+         *                                  previously the failure was printed and the column was
+         *                                  silently left unreplaced
+         */
+        protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc,
+                                                                             String hiveInitStatements,
+                                                                             String hdfsWorkingDir,
+                                                                             String cubeName,
+                                                                             String[] mrHiveDictColumns,
+                                                                             String flatTableDatabase,
+                                                                             String globalDictDatabase,
+                                                                             String globalDictTable) {
+            final String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
+            final List<TblColRef> flatColumns = flatDesc.getAllColumns();
+            final Map<String, String> dictHqlMap = new HashMap<>();
+
+            for (String dictColumn : mrHiveDictColumns) {
+                StringBuilder dictHql = new StringBuilder();
+                TblColRef dictColumnRef = null;
+
+                // replace the flat table's dict column value
+                dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
+                dictHql.append("SELECT \n");
+                for (int i = 0; i < flatColumns.size(); i++) {
+                    TblColRef tblColRef = flatColumns.get(i);
+                    if (i > 0) {
+                        dictHql.append(",");
+                    }
+                    if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
+                        dictHql.append("b. dict_val \n");
+                        dictColumnRef = tblColRef;
+                    } else {
+                        dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
+                    }
+                }
+
+                // Without a matching column there is nothing to join on; fail loudly instead of
+                // building the cube from values that were never dictionary-encoded.
+                if (dictColumnRef == null) {
+                    throw new IllegalArgumentException("MR-Hive dict column '" + dictColumn
+                            + "' is not a column of flat table " + flatTable);
+                }
+
+                dictHql.append("FROM " + flatTable + " a \n"
+                        + "LEFT OUTER JOIN \n"
+                        + "( \n"
+                        + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable + " WHERE dict_column = '" + dictColumn + "' \n"
+                        + ") b \n"
+                        + " ON a." + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
+                dictHqlMap.put(dictColumn, dictHql.toString());
+            }
+
+            CreateMrHiveDictStep step = new CreateMrHiveDictStep();
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatementMap(dictHqlMap);
+            step.setIsUnLock(true);
+            step.setLockPathName(cubeName);
+
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
+            return step;
+        }
+
         protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
@@ -131,7 +300,19 @@ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             org.apache.kylin.source.hive.GarbageCollectionStep step = new org.apache.kylin.source.hive.GarbageCollectionStep();
step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+
+            List<String> deleteTables = new ArrayList<>();
+            deleteTables.add(getIntermediateTableIdentity());
+            // mr-hive dict: the group-by table is a managed Hive table, so dropping it is enough (no HDFS path to delete).
+            // Its name must come from MRHiveDictUtil.getHiveTableName ("__" separator), the helper used when it was created.
+            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
+            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
+                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
+                String tableName = dictDb + "." + MRHiveDictUtil.getHiveTableName(flatDesc, MRHiveDictUtil.DictHiveType.GroupBy);
+                deleteTables.add(tableName);
+            }
+            step.setIntermediateTables(deleteTables);
+
             step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));
             step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables, ","));
             jobFlow.addTask(step);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
index ec2bf7fce2f..ba7d322bde5 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
@@ -34,4 +34,6 @@ public interface IHiveClient {
     List getHiveTableNames(String database) throws Exception;
 
     long getHiveTableRows(String database, String tableName) throws Exception;
+
+    List getHiveResult(String hql) throws Exception;
 }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
new file mode 100644
index 00000000000..18912b933bd
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers that generate the Hive statements for the intermediate "group by" table used when a
+ * global dictionary is built with MR/Hive. The table has one {@code dict_key} column and is
+ * partitioned by {@code dict_column}; each partition holds the distinct values of one
+ * dictionary column of the flat table.
+ */
+public class MRHiveDictUtil {
+    private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
+
+    /** Name fragments used by the MR-Hive dictionary build. */
+    public enum DictHiveType {
+        /** Suffix of the intermediate group-by table, see {@link #getHiveTableName}. */
+        GroupBy("group_by"),
+        /** Path fragment {@code /mr_dict_lock/}; not referenced inside this class. */
+        MrDictLockPath("/mr_dict_lock/");
+
+        private final String name;
+
+        DictHiveType(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+    }
+
+    /**
+     * Generates the statement that drops the group-by table if it exists.
+     *
+     * @param flatDesc flat table descriptor the table name is derived from
+     * @return a {@code DROP TABLE IF EXISTS} statement terminated by {@code ;} and a line break
+     */
+    public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
+        StringBuilder ddl = new StringBuilder();
+        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+        ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
+        return ddl.toString();
+    }
+
+    /**
+     * Generates the statement that creates the group-by table: a single {@code dict_key STRING}
+     * column, partitioned by {@code dict_column}, stored as a sequence file.
+     *
+     * @param flatDesc flat table descriptor the table name is derived from
+     * @return a {@code CREATE TABLE IF NOT EXISTS} statement terminated by {@code ;}
+     */
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) {
+        StringBuilder ddl = new StringBuilder();
+        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+
+        ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
+        ddl.append("( \n ");
+        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append(") \n");
+        ddl.append("COMMENT '' \n");
+        ddl.append("PARTITIONED BY (dict_column string) \n");
+        ddl.append("STORED AS SEQUENCEFILE \n");
+        ddl.append(";").append("\n");
+        return ddl.toString();
+    }
+
+    /**
+     * Generates the statement that fills one partition of the group-by table with the distinct
+     * values of a dictionary column, selected from the flat table with a {@code GROUP BY}.
+     *
+     * @param flatDesc   flat table descriptor supplying table name and column list
+     * @param dictColumn dictionary column, compared case-insensitively with the flat table column
+     *                   names; also used as the partition value
+     * @return an {@code INSERT OVERWRITE ... PARTITION} statement terminated by {@code ;}
+     * @throws IllegalArgumentException if {@code dictColumn} is not a column of the flat table
+     *                                  (previously this surfaced as an index -1 lookup failure)
+     */
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn) {
+        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+
+        // Locate the first flat table column whose name matches the dictionary column.
+        TblColRef col = null;
+        for (TblColRef tblColRef : flatDesc.getAllColumns()) {
+            if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
+                col = tblColRef;
+                break;
+            }
+        }
+        if (col == null) {
+            throw new IllegalArgumentException("MR-Hive dict column '" + dictColumn
+                    + "' is not a column of flat table " + flatDesc.getTableName());
+        }
+
+        String colName = JoinedFlatTable.colName(col);
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT" + "\n");
+        sql.append(colName + " \n");
+
+        MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
+
+        //group by
+        sql.append("GROUP BY ");
+        sql.append(colName + " \n");
+
+        return "INSERT OVERWRITE TABLE " + table + " \n"
+                + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+                + sql + ";\n";
+    }
+
+    /**
+     * Returns the name of an MR-Hive dict helper table: the flat table name, two underscores,
+     * then the type's name (for example {@code <flat_table>__group_by}). No database qualifier
+     * is added.
+     *
+     * @param flatDesc     flat table descriptor supplying the base table name
+     * @param dictHiveType type whose name is appended
+     * @return the unqualified Hive table name
+     */
+    public static String getHiveTableName(IJoinedFlatTableDesc flatDesc, DictHiveType dictHiveType) {
+        StringBuilder table = new StringBuilder(flatDesc.getTableName());
+        table.append("__");
+        table.append(dictHiveType.getName());
+        return table.toString();
+    }
+
+    /**
+     * Appends the {@code FROM} clause that reads from the flat table.
+     *
+     * @param flatDesc flat table descriptor supplying the table name
+     * @param sql      builder the clause is appended to
+     */
+    public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
+        sql.append("FROM " + flatDesc.getTableName() + "\n");
+    }
+
+}