Skip to content

Commit

Permalink
KYLIN-3471 Merge dictionary using MapReduce
Browse files Browse the repository at this point in the history
  • Loading branch information
Wayne1c authored and shaofengshi committed Aug 7, 2018
1 parent 050f1c1 commit 9a92f49
Show file tree
Hide file tree
Showing 9 changed files with 740 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private ExecutableConstants() {
// Display names for merge-job steps, shown in the job/step monitor UI.
public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
// KYLIN-3471: step that updates segment metadata to use MR-merged dictionaries
public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data";
public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@

import java.util.List;

import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,8 +60,8 @@ public CubingJob build() {

// Phase 1: Merge Dictionary
inputSide.addStepPhase1_MergeDictionary(result);
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
result.addTask(createMergeDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
outputSide.addStepPhase1_MergeDictionary(result);

// Phase 2: Merge Cube Files
Expand All @@ -69,4 +74,25 @@ public CubingJob build() {
return result;
}

/**
 * Builds the MapReduce step that merges the dictionaries (and gathers cuboid
 * statistics) of the segments being merged, via {@link MergeDictionaryJob}.
 *
 * @param seg               the target (merged) segment
 * @param jobID             the cubing job id, used to derive HDFS working paths
 * @param mergingSegmentIds ids of the source segments whose dictionaries are merged
 * @return the configured MapReduce executable for the merge-dictionary step
 */
public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
    MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
    mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
    StringBuilder cmd = new StringBuilder();

    appendMapReduceParameters(cmd);
    appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
    appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
    // metadata url pointing at the HDFS metadata dump for this job
    // (fixed: this parameter was accidentally appended twice, producing a duplicated CLI option)
    appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
    appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
    appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
    appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
    appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");

    mergeDictionaryStep.setMapReduceParams(cmd.toString());
    mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);

    return mergeDictionaryStep;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
package org.apache.kylin.engine.mr;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.model.CubeDesc;
Expand All @@ -39,11 +43,12 @@
import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.metadata.model.TblColRef;

import com.google.common.base.Preconditions;
import org.apache.kylin.metadata.model.TblColRef;

/**
* Hold reusable steps for builders.
Expand Down Expand Up @@ -98,6 +103,22 @@ public MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<Strin
return result;
}

/**
 * Creates the in-process step that rewrites the merged segment's metadata to
 * point at the dictionaries produced by the merge-dictionary MR job.
 *
 * @param seg               the target (merged) segment
 * @param jobId             the cubing job id, used to derive HDFS working paths
 * @param mergingSegmentIds ids of the source segments being merged
 * @return the configured update-dictionary step
 */
public UpdateDictionaryStep createUpdateDictionaryStep(CubeSegment seg, String jobId, List<String> mergingSegmentIds) {
    final UpdateDictionaryStep step = new UpdateDictionaryStep();
    step.setName(ExecutableConstants.STEP_NAME_MERGE_UPDATE_DICTIONARY);

    final Map<String, String> params = step.getParams();
    CubingExecutableUtil.setCubeName(seg.getRealization().getName(), params);
    CubingExecutableUtil.setSegmentId(seg.getUuid(), params);
    CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, params);

    // where the MR job wrote the merged dictionary info
    params.put(BatchConstants.ARG_DICT_PATH, getDictInfoPath(jobId));
    // HDFS metadata store for this job
    params.put(BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobId));

    return step;
}

public MapReduceExecutable createBuildUHCDictStep(String jobId) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
Expand Down Expand Up @@ -192,7 +213,6 @@ public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<Stri
return result;
}


public boolean isEnableUHCDictStep() {
if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
return false;
Expand Down Expand Up @@ -220,7 +240,6 @@ public LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob r
return lookupMaterializeContext;
}


public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
SaveStatisticsStep result = new SaveStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
Expand Down Expand Up @@ -276,6 +295,10 @@ public String getDictRootPath(String jobId) {
return getRealizationRootPath(jobId) + "/dict";
}

/** Returns the HDFS working directory holding merged dictionary info for the given job. */
public String getDictInfoPath(String jobId) {
    final String root = getRealizationRootPath(jobId);
    return root + "/dict_info";
}

