diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 637502ef05b..b2331e16271 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -788,6 +788,10 @@ public String getFlatHiveTableClusterByDictColumn() { return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column"); } + public int getHiveRedistributeColumnCount() { + return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3")); + } + public int getDefaultVarcharPrecision() { int v = Integer.parseInt(getOptional("kylin.source.hive.default-varchar-precision", "256")); if (v < 1) { diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index a6c6daad71a..392323e2c13 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -25,9 +25,12 @@ import java.util.List; import java.util.Set; +import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -188,8 +191,13 @@ public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuil } } - private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) { - sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n"); + private static void appendDistributeStatement(StringBuilder sql, List redistCols) { + sql.append(" DISTRIBUTE BY "); + for (TblColRef redistCol : redistCols) { + sql.append(colName(redistCol, true)).append(","); + } + sql.deleteCharAt(sql.length() - 1); + sql.append(";\n"); } private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) { @@ -252,16 +260,30 @@ private static String getHiveDataType(String javaDataType) { return hiveDataType; } - public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) { + public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { final String tableName = flatDesc.getTableName(); StringBuilder sql = new StringBuilder(); sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName); - TblColRef clusterCol = flatDesc.getClusterBy(); - if (clusterCol != null) { - appendClusterStatement(sql, clusterCol); + if (flatDesc.getClusterBy() != null) { + appendClusterStatement(sql, flatDesc.getClusterBy()); + } else if (flatDesc.getDistributedBy() != null) { + appendDistributeStatement(sql, Lists.newArrayList(flatDesc.getDistributedBy())); } else { - appendDistributeStatement(sql, flatDesc.getDistributedBy()); + int redistColumnCount = KylinConfig.getInstanceFromEnv().getHiveRedistributeColumnCount(); + + RowKeyColDesc[] rowKeyColDescs = cubeDesc.getRowkey().getRowKeyColumns(); + + if (rowKeyColDescs.length < redistColumnCount) + redistColumnCount = rowKeyColDescs.length; + + List redistColumns = Lists.newArrayListWithCapacity(redistColumnCount); + + for (int i = 0; i < redistColumnCount; i++) { + redistColumns.add(rowKeyColDescs[i].getColRef()); + } + + appendDistributeStatement(sql, redistColumns); } return sql.toString(); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index eae2e1cf4ae..9a2c2429e36 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; @@ -81,11 +82,11 @@ protected static AbstractExecutable createFlatHiveTableStep(String hiveInitState } protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName, - IJoinedFlatTableDesc flatDesc) { + IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); step.setInitStatement(hiveInitStatements); step.setIntermediateTable(flatDesc.getTableName()); - step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc)); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc)); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); return step; diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index bfea632887d..d1b4fc901fb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -28,6 +28,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; @@ -118,8 +119,9 @@ public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) - .getConfig(); + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + final KylinConfig cubeConfig = cubeInstance.getConfig(); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); // create flat table first @@ -127,9 +129,7 @@ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc)); - } + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); } // special for hive @@ -154,7 +154,6 @@ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable j } } - @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java index 779835bbdcf..881be1ab3b6 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java @@ -23,6 +23,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.ISparkInput; @@ -75,8 +76,8 @@ public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) - .getConfig(); + CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); + final KylinConfig cubeConfig = cubeInstance.getConfig(); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); // create flat table first @@ -84,9 +85,7 @@ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc)); - } + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); } // special for hive @@ -103,8 +102,6 @@ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable j } } - - @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);