Skip to content

Commit

Permalink
KYLIN-3377 Some improvements for lookup table - snapshot management
Browse files Browse the repository at this point in the history
  • Loading branch information
allenma authored and shaofengshi committed Jun 1, 2018
1 parent 5a96f8b commit b7d2cb7
Show file tree
Hide file tree
Showing 12 changed files with 876 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;

public class LookupSnapshotBuildJob extends DefaultChainedExecutable {

public static final Integer DEFAULT_PRIORITY = 30;

private static final String DEPLOY_ENV_NAME = "envName";
private static final String PROJECT_INSTANCE_NAME = "projectName";
private static final String CUBE_NAME = "cubeName";

private static final String JOB_TYPE = "Lookup ";

public static LookupSnapshotBuildJob createJob(CubeInstance cube, String tableName, String submitter,
KylinConfig kylinConfig) {
return initJob(cube, tableName, submitter, kylinConfig);
}

private static LookupSnapshotBuildJob initJob(CubeInstance cube, String tableName, String submitter,
KylinConfig kylinConfig) {
List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
cube.getName());
if (projList == null || projList.size() == 0) {
throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
} else if (projList.size() >= 2) {
String msg = "Find more than one project containing the cube " + cube.getName()
+ ". It does't meet the uniqueness requirement!!! ";
throw new RuntimeException(msg);
}

LookupSnapshotBuildJob result = new LookupSnapshotBuildJob();
SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
format.setTimeZone(TimeZone.getTimeZone(kylinConfig.getTimeZone()));
result.setDeployEnvName(kylinConfig.getDeployEnv());
result.setProjectName(projList.get(0).getName());
CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
result.setName(JOB_TYPE + " CUBE - " + cube.getName() + " - " + " TABLE - " + tableName + " - "
+ format.format(new Date(System.currentTimeMillis())));
result.setSubmitter(submitter);
result.setNotifyList(cube.getDescriptor().getNotifyList());
return result;
}

protected void setDeployEnvName(String name) {
setParam(DEPLOY_ENV_NAME, name);
}

public String getDeployEnvName() {
return getParam(DEPLOY_ENV_NAME);
}

public String getProjectName() {
return getParam(PROJECT_INSTANCE_NAME);
}

public void setProjectName(String name) {
setParam(PROJECT_INSTANCE_NAME, name);
}

public String getCubeName() {
return getParam(CUBE_NAME);
}

@Override
public int getDefaultPriority() {
return DEFAULT_PRIORITY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr;

import java.util.List;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.SnapshotTableDesc;
import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
import org.apache.kylin.engine.mr.steps.lookup.LookupSnapshotToMetaStoreStep;
import org.apache.kylin.engine.mr.steps.lookup.UpdateCubeAfterSnapshotStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LookupSnapshotJobBuilder {
private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotJobBuilder.class);
private CubeInstance cube;
private String lookupTable;
private List<String> segments;
private String submitter;
private KylinConfig kylinConfig;

public LookupSnapshotJobBuilder(CubeInstance cube, String lookupTable, List<String> segments, String submitter) {
this.cube = cube;
this.lookupTable = lookupTable;
this.segments = segments;
this.submitter = submitter;
this.kylinConfig = cube.getConfig();
}

public LookupSnapshotBuildJob build() {
logger.info("new job to build lookup snapshot:{} for cube:{}", lookupTable, cube.getName());
LookupSnapshotBuildJob result = LookupSnapshotBuildJob.createJob(cube, lookupTable, submitter, kylinConfig);
CubeDesc cubeDesc = cube.getDescriptor();
SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(lookupTable);
if (snapshotTableDesc != null && snapshotTableDesc.isExtSnapshotTable()) {
addExtMaterializeLookupTableSteps(result, snapshotTableDesc);
} else {
addInMetaStoreMaterializeLookupTableSteps(result);
}
return result;
}

private void addExtMaterializeLookupTableSteps(final LookupSnapshotBuildJob result,
SnapshotTableDesc snapshotTableDesc) {
ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotTableDesc.getStorageType());
materializer.materializeLookupTable(result, cube, lookupTable);

UpdateCubeAfterSnapshotStep afterSnapshotStep = new UpdateCubeAfterSnapshotStep();
afterSnapshotStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE);
LookupExecutableUtil.setCubeName(cube.getName(), afterSnapshotStep.getParams());
LookupExecutableUtil.setLookupTableName(lookupTable, afterSnapshotStep.getParams());
LookupExecutableUtil.setSegments(segments, afterSnapshotStep.getParams());
LookupExecutableUtil.setJobID(result.getId(), afterSnapshotStep.getParams());
result.addTask(afterSnapshotStep);
}

private void addInMetaStoreMaterializeLookupTableSteps(final LookupSnapshotBuildJob result) {
LookupSnapshotToMetaStoreStep lookupSnapshotToMetaStoreStep = new LookupSnapshotToMetaStoreStep();
lookupSnapshotToMetaStoreStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE);
LookupExecutableUtil.setCubeName(cube.getName(), lookupSnapshotToMetaStoreStep.getParams());
LookupExecutableUtil.setLookupTableName(lookupTable, lookupSnapshotToMetaStoreStep.getParams());
LookupExecutableUtil.setSegments(segments, lookupSnapshotToMetaStoreStep.getParams());
result.addTask(lookupSnapshotToMetaStoreStep);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.engine.mr.steps.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;

import com.google.common.collect.Lists;

public class LookupExecutableUtil {

public static final String CUBE_NAME = "cubeName";
public static final String LOOKUP_TABLE_NAME = "lookupTableName";
public static final String PROJECT_NAME = "projectName";
public static final String LOOKUP_SNAPSHOT_ID = "snapshotID";
public static final String SEGMENT_IDS = "segments";
public static final String JOB_ID = "jobID";


public static void setCubeName(String cubeName, Map<String, String> params) {
params.put(CUBE_NAME, cubeName);
}

public static String getCubeName(Map<String, String> params) {
return params.get(CUBE_NAME);
}

public static void setLookupTableName(String lookupTableName, Map<String, String> params) {
params.put(LOOKUP_TABLE_NAME, lookupTableName);
}

public static String getLookupTableName(Map<String, String> params) {
return params.get(LOOKUP_TABLE_NAME);
}

public static void setProjectName(String projectName, Map<String, String> params) {
params.put(PROJECT_NAME, projectName);
}

public static String getProjectName(Map<String, String> params) {
return params.get(PROJECT_NAME);
}

public static void setLookupSnapshotID(String snapshotID, Map<String, String> params) {
params.put(LOOKUP_SNAPSHOT_ID, snapshotID);
}

public static String getLookupSnapshotID(Map<String, String> params) {
return params.get(LOOKUP_SNAPSHOT_ID);
}

public static List<String> getSegments(Map<String, String> params) {
final String ids = params.get(SEGMENT_IDS);
if (ids != null) {
final String[] splitted = StringUtils.split(ids, ",");
ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
for (String id : splitted) {
result.add(id);
}
return result;
} else {
return Collections.emptyList();
}
}

public static void setSegments(List<String> segments, Map<String, String> params) {
params.put(SEGMENT_IDS, StringUtils.join(segments, ","));
}


public static String getJobID(Map<String, String> params) {
return params.get(JOB_ID);
}

public static void setJobID(String jobID, Map<String, String> params) {
params.put(JOB_ID, jobID);
}

public static void updateSnapshotPathToCube(CubeManager cubeManager, CubeInstance cube, String lookupTableName,
String snapshotPath) throws IOException {
cubeManager.updateCubeLookupSnapshot(cube, lookupTableName, snapshotPath);
cube.putSnapshotResPath(lookupTableName, snapshotPath);
}

public static void updateSnapshotPathToSegments(CubeManager cubeManager, CubeInstance cube, List<String> segmentIDs, String lookupTableName, String snapshotPath) throws IOException {
CubeInstance cubeCopy = cube.latestCopyForWrite();
if (segmentIDs.size() > 0) {
CubeSegment[] segments = new CubeSegment[segmentIDs.size()];
for (int i = 0; i < segments.length; i++) {
CubeSegment segment = cubeCopy.getSegmentById(segmentIDs.get(i));
if (segment == null) {
throw new IllegalStateException("the segment not exist in cube:" + segmentIDs.get(i));
}
segment.putSnapshotResPath(lookupTableName, snapshotPath);
segments[i] = segment;
}
CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
cubeUpdate.setToUpdateSegs(segments);
cubeManager.updateCube(cubeUpdate);

// Update the input cubeSeg after the resource store updated
for (int i = 0; i < segments.length; i++) {
CubeSegment segment = cube.getSegmentById(segmentIDs.get(i));
segment.putSnapshotResPath(lookupTableName, snapshotPath);
}
}
}

}
Loading

0 comments on commit b7d2cb7

Please sign in to comment.