Commit

Remove unused property, class and maven module which is for Kylin 3 only
hit-lacus committed Jul 5, 2021
1 parent 914b97f commit e9291e3
Showing 38 changed files with 1,709 additions and 2,668 deletions.
@@ -25,8 +25,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.dict.ByteComparator;
-import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
@@ -92,9 +90,9 @@ public boolean exists() throws IOException {
     }
 
     private Comparator<String> getComparatorByType(DataType type) {
-        Comparator<String> comparator;
+        Comparator<String> comparator = null;
         if (!type.isNumberFamily()) {
-            comparator = new ByteComparator<>(new StringBytesConverter());
+            // comparator = new ByteComparator<>(new StringBytesConverter());
         } else if (type.isIntegerFamily()) {
             comparator = new Comparator<String>() {
                 @Override
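With the ByteComparator branch commented out, getComparatorByType now leaves the comparator as null for non-numeric column types. For reference, the removed branch ordered strings by the byte order of their serialized form; the stand-alone sketch below shows roughly equivalent behavior, assuming plain UTF-8 encoding in place of Kylin's StringBytesConverter (the class name Utf8ByteOrderComparator is hypothetical, not part of Kylin).

import java.nio.charset.StandardCharsets;
import java.util.Comparator;

// Hypothetical stand-in (not Kylin code): compares strings by the unsigned
// byte order of their UTF-8 encoding, roughly what the removed
// new ByteComparator<>(new StringBytesConverter()) branch provided.
final class Utf8ByteOrderComparator implements Comparator<String> {
    @Override
    public int compare(String a, String b) {
        byte[] x = a.getBytes(StandardCharsets.UTF_8);
        byte[] y = b.getBytes(StandardCharsets.UTF_8);
        int len = Math.min(x.length, y.length);
        for (int i = 0; i < len; i++) {
            int cmp = (x[i] & 0xFF) - (y[i] & 0xFF); // unsigned byte comparison
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(x.length, y.length); // shorter input sorts first
    }
}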
@@ -500,12 +500,6 @@ private String[] overwriteJobConf(Configuration conf, KylinConfig config, String
         for (Map.Entry<String, String> entry : configOverride.getMRConfigOverride().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
-        if (conf.get("mapreduce.job.is-mem-hungry") != null
-                && Boolean.parseBoolean(conf.get("mapreduce.job.is-mem-hungry"))) {
-            for (Map.Entry<String, String> entry : configOverride.getMemHungryConfigOverride().entrySet()) {
-                conf.set(entry.getKey(), entry.getValue());
-            }
-        }
 
         if (StringUtils.isNotBlank(cubeName)) {
             remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME);
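The loop that remains above copies every entry of the cube-level MR override map into the Hadoop job Configuration; only the special-cased handling keyed on mapreduce.job.is-mem-hungry is removed. A minimal stand-alone sketch of that surviving pattern, with hypothetical override keys and values and no Kylin classes, might look like this:

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

// Illustration only (not Kylin code): apply per-job override entries to a
// Hadoop Configuration before the MapReduce job is submitted.
public class ConfigOverrideSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        Map<String, String> mrConfigOverride = new LinkedHashMap<>();
        // Hypothetical override values, chosen only for illustration.
        mrConfigOverride.put("mapreduce.map.memory.mb", "4096");
        mrConfigOverride.put("mapreduce.reduce.memory.mb", "8192");

        for (Map.Entry<String, String> entry : mrConfigOverride.entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }

        System.out.println(conf.get("mapreduce.map.memory.mb")); // prints 4096
    }
}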
@@ -18,147 +18,130 @@

package org.apache.kylin.engine.mr.common;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.exception.JobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

public class MapReduceUtil {

private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);

/**
* @return reducer number for calculating hll
*/
public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;

int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
if (shardBase > hllMaxReducerNumber) {
shardBase = hllMaxReducerNumber;
}
return shardBase;
}

/**
* @param cuboidScheduler specified can provide more flexibility
* */
public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
double totalMapInputMB, int level)
throws ClassNotFoundException, IOException, InterruptedException, JobException {
CubeDesc cubeDesc = cubeSegment.getCubeDesc();
KylinConfig kylinConfig = cubeDesc.getConfig();

double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+ level);

CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);

double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;

if (level == -1) {
//merge case
double estimatedSize = cubeStatsReader.estimateCubeSize();
adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
totalMapInputMB, adjustedCurrentLayerSizeEst);
} else if (level == 0) {
//base cuboid case TODO: the estimation could be very WRONG because it has no correction
adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
} else {
parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
logger.debug(
"totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
}

// number of reduce tasks
int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);

// adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
if (cubeDesc.hasMemoryHungryMeasures()) {
logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
numReduceTasks = numReduceTasks * 4;
}

// at least 1 reducer by default
numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
// no more than 500 reducer by default
numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);

return numReduceTasks;
}

public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
throws IOException {
KylinConfig kylinConfig = cubeSeg.getConfig();

Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
double totalSizeInM = 0;
for (Double cuboidSize : cubeSizeMap.values()) {
totalSizeInM += cuboidSize;
}
return getReduceTaskNum(totalSizeInM, kylinConfig);
}

// @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();

Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
overlapCuboids.add(baseCuboidId);

Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
.readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
cuboidStats.getSecond());
double totalSizeInM = 0;
for (Double cuboidSize : cubeSizeMap.values()) {
totalSizeInM += cuboidSize;
}

double baseSizeInM = cubeSizeMap.get(baseCuboidId);

KylinConfig kylinConfig = cubeSeg.getConfig();
int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
return new Pair<>(nBase + nOther, nBase);
}

private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();

// number of reduce tasks
int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);

// at least 1 reducer by default
numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
// no more than 500 reducer by default
numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);

logger.info("Having total map input MB " + Math.round(totalSizeInM));
logger.info("Having per reduce MB " + perReduceInputMB);
logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
return numReduceTasks;
}
//
// private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
//
// /**
// * @return reducer number for calculating hll
// */
// public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
// int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
// int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
//
// int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
// if (shardBase > hllMaxReducerNumber) {
// shardBase = hllMaxReducerNumber;
// }
// return shardBase;
// }
//
// /**
// * @param cuboidScheduler specified can provide more flexibility
// * */
// public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
// double totalMapInputMB, int level)
// throws ClassNotFoundException, IOException, InterruptedException, JobException {
// CubeDesc cubeDesc = cubeSegment.getCubeDesc();
// KylinConfig kylinConfig = cubeDesc.getConfig();
//
// double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
// double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
// logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
// + level);
//
// CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
//
// double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
//
// if (level == -1) {
// //merge case
// double estimatedSize = cubeStatsReader.estimateCubeSize();
// adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
// logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
// totalMapInputMB, adjustedCurrentLayerSizeEst);
// } else if (level == 0) {
// //base cuboid case TODO: the estimation could be very WRONG because it has no correction
// adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
// logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
// } else {
// parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
// currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
// adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
// logger.debug(
// "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
// totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
// }
//
// // number of reduce tasks
// int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
//
// // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
// if (cubeDesc.hasMemoryHungryMeasures()) {
// logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
// numReduceTasks = numReduceTasks * 4;
// }
//
// // at least 1 reducer by default
// numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
// // no more than 500 reducer by default
// numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
//
// return numReduceTasks;
// }
//
// public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
// throws IOException {
// KylinConfig kylinConfig = cubeSeg.getConfig();
//
// Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
// double totalSizeInM = 0;
// for (Double cuboidSize : cubeSizeMap.values()) {
// totalSizeInM += cuboidSize;
// }
// return getReduceTaskNum(totalSizeInM, kylinConfig);
// }
//
// // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
// public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
// long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
//
// Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
// overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
// overlapCuboids.add(baseCuboidId);
//
// Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
// .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
// Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
// cuboidStats.getSecond());
// double totalSizeInM = 0;
// for (Double cuboidSize : cubeSizeMap.values()) {
// totalSizeInM += cuboidSize;
// }
//
// double baseSizeInM = cubeSizeMap.get(baseCuboidId);
//
// KylinConfig kylinConfig = cubeSeg.getConfig();
// int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
// int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
// return new Pair<>(nBase + nOther, nBase);
// }
//
// private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
// double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
// double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
//
// // number of reduce tasks
// int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
//
// // at least 1 reducer by default
// numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
// // no more than 500 reducer by default
// numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
//
// logger.info("Having total map input MB " + Math.round(totalSizeInM));
// logger.info("Having per reduce MB " + perReduceInputMB);
// logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
// return numReduceTasks;
// }
}
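Even though the class body is now fully commented out, the reducer-sizing arithmetic it contained is worth restating: an estimated input size in MB is divided by a per-reducer input budget, scaled by a ratio, and clamped to configured minimum and maximum reducer counts. The sketch below reproduces that calculation with assumed example values (500 MB per reducer, ratio 1.0, bounds 1 and 500, mirroring the "at least 1" and "no more than 500" defaults mentioned in the comments); it is an illustration, not Kylin code.

// Stand-alone illustration of the size-based reducer count used by the
// commented-out getReduceTaskNum above.
public class ReduceTaskNumSketch {

    static int getReduceTaskNum(double totalSizeInM, double perReduceInputMB,
            double reduceCountRatio, int minReducers, int maxReducers) {
        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
        numReduceTasks = Math.max(minReducers, numReduceTasks); // at least minReducers
        numReduceTasks = Math.min(maxReducers, numReduceTasks); // no more than maxReducers
        return numReduceTasks;
    }

    public static void main(String[] args) {
        // 2,048 MB of estimated input at 500 MB per reducer and ratio 1.0 -> 4 reducers.
        System.out.println(getReduceTaskNum(2048, 500, 1.0, 1, 500));
    }
}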