KYLIN-3873 Fix inappropriate use of memory in SparkFactDistinct.java
Wayne1c authored and shaofengshi committed Mar 29, 2019
1 parent 4ec4051 commit 610d0f3
Showing 1 changed file with 65 additions and 55 deletions.
@@ -23,7 +23,6 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -193,12 +192,23 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {

final JavaRDD<String[]> recordRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable);

// read records from the flat table
// output:
// 1, statistics
// 2, field value of dict col
// 3, min/max field value of non-dict col
JavaPairRDD<SelfDefineSortableKey, Text> flatOutputRDD = recordRDD.mapPartitionsToPair(
new FlatOutputFucntion(cubeName, segmentId, metaUrl, sConf, samplingPercent, bytesWritten));

// repartition data so that each reducer handles only one column's data or the statistics data
JavaPairRDD<SelfDefineSortableKey, Text> aggredRDD = flatOutputRDD
.repartitionAndSortWithinPartitions(new FactDistinctPartitioner(cubeName, metaUrl, sConf, reducerMapping.getTotalReducerNum()));

// multiple output results
// 1, CFG_OUTPUT_COLUMN: field values of dict cols whose dictionaries are not built in the reducer, e.g. global dict cols
// 2, CFG_OUTPUT_DICT: dictionary objects built in the reducer
// 3, CFG_OUTPUT_STATISTICS: cube statistics, e.g. HLL counters of cuboids
// 4, CFG_OUTPUT_PARTITION: dimension value range (min, max)
JavaPairRDD<String, Tuple3<Writable, Writable, String>> outputRDD = aggredRDD
.mapPartitionsToPair(new MultiOutputFunction(cubeName, metaUrl, sConf, samplingPercent));

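The repartition step above is the standard Spark pattern for "one reducer per group": a custom Partitioner picks the target partition from the reducer id embedded in the key, and repartitionAndSortWithinPartitions sorts keys inside each partition during the shuffle instead of performing a global sort. A minimal sketch of that pattern, with a hypothetical String-keyed partitioner standing in for Kylin's FactDistinctPartitioner:

```java
import java.nio.charset.StandardCharsets;

import org.apache.spark.Partitioner;

// Illustrative partitioner (not Kylin's FactDistinctPartitioner):
// routes each record to the reducer whose id is stored in the key's
// first byte, so one partition holds exactly one column's data
// (or the statistics stream).
class ReducerIdPartitioner extends Partitioner {
    private final int numReducers;

    ReducerIdPartitioner(int numReducers) {
        this.numReducers = numReducers;
    }

    @Override
    public int numPartitions() {
        return numReducers;
    }

    @Override
    public int getPartition(Object key) {
        // assumption: the key's bytes start with the target reducer id
        byte[] bytes = ((String) key).getBytes(StandardCharsets.UTF_8);
        return (bytes[0] & 0xFF) % numReducers;
    }
}

// usage (hypothetical RDD of <String, String> pairs):
// JavaPairRDD<String, String> sorted =
//         pairs.repartitionAndSortWithinPartitions(new ReducerIdPartitioner(10));
```
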
@@ -237,6 +247,12 @@ protected void execute(OptionsHelper optionsHelper) throws Exception {
}
}

/**
* output: Tuple2<SelfDefineSortableKey, Text>
* 1, for statistics: SelfDefineSortableKey = reducerId + cuboidId, Text = HLL of cuboidId
* 2, for dict col: SelfDefineSortableKey = reducerId + field value, Text = ""
* 3, for non-dict col: SelfDefineSortableKey = reducerId + min/max value, Text = ""
*/
static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
private transient volatile boolean initialized = false;
private String cubeName;
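
The key layout described in the comment above is a single reducer-id byte followed by the payload (a cuboid id or a UTF-8 field value); the reducer side strips that first byte again, which is what the Bytes.toLong(..., 1) and Bytes.toString(..., 1, length - 1) calls further down do. A self-contained sketch of the encoding, using plain JDK types in place of SelfDefineSortableKey and Text:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CompositeKeySketch {
    // layout: [1 byte reducerId][payload bytes]
    static byte[] encodeFieldValue(int reducerId, String fieldValue) {
        byte[] payload = fieldValue.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + payload.length)
                .put((byte) reducerId).put(payload).array();
    }

    static byte[] encodeCuboidId(int reducerId, long cuboidId) {
        return ByteBuffer.allocate(1 + Long.BYTES)
                .put((byte) reducerId).putLong(cuboidId).array();
    }

    // mirrors Bytes.toString(key.getBytes(), 1, key.getLength() - 1)
    static String decodeFieldValue(byte[] key) {
        return new String(key, 1, key.length - 1, StandardCharsets.UTF_8);
    }

    // mirrors Bytes.toLong(key.getBytes(), 1)
    static long decodeCuboidId(byte[] key) {
        return ByteBuffer.wrap(key, 1, Long.BYTES).getLong();
    }
}
```
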
@@ -655,6 +671,8 @@ static class MultiOutputFunction implements
private CubeDesc cubeDesc;
private String maxValue = null;
private String minValue = null;
private boolean isDimensionCol;
private boolean isDictCol;
private List<Tuple2<String, Tuple3<Writable, Writable, String>>> result;

public MultiOutputFunction(String cubeName, String metaurl, SerializableConfiguration conf,
@@ -690,6 +708,9 @@ private void init() throws IOException {
col = reducerMapping.getColForReducer(taskId);
Preconditions.checkNotNull(col);

isDimensionCol = cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) && col.getType().needCompare();
isDictCol = cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col);

// local build dict
buildDictInReducer = kConfig.isBuildDictInReducerEnabled();
if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
@@ -727,45 +748,28 @@ public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> call(
}
}
}
SelfDefineSortableKey prevKey = null;
List<Text> values = new ArrayList<Text>();
while (tuple2Iterator.hasNext()) {
Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
if (prevKey != null) {
int cmp = tuple._1.compareTo(prevKey);
// check
if (cmp < 0) {
throw new IOException(" key must be sorted. prevKey: " + prevKey + " current: " + tuple._1);
} else if (cmp > 0) {
processRow(prevKey.getText(), values);
prevKey = null;
values.clear();
}
}
prevKey = tuple._1;
values.add(tuple._2);
}

if (prevKey != null || values.size() > 0) {
processRow(prevKey.getText(), values);
prevKey = null;
values.clear();
}

if (isStatistics) {
//output the hll info
// calculate hll
calculateStatistics(tuple2Iterator);

// output the hll info
List<Long> allCuboids = Lists.newArrayList();
allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);

logMapperAndCuboidStatistics(allCuboids); // for human check
outputStatistics(allCuboids, result);
} else {
//dimension col
if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
// calculate dict/dimRange
calculateColData(tuple2Iterator);

// output dim range
if (isDimensionCol) {
outputDimRangeInfo(result);
}
// dic col

// output dict object
if (buildDictInReducer) {
Dictionary<String> dict = builder.build();
outputDict(col, dict, result);
@@ -775,33 +779,40 @@ public Iterator<Tuple2<String, Tuple3<Writable, Writable, String>>> call(
return result.iterator();
}

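The memory fix named in the commit title is visible in this hunk: the old call() buffered every Text value behind a key into an ArrayList before invoking processRow, so a hot key (for example a high-cardinality column) could keep millions of values resident on the heap at once; the new code hands the sorted iterator straight to calculateStatistics/calculateColData, which fold one tuple at a time into small running state. A stripped-down sketch of the two shapes, with hypothetical names rather than the Kylin classes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class StreamingVsBuffering {
    // before: group values per key in memory, then process the batch;
    // heap usage grows with the number of values behind one key
    static void buffered(Iterator<Map.Entry<String, String>> sorted) {
        String prevKey = null;
        List<String> values = new ArrayList<>();
        while (sorted.hasNext()) {
            Map.Entry<String, String> e = sorted.next();
            if (prevKey != null && !prevKey.equals(e.getKey())) {
                processBatch(prevKey, values); // whole group is resident
                values.clear();
            }
            prevKey = e.getKey();
            values.add(e.getValue());
        }
        if (prevKey != null) {
            processBatch(prevKey, values);
        }
    }

    // after: fold each record into constant-size running state
    // (min/max, HLL counters, dictionary builder)
    static void streaming(Iterator<Map.Entry<String, String>> sorted) {
        while (sorted.hasNext()) {
            Map.Entry<String, String> e = sorted.next();
            accumulate(e.getKey(), e.getValue()); // no per-key buffer
        }
    }

    static void processBatch(String key, List<String> values) { /* ... */ }
    static void accumulate(String key, String value) { /* ... */ }
}
```
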
private void processRow(Text key, List<Text> values) throws IOException {
if (isStatistics) {
// for hll
long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
private void calculateStatistics(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) throws IOException {
while (tuple2Iterator.hasNext()) {
HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());

for (Text value : values) {
HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
hll.readRegisters(bf);
Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();
long cuboidId = Bytes.toLong(tuple._1.getText().getBytes(), 1);

totalRowsBeforeMerge += hll.getCountEstimate();
ByteBuffer bf = ByteBuffer.wrap(tuple._2.getBytes(), 0, tuple._2.getLength());
hll.readRegisters(bf);

if (cuboidId == baseCuboidId) {
baseCuboidRowCountInMappers.add(hll.getCountEstimate());
}
totalRowsBeforeMerge += hll.getCountEstimate();

if (cuboidHLLMap.get(cuboidId) != null) {
cuboidHLLMap.get(cuboidId).merge(hll);
} else {
cuboidHLLMap.put(cuboidId, hll);
}
if (cuboidId == baseCuboidId) {
baseCuboidRowCountInMappers.add(hll.getCountEstimate());
}
} else {
String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);

if (cuboidHLLMap.get(cuboidId) != null) {
cuboidHLLMap.get(cuboidId).merge(hll);
} else {
cuboidHLLMap.put(cuboidId, hll);
}
}
}

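calculateStatistics keeps only one deserialized HLLCounter alive at a time and folds it into the per-cuboid map, rather than first materializing all counters behind a key. A minimal sketch of that merge pattern; it uses only the HLLCounter calls visible in the diff, and the import path is an assumption (the diff shows just the class name):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// assumed package for Kylin's HLL implementation
import org.apache.kylin.measure.hllc.HLLCounter;

final class HllMergeSketch {
    private final Map<Long, HLLCounter> cuboidHLLMap = new HashMap<>();
    private final int precision;

    HllMergeSketch(int precision) {
        this.precision = precision;
    }

    // one serialized counter in, folded into the running map;
    // nothing beyond the current counter is held on the heap
    void add(long cuboidId, byte[] serializedHll) throws IOException {
        HLLCounter hll = new HLLCounter(precision);
        hll.readRegisters(ByteBuffer.wrap(serializedHll));

        HLLCounter existing = cuboidHLLMap.get(cuboidId);
        if (existing != null) {
            existing.merge(hll);
        } else {
            cuboidHLLMap.put(cuboidId, hll);
        }
    }

    long estimate(long cuboidId) {
        HLLCounter hll = cuboidHLLMap.get(cuboidId);
        return hll == null ? 0L : hll.getCountEstimate();
    }
}
```
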
private void calculateColData(Iterator<Tuple2<SelfDefineSortableKey, Text>> tuple2Iterator) {
while (tuple2Iterator.hasNext()) {
Tuple2<SelfDefineSortableKey, Text> tuple = tuple2Iterator.next();

String value = Bytes.toString(tuple._1.getText().getBytes(), 1, tuple._1.getText().getLength() - 1);
logAFewRows(value);

// if dimension col, compute max/min value
if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col) && col.getType().needCompare()) {
// includes the col that is both a dict col and a dim col
if (isDimensionCol) {
if (minValue == null || col.getType().compare(minValue, value) > 0) {
minValue = value;
}
@@ -811,20 +822,19 @@ private void processRow(Text key, List<Text> values) throws IOException {
}

//if dict column
if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
if (isDictCol) {
if (buildDictInReducer) {
builder.addValue(value);
} else {
byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
// output written to baseDir/colName/-r-00000 (etc)
String fileName = col.getIdentity() + "/";
result.add(new Tuple2<String, Tuple3<Writable, Writable, String>>(
BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
NullWritable.get(), new Text(keyBytes), fileName)));
BatchConstants.CFG_OUTPUT_COLUMN, new Tuple3<Writable, Writable, String>(
NullWritable.get(), new Text(value.getBytes(StandardCharsets.UTF_8)), col.getIdentity() + "/")));
}
}

rowCount++;
}
rowCount++;
}

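calculateColData likewise folds each value into constant-size state: a running min/max for the dimension-range output and, when buildDictInReducer is on, an incrementally fed dictionary builder. A self-contained sketch of that single pass, where plain String ordering stands in for col.getType().compare and a TreeSet stands in for Kylin's dictionary builder:

```java
import java.util.Iterator;
import java.util.TreeSet;

final class ColDataSketch {
    private String minValue = null;
    private String maxValue = null;
    // stand-in for the dictionary builder: collects distinct values
    private final TreeSet<String> dictBuilder = new TreeSet<>();

    void consume(Iterator<String> values, boolean isDimensionCol, boolean isDictCol) {
        while (values.hasNext()) {
            String value = values.next();

            // dimension col: keep only the running range, not the values
            if (isDimensionCol) {
                if (minValue == null || value.compareTo(minValue) < 0) {
                    minValue = value;
                }
                if (maxValue == null || value.compareTo(maxValue) > 0) {
                    maxValue = value;
                }
            }

            // dict col: feed the builder one value at a time
            if (isDictCol) {
                dictBuilder.add(value);
            }
        }
    }

    String[] range() {
        return new String[] { minValue, maxValue };
    }
}
```
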
private void logMapperAndCuboidStatistics(List<Long> allCuboids) {
