Commit 95d2a5b

KYLIN-3457 Distribute by multi column if not set distribute column
Wayne1c authored and shaofengshi committed Jul 19, 2018
1 parent 27eb8cd commit 95d2a5b
Showing 5 changed files with 45 additions and 22 deletions.
@@ -788,6 +788,10 @@ public String getFlatHiveTableClusterByDictColumn() {
         return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
     }
 
+    public int getHiveRedistributeColumnCount() {
+        return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3"));
+    }
+
     public int getDefaultVarcharPrecision() {
         int v = Integer.parseInt(getOptional("kylin.source.hive.default-varchar-precision", "256"));
         if (v < 1) {
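
The getter above backs the new fallback path in JoinedFlatTable (next file): when no distribute or cluster column is configured, at most this many leading rowkey columns are used. A sketch of overriding it in kylin.properties — the key and its default of 3 come straight from the diff; the value 5 is purely illustrative:

    # widen the fallback DISTRIBUTE BY to the first five rowkey columns (illustrative)
    kylin.source.hive.redistribute-column-count=5
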
core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java (29 additions, 7 deletions)
@@ -25,9 +25,12 @@
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -188,8 +191,13 @@ public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuil
         }
     }
 
-    private static void appendDistributeStatement(StringBuilder sql, TblColRef redistCol) {
-        sql.append(" DISTRIBUTE BY ").append(colName(redistCol, true)).append(";\n");
+    private static void appendDistributeStatement(StringBuilder sql, List<TblColRef> redistCols) {
+        sql.append(" DISTRIBUTE BY ");
+        for (TblColRef redistCol : redistCols) {
+            sql.append(colName(redistCol, true)).append(",");
+        }
+        sql.deleteCharAt(sql.length() - 1);
+        sql.append(";\n");
     }
 
     private static void appendClusterStatement(StringBuilder sql, TblColRef clusterCol) {
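
Read in isolation, the rewritten method joins the columns into one comma-separated clause and trims the trailing comma. A minimal, self-contained sketch of that logic, with plain strings standing in for TblColRef and hypothetical column names:

    import java.util.Arrays;
    import java.util.List;

    public class DistributeClauseSketch {
        // Same shape as the new appendDistributeStatement: append every column
        // followed by a comma, then delete the last comma and close the clause.
        // Assumes a non-empty list, as the callers in this diff guarantee.
        static String distributeBy(List<String> cols) {
            StringBuilder sql = new StringBuilder(" DISTRIBUTE BY ");
            for (String col : cols) {
                sql.append(col).append(",");
            }
            sql.deleteCharAt(sql.length() - 1); // drop the trailing comma
            sql.append(";\n");
            return sql.toString();
        }

        public static void main(String[] args) {
            // Prints: " DISTRIBUTE BY SELLER_ID,LSTG_FORMAT_NAME,LEAF_CATEG_ID;"
            System.out.print(distributeBy(Arrays.asList("SELLER_ID", "LSTG_FORMAT_NAME", "LEAF_CATEG_ID")));
        }
    }
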
@@ -252,16 +260,30 @@ private static String getHiveDataType(String javaDataType) {
         return hiveDataType;
     }
 
-    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) {
+    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         final String tableName = flatDesc.getTableName();
         StringBuilder sql = new StringBuilder();
         sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
 
-        TblColRef clusterCol = flatDesc.getClusterBy();
-        if (clusterCol != null) {
-            appendClusterStatement(sql, clusterCol);
+        if (flatDesc.getClusterBy() != null) {
+            appendClusterStatement(sql, flatDesc.getClusterBy());
+        } else if (flatDesc.getDistributedBy() != null) {
+            appendDistributeStatement(sql, Lists.newArrayList(flatDesc.getDistributedBy()));
         } else {
-            appendDistributeStatement(sql, flatDesc.getDistributedBy());
+            int redistColumnCount = KylinConfig.getInstanceFromEnv().getHiveRedistributeColumnCount();
+
+            RowKeyColDesc[] rowKeyColDescs = cubeDesc.getRowkey().getRowKeyColumns();
+
+            if (rowKeyColDescs.length < redistColumnCount)
+                redistColumnCount = rowKeyColDescs.length;
+
+            List<TblColRef> redistColumns = Lists.newArrayListWithCapacity(redistColumnCount);
+
+            for (int i = 0; i < redistColumnCount; i++) {
+                redistColumns.add(rowKeyColDescs[i].getColRef());
+            }
+
+            appendDistributeStatement(sql, redistColumns);
         }
 
         return sql.toString();
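
The fallback branch above caps the column count at the rowkey length and takes the leading rowkey columns in declared order. A compact sketch of the statement it ends up emitting, with Kylin's metadata types replaced by plain strings (table and column names are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class RedistributeStatementSketch {
        // Mirrors the new else-branch: min(configured count, rowkey length)
        // leading rowkey columns become the DISTRIBUTE BY keys.
        static String redistribute(String tableName, String[] rowkeyCols, int configuredCount) {
            int n = Math.min(configuredCount, rowkeyCols.length);
            List<String> redistCols = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                redistCols.add(rowkeyCols[i]);
            }
            return "INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName
                    + " DISTRIBUTE BY " + String.join(",", redistCols) + ";\n";
        }

        public static void main(String[] args) {
            // Only two rowkey columns exist, so the default cap of 3 clamps to 2. Prints:
            // INSERT OVERWRITE TABLE kylin_intermediate_flat SELECT * FROM kylin_intermediate_flat DISTRIBUTE BY SELLER_ID,LSTG_FORMAT_NAME;
            System.out.print(redistribute("kylin_intermediate_flat",
                    new String[] { "SELLER_ID", "LSTG_FORMAT_NAME" }, 3));
        }
    }
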
@@ -27,6 +27,7 @@
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
@@ -81,11 +82,11 @@ protected static AbstractExecutable createFlatHiveTableStep(String hiveInitState
     }
 
     protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
-            IJoinedFlatTableDesc flatDesc) {
+            IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
         step.setInitStatement(hiveInitStatements);
         step.setIntermediateTable(flatDesc.getTableName());
-        step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
+        step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc, cubeDesc));
         CubingExecutableUtil.setCubeName(cubeName, step.getParams());
         step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
         return step;
@@ -28,6 +28,7 @@
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -118,18 +119,17 @@ public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
     @Override
     public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
         final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-        final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
-                .getConfig();
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        final KylinConfig cubeConfig = cubeInstance.getConfig();
+
         final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
         // create flat table first
         addStepPhase1_DoCreateFlatTable(jobFlow);
 
         // then count and redistribute
         if (cubeConfig.isHiveRedistributeEnabled()) {
-            if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
-            }
+            jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
         }
 
         // special for hive
@@ -154,7 +154,6 @@ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable j
         }
     }
 
-
     @Override
     public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
         final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
@@ -23,6 +23,7 @@
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.ISparkInput;
@@ -75,18 +76,16 @@ public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
     @Override
     public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
         final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-        final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName)
-                .getConfig();
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        final KylinConfig cubeConfig = cubeInstance.getConfig();
         final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
         // create flat table first
         addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, flatTableDatabase);
 
         // then count and redistribute
         if (cubeConfig.isHiveRedistributeEnabled()) {
-            if (flatDesc.getClusterBy() != null || flatDesc.getDistributedBy() != null) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc));
-            }
+            jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor()));
         }
 
         // special for hive
@@ -103,8 +102,6 @@ protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable j
         }
     }
 
-
-
     @Override
     public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
         final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
