KYLIN-3926 Code review
nichunen committed May 7, 2019
1 parent 5098f20 · commit ab124ac
Showing 3 changed files with 55 additions and 24 deletions.
First changed file (a MapReduce mapper):
@@ -133,6 +133,8 @@ protected void doMap(IntWritable key, NullWritable value, Context context)
         Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
         Configuration conf = null;
         int averageSamplingPercentage = 0;
+        long sourceRecordCount = 0;
+        long effectiveTimeRange = 0;
 
         for (CubeSegment cubeSegment : mergingSegments) {
             String filePath = cubeSegment.getStatisticsResourcePath();
@@ -162,7 +164,14 @@ protected void doMap(IntWritable key, NullWritable value, Context context)
                     if (keyW.get() == 0L) {
                         // sampling percentage;
                         averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
-                    } else if (keyW.get() > 0) {
+                    } else if (keyW.get() == -3) {
+                        long perSourceRecordCount = Bytes.toLong(valueW.getBytes());
+                        if (perSourceRecordCount > 0) {
+                            sourceRecordCount += perSourceRecordCount;
+                            CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+                            effectiveTimeRange += iSegment.getTSRange().duration();
+                        }
+                    } else if (keyW.get() > 0) {
                         HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                         ByteArray byteArray = new ByteArray(valueW.getBytes());
                         hll.readRegisters(byteArray.asBuffer());
@@ -181,12 +190,13 @@ protected void doMap(IntWritable key, NullWritable value, Context context)
                 IOUtils.closeStream(reader);
             }
         }
-
-        averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-        CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
-                averageSamplingPercentage);
+        sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
         Path statisticsFilePath = new Path(statOutputPath,
                 BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+        averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+        CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+                averageSamplingPercentage, sourceRecordCount);
 
         FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
         FSDataInputStream fis = fs.open(statisticsFilePath);
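In this mapper, statistics key -3 (handled by the new branch above) carries a segment's source record count. The mapper sums those counts together with the time ranges of the segments that reported one, then extrapolates a count for the merged segment by scaling the sum with the ratio of the new segment's time range to that effective time range. Below is a minimal, self-contained sketch of that arithmetic using plain longs instead of Kylin's CubeSegment/TSRange types; the class, method, and variable names are hypothetical and only illustrate the scaling done by the patch:

    public class MergedSourceRecordEstimate {
        // observedRecords:  sum of the source record counts read from the merged segments' statistics
        // effectiveMillis:  combined time range of the segments that reported a positive count
        // newSegmentMillis: time range of the segment produced by the merge
        static long estimate(long observedRecords, long effectiveMillis, long newSegmentMillis) {
            if (effectiveMillis == 0) {
                return 0; // no segment reported a count, so there is nothing to extrapolate from
            }
            // The (double) cast mirrors the patch: plain integer division would truncate the ratio.
            return (long) (observedRecords * ((double) newSegmentMillis / effectiveMillis));
        }

        public static void main(String[] args) {
            // Two day-long segments reported 600 and 400 rows; a third, equally long segment had no count.
            long observed = 600 + 400;
            long effective = 2 * 86_400_000L; // 2 days covered by segments that reported counts
            long merged = 3 * 86_400_000L;    // the merged segment spans all 3 days
            System.out.println(estimate(observed, effective, merged)); // prints 1500
        }
    }

So a segment without a recorded count does not pull the estimate down to zero; the counts that are known are scaled up in proportion to the uncovered time range.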
Second changed file (a job step):
@@ -78,7 +78,8 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException
             long sourceRecordCount = 0;
             long effectiveTimeRange = 0;
             for (String segmentId : CubingExecutableUtil.getMergingSegmentIds(this.getParams())) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
+                String fileKey = CubeSegment
+                        .getStatisticsResourcePath(CubingExecutableUtil.getCubeName(this.getParams()), segmentId);
                 InputStream is = rs.getResource(fileKey).content();
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
@@ -129,12 +130,15 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException
                     tempFile.delete();
                 }
             }
-            sourceRecordCount *= effectiveTimeRange == 0 ? 0 : newSegment.getTSRange().duration() / effectiveTimeRange;
-            averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
+            sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                    : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
+            averageSamplingPercentage = averageSamplingPercentage
+                    / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size();
             CubeStatsWriter.writeCuboidStatistics(conf,
                     new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap,
                     averageSamplingPercentage, sourceRecordCount);
-            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()),
+                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
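The merge step above consumes the same per-segment statistics files; the loop that reads them is collapsed in this view, but the key convention visible in the other two files applies: key 0 holds the sampling percentage as an int, key -3 holds the segment's source record count as a long, and positive keys are cuboid ids whose values are serialized HyperLogLog registers. The following reader is a hedged sketch using only Hadoop types, assuming the big-endian encoding of Kylin's Bytes utility; the path comes from the command line, other negative keys are ignored, and the HLL payload is left undecoded. It illustrates the file layout rather than Kylin's actual reading code:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class SegmentStatsReaderSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            Path statsPath = new Path(args[0]); // one segment's statistics SequenceFile
            FileSystem fs = statsPath.getFileSystem(conf);

            int samplingPercentage = 0;
            long sourceRecordCount = 0;

            // Same deprecated constructor the patch uses; newer code would pass SequenceFile.Reader.file(statsPath).
            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, statsPath, conf)) {
                LongWritable key = new LongWritable();
                BytesWritable value = new BytesWritable();
                while (reader.next(key, value)) {
                    ByteBuffer buf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
                    if (key.get() == 0L) {
                        samplingPercentage = buf.getInt();   // sampling percentage
                    } else if (key.get() == -3L) {
                        sourceRecordCount = buf.getLong();   // source record count of this segment
                    } else if (key.get() > 0) {
                        // key is a cuboid id; the value holds HLL registers, not decoded in this sketch
                    }
                }
            }
            System.out.println(samplingPercentage + "% sampling, " + sourceRecordCount + " source records");
        }
    }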
Third changed file (the Spark engine counterpart):
@@ -18,8 +18,13 @@
 
 package org.apache.kylin.engine.spark;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -62,14 +67,11 @@
 import org.apache.spark.api.java.function.PairFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
 
 /**
  merge dictionary
@@ -236,14 +238,16 @@ public Tuple2<Text, Text> call(Integer index) throws Exception {
                 Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
                 Configuration conf = null;
                 int averageSamplingPercentage = 0;
+                long sourceRecordCount = 0;
+                long effectiveTimeRange = 0;
 
                 for (CubeSegment cubeSegment : mergingSegments) {
                     String filePath = cubeSegment.getStatisticsResourcePath();
 
                     File tempFile = File.createTempFile(segmentId, ".seq");
 
-                    try(InputStream is = rs.getResource(filePath).content();
-                            FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+                    try (InputStream is = rs.getResource(filePath).content();
+                            FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
 
                         org.apache.commons.io.IOUtils.copy(is, tempFileStream);
                     }
@@ -252,15 +256,24 @@ public Tuple2<Text, Text> call(Integer index) throws Exception {
 
                     conf = HadoopUtil.getCurrentConfiguration();
 
-                    try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+                    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+                            new Path(tempFile.getAbsolutePath()), conf)) {
                         //noinspection deprecation
                         LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
+                                conf);
 
                         while (reader.next(key, value)) {
                             if (key.get() == 0L) {
                                 // sampling percentage
                                 averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                            } else if (key.get() == -3) {
+                                long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                                if (perSourceRecordCount > 0) {
+                                    sourceRecordCount += perSourceRecordCount;
+                                    CubeSegment iSegment = cubeInstance.getSegmentById(segmentId);
+                                    effectiveTimeRange += iSegment.getTSRange().duration();
+                                }
                             } else if (key.get() > 0) {
                                 HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
                                 ByteArray byteArray = new ByteArray(value.getBytes());
@@ -276,9 +289,13 @@ public Tuple2<Text, Text> call(Integer index) throws Exception {
                     }
                 }
 
+                sourceRecordCount *= effectiveTimeRange == 0 ? 0
+                        : (double) newSegment.getTSRange().duration() / effectiveTimeRange;
                 averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
-                CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
-                Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+                CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap,
+                        averageSamplingPercentage, sourceRecordCount);
+                Path statisticsFilePath = new Path(statOutputPath,
+                        BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
 
                 FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
                 FSDataInputStream fis = fs.open(statisticsFilePath);
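For completeness, this is roughly what the writer side has to produce for the readers above to work: CubeStatsWriter.writeCuboidStatistics now takes the extra sourceRecordCount argument, which presumably ends up as the -3 record read back during later merges. The sketch below is not Kylin's CubeStatsWriter; the class name, sample values, and the empty HLL payload are made up, and it only illustrates the key/value layout of the statistics SequenceFile:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class SegmentStatsWriterSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            Path out = new Path(args[0]); // hypothetical output path

            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(out),
                    SequenceFile.Writer.keyClass(LongWritable.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class))) {
                // key 0: sampling percentage as a big-endian int
                writer.append(new LongWritable(0L),
                        new BytesWritable(ByteBuffer.allocate(4).putInt(100).array()));
                // key -3: source record count as a big-endian long (the field this commit propagates through merges)
                writer.append(new LongWritable(-3L),
                        new BytesWritable(ByteBuffer.allocate(8).putLong(1_000_000L).array()));
                // positive keys: one entry per cuboid id, value = serialized HLL registers (empty placeholder here)
                writer.append(new LongWritable(255L), new BytesWritable(new byte[0]));
            }
        }
    }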