/** Returns the HDFS working directory for cube-optimization output of the given job. */
public String getOptimizationRootPath(String jobId) {
    final String root = getRealizationRootPath(jobId);
    return root + "/optimize";
}
Expand Down Expand Up @@ -327,7 +350,6 @@ public String getDumpMetadataPath(String jobId) {
return getRealizationRootPath(jobId) + "/metadata";
}


public static String extractJobIDFromPath(String path) {
Matcher matcher = JOB_NAME_PATTERN.matcher(path);
// check the first occurrence
Expand All @@ -337,4 +359,10 @@ public static String extractJobIDFromPath(String path) {
throw new IllegalStateException("Can not extract job ID from file path : " + path);
}
}

/**
 * Builds an hdfs-scheme metadata URL whose {@code path} parameter points at
 * this job's dumped-metadata directory.
 *
 * @param kylinConfig config supplying the metadata-url identifier
 * @param jobId       the cubing job id, used to derive the dump path
 * @return the serialized storage URL for the job's HDFS metadata store
 */
public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
    final Map<String, String> params = new HashMap<>();
    params.put("path", getDumpMetadataPath(jobId));
    final String identifier = kylinConfig.getMetadataUrl().getIdentifier();
    return new StorageURL(identifier, "hdfs", params).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
Expand Down Expand Up @@ -60,7 +61,9 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
Expand Down Expand Up @@ -126,6 +129,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
    .isRequired(true).withDescription("Lookup table snapshotID")
    .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
// KYLIN-3471: required CLI option carrying the HDFS metadata url for MR jobs
// that read their metadata from an HDFS dump (e.g. merge dictionary).
protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
    .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);


private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";

Expand Down Expand Up @@ -548,6 +554,41 @@ protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, Config
dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
}

/**
 * Collects the cube's metadata plus every segment's dictionary and statistics
 * resource paths, then dumps and uploads them to the given metadata url.
 *
 * @param segments segments whose dictionaries/statistics must be attached; must be non-empty
 * @param metaUrl  target metadata url (HDFS-backed) for the upload
 * @throws IOException if dumping or uploading fails
 */
protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, String metaUrl) throws IOException {
    final CubeSegment first = segments.get(0);
    final Set<String> dumpList = new LinkedHashSet<>(JobRelatedMetaUtil.collectCubeMetadata(first.getCubeInstance()));
    for (CubeSegment segment : segments) {
        dumpList.addAll(segment.getDictionaryPaths());
        dumpList.add(segment.getStatisticsResourcePath());
    }
    dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) first.getConfig(), metaUrl);
}

/**
 * Dumps the given metadata resources, plus a kylin.properties whose metadata
 * url is re-pointed at {@code metadataUrl}, into a local temp directory and
 * uploads everything to the target metadata store.
 *
 * @param dumpList    resource paths to dump
 * @param kylinConfig config of the cube/segment being processed
 * @param metadataUrl metadata url written into the uploaded kylin.properties
 * @throws IOException if the local dump or the upload fails
 */
private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
        throws IOException {
    File tmp = File.createTempFile("kylin_job_meta", "");
    FileUtils.forceDelete(tmp); // we need a directory, so delete the file first

    File metaDir = new File(tmp, "meta");
    if (!metaDir.mkdirs()) {
        throw new IOException("Failed to create local metadata dir: " + metaDir.getAbsolutePath());
    }

    try {
        // dump metadata
        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);

        // write kylin.properties pointing at the job's metadata store
        Properties props = kylinConfig.exportToProperties();
        props.setProperty("kylin.metadata.url", metadataUrl);

        File kylinPropsFile = new File(metaDir, "kylin.properties");
        try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
            props.store(os, kylinPropsFile.getAbsolutePath());
        }

        KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
        // upload metadata
        ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
    } finally {
        // fixed: the temp dump directory was previously leaked on every invocation
        FileUtils.deleteQuietly(tmp);
    }
}

protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
attachSegmentMetadata(segment, conf, true, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public interface BatchConstants {
String ARG_TABLE_NAME = "tableName";
String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
// KYLIN-3471: CLI argument name for the HDFS metadata url passed to MR jobs
String ARG_META_URL = "metadataUrl";

/**
* logger and counter
Expand Down
Loading

0 comments on commit 9a92f49

Please sign in to comment.