KYLIN-3839 Storage clean up after refreshing and deleting segment
Wayne1c authored and shaofengshi committed Mar 18, 2019
1 parent 9c8d30b commit 62d5b03
Showing 7 changed files with 208 additions and 44 deletions.
core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1209,6 +1209,10 @@ public int getHBaseReplicationScope() {
return Integer.parseInt(getOptional("kylin.storage.hbase.replication-scope", "0"));
}

public boolean cleanStorageAfterDelOperation() {
return Boolean.parseBoolean(getOptional("kylin.storage.clean-after-delete-operation", FALSE));
}
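The `FALSE` constant here resolves to the string "false", so the feature stays off unless `kylin.storage.clean-after-delete-operation=true` is set explicitly (see the defaults file below). A minimal sketch, not part of the commit, of the opt-in guard the service layer applies later in this diff:

```java
import org.apache.kylin.common.KylinConfig;

// Sketch: the service-layer cleanup below starts with this check and returns
// early when the flag is off, so existing deployments keep the old
// keep-everything behavior.
void cleanupIfEnabled(Runnable cleanup) {
    if (KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
        cleanup.run();
    }
}
```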

// ============================================================================
// ENGINE.MR
// ============================================================================
3 changes: 3 additions & 0 deletions core-common/src/main/resources/kylin-defaults.properties
@@ -149,6 +149,9 @@ kylin.storage.partition.max-scan-bytes=3221225472
# You can set it to a smaller value. 0 means use default.
# kylin.storage.hbase.coprocessor-timeout-seconds=0

# Clean up the real storage after a delete operation.
# Set this to true if you want the underlying storage (e.g. the HTable of a deleted segment) to be dropped as well.
kylin.storage.clean-after-delete-operation=false

### JOB ###

12 changes: 10 additions & 2 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -185,10 +185,18 @@ public Segments<CubeSegment> getMergingSegments(CubeSegment mergedSegment) {
return segments.getMergingSegments(mergedSegment);
}

public CubeSegment getOriginalSegmentToRefresh(CubeSegment refreshedSegment) {
return getOriginalSegment(refreshedSegment);
}

public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) {
return getOriginalSegment(optimizedSegment);
}

private CubeSegment getOriginalSegment(CubeSegment toSegment) {
for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) {
- if (!optimizedSegment.equals(segment) //
- && optimizedSegment.getSegRange().equals(segment.getSegRange())) {
+ if (!toSegment.equals(segment) //
+ && toSegment.getSegRange().equals(segment.getSegRange())) {
return segment;
}
}
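The rename from `optimizedSegment` to `toSegment` generalizes the lookup: both a refresh and an optimize build produce a new segment covering the same range as an existing READY segment, so both wrappers can share the range-equality match. A hedged usage sketch (the segment values are hypothetical):

```java
// Hypothetical usage: after refreshing the READY segment covering
// [2019-01-01, 2019-02-01), the rebuilt segment has an identical SegRange,
// so the lookup returns the stale original (or null for a first-time build).
CubeSegment stale = cube.getOriginalSegmentToRefresh(refreshedSegment);
if (stale != null) {
    String staleHTable = stale.getStorageLocationIdentifier(); // eligible for GC
}
```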
server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -29,9 +29,11 @@
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -43,6 +45,7 @@
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.common.PatternedLogger;
@@ -81,6 +84,8 @@
import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.ValidateUtil;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.StorageCleanUtil;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -348,8 +353,12 @@ public void deleteCube(CubeInstance cube) throws IOException {
}
}

List<CubeSegment> toRemoveSegs = cube.getSegments();

int cubeNum = getCubeManager().getCubesByDesc(cube.getDescriptor().getName()).size();
getCubeManager().dropCube(cube.getName(), cubeNum == 1); // only delete the cube desc when no other cube is using it

cleanSegmentStorage(toRemoveSegs);
}

/**
@@ -380,7 +389,6 @@ public CubeInstance purgeCube(CubeInstance cube) throws IOException {

this.releaseAllSegments(cube);
return cube;

}

/**
@@ -550,7 +558,30 @@ public CubeInstance deleteSegment(CubeInstance cube, String segmentName) throws
logger.warn(String.format(Locale.ROOT, msg.getDELETE_SEGMENT_CAUSE_GAPS(), cube.getName(), segmentName));
}

- return CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);
+ CubeInstance cubeInstance = CubeManager.getInstance(getConfig()).updateCubeDropSegments(cube, toDelete);

cleanSegmentStorage(Collections.singletonList(toDelete));

return cubeInstance;
}

// clean up segment data in HBase and HDFS
private void cleanSegmentStorage(List<CubeSegment> toRemoveSegs) throws IOException {
if (!KylinConfig.getInstanceFromEnv().cleanStorageAfterDelOperation()) {
return;
}

if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
List<String> toDropHTables = Lists.newArrayListWithCapacity(toRemoveSegs.size());
List<String> toDelHDFSPaths = Lists.newArrayListWithCapacity(toRemoveSegs.size());
for (CubeSegment seg : toRemoveSegs) {
toDropHTables.add(seg.getStorageLocationIdentifier());
toDelHDFSPaths.add(JobBuilderSupport.getJobWorkingDir(seg.getConfig().getHdfsWorkingDirectory(), seg.getLastBuildJobID()));
}

StorageCleanUtil.dropHTables(new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration()), toDropHTables);
StorageCleanUtil.deleteHDFSPath(HadoopUtil.getWorkingFileSystem(), toDelHDFSPaths);
}
}
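`StorageCleanUtil` is one of the files not shown in this excerpt; a plausible sketch of its two helpers, assuming the classic `HBaseAdmin` API imported above (HBase requires disabling a table before deleting it):

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class StorageCleanUtilSketch {

    // Drop each HTable that still exists, disabling it first.
    public static void dropHTables(HBaseAdmin admin, List<String> hTables) throws IOException {
        try {
            for (String table : hTables) {
                if (admin.tableExists(table)) {
                    if (admin.isTableEnabled(table)) {
                        admin.disableTable(table);
                    }
                    admin.deleteTable(table);
                }
            }
        } finally {
            admin.close();
        }
    }

    // Recursively delete each HDFS path that exists.
    public static void deleteHDFSPath(FileSystem fs, List<String> paths) throws IOException {
        for (String p : paths) {
            Path path = new Path(p);
            if (fs.exists(path)) {
                fs.delete(path, true); // true = recursive
            }
        }
    }
}
```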

public boolean isOrphonSegment(CubeInstance cube, String segId) {
@@ -586,7 +617,12 @@ protected void releaseAllJobs(CubeInstance cube) {
private void releaseAllSegments(CubeInstance cube) throws IOException {
releaseAllJobs(cube);

List<CubeSegment> toRemoveSegs = cube.getSegments();

// remove from metadata
getCubeManager().clearSegments(cube);

cleanSegmentStorage(toRemoveSegs);
}

public void updateOnNewSegmentReady(String cubeName) {
storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.steps;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kylin.common.util.StringUtil;
@@ -115,20 +116,6 @@ public HadoopShellExecutable createBulkLoadStep(String jobId) {
return bulkLoadStep;
}

- public MergeGCStep createMergeGCStep() {
- MergeGCStep result = new MergeGCStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
- result.setOldHTables(getMergingHTables());
- return result;
- }
-
- public MergeGCStep createOptimizeGCStep() {
- MergeGCStep result = new MergeGCStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- result.setOldHTables(getOptimizeHTables());
- return result;
- }

public List<CubeSegment> getOptimizeSegments() {
CubeInstance cube = (CubeInstance) seg.getRealization();
List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
@@ -153,19 +140,25 @@ public List<String> getOldHTables(final List<CubeSegment> oldSegments) {

public List<String> getMergingHTables() {
final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
- .getMergingSegments((CubeSegment) seg);
+ .getMergingSegments(seg);
Preconditions.checkState(mergingSegments.size() > 1,
"there should be at least 2 segments to merge, target segment " + seg);
- final List<String> mergingHTables = Lists.newArrayList();
- for (CubeSegment merging : mergingSegments) {
- mergingHTables.add(merging.getStorageLocationIdentifier());
- }
- return mergingHTables;
+ return getOldHTables(mergingSegments);
}

public List<String> getRefreshingHTables() {
final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
return getOldHTables(Collections.singletonList(refreshingSegment));
}

public List<String> getRefreshingHDFSPaths() {
final CubeSegment refreshingSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);
return getOldHDFSPaths(Collections.singletonList(refreshingSegment));
}

public List<String> getMergingHDFSPaths() {
final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
- .getMergingSegments((CubeSegment) seg);
+ .getMergingSegments(seg);
Preconditions.checkState(mergingSegments.size() > 1,
"there should be at least 2 segments to merge, target segment " + seg);
final List<String> mergingHDFSPaths = Lists.newArrayList();
@@ -203,43 +196,36 @@ public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow)
List<String> toDeletePaths = new ArrayList<>();
toDeletePaths.add(getOptimizationRootPath(jobId));

- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);

jobFlow.addTask(step);
}

public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();

- jobFlow.addTask(createOptimizeGCStep());
+ MergeGCStep hBaseGCStep = createHBaseGCStep(getOptimizeHTables());
+ jobFlow.addTask(hBaseGCStep);

List<String> toDeletePaths = new ArrayList<>();
toDeletePaths.addAll(getOptimizeHDFSPaths());

- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);

jobFlow.addTask(step);
}

public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();

- jobFlow.addTask(createMergeGCStep());
+ MergeGCStep hBaseGCStep = createHBaseGCStep(getMergingHTables());
+ jobFlow.addTask(hBaseGCStep);

List<String> toDeletePaths = new ArrayList<>();
toDeletePaths.addAll(getMergingHDFSPaths());
toDeletePaths.add(getHFilePath(jobId));

- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);

jobFlow.addTask(step);
}
@@ -252,12 +238,46 @@ public void addCubingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
toDeletePaths.add(getHFilePath(jobId));
toDeletePaths.add(getShrunkenDictionaryPath(jobId));

- HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
- step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
- step.setDeletePaths(toDeletePaths);
- step.setJobId(jobId);
+ CubeSegment oldSegment = ((CubeInstance) seg.getRealization()).getOriginalSegmentToRefresh(seg);

// refresh segment
if (oldSegment != null) {
// delete old hdfs job
toDeletePaths.addAll(getRefreshingHDFSPaths());

// drop old htables
MergeGCStep hBaseGCStep = createHBaseGCStep(getRefreshingHTables());
jobFlow.addTask(hBaseGCStep);
}

HDFSPathGarbageCollectionStep step = createHDFSPathGCStep(toDeletePaths, jobId);
jobFlow.addTask(step);
}

/**
* Create an 'HBase garbage collection step' that drops the given HTables.
* @param toDropHTables names of the HTables to drop
* @return the configured MergeGCStep
*/
public MergeGCStep createHBaseGCStep(List<String> toDropHTables) {
MergeGCStep hBaseGCStep = new MergeGCStep();
hBaseGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
hBaseGCStep.setOldHTables(toDropHTables);
return hBaseGCStep;
}

/**
* Create an 'HDFS garbage collection step' that deletes the given paths from HDFS.
* @param toDeletePaths the HDFS paths to delete
* @param jobId id of the job the step belongs to
* @return the configured HDFSPathGarbageCollectionStep
*/
public HDFSPathGarbageCollectionStep createHDFSPathGCStep(List<String> toDeletePaths, String jobId) {
HDFSPathGarbageCollectionStep hdfsGCStep = new HDFSPathGarbageCollectionStep();
hdfsGCStep.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
hdfsGCStep.setDeletePaths(toDeletePaths);
hdfsGCStep.setJobId(jobId);
return hdfsGCStep;
}
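These two factories consolidate step construction that was previously copy-pasted across the merge, optimize, and cubing flows (and fix the cubing flow's HDFS step, which was mislabeled with the HBase step name). The wiring is the same everywhere; a merge-flow sketch, assuming `steps` is an instance of this class and `jobFlow` is the chained executable:

```java
// Sketch of the common wiring after the refactor: one HBase GC step to drop
// stale HTables, then one HDFS GC step to remove the old job directories.
String jobId = jobFlow.getId();
jobFlow.addTask(steps.createHBaseGCStep(steps.getMergingHTables()));

List<String> toDeletePaths = new ArrayList<>(steps.getMergingHDFSPaths());
toDeletePaths.add(steps.getHFilePath(jobId));
jobFlow.addTask(steps.createHDFSPathGCStep(toDeletePaths, jobId));
```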

}
storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -60,7 +60,7 @@ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {

@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- // nothing to do
+ steps.addCubingGarbageCollectionSteps(jobFlow);
}

};