Skip to content

Commit

Permalink
KYLIN-3841 Build Global Dict by MR/Hive
Browse files Browse the repository at this point in the history
  • Loading branch information
javalife0312 authored and nichunen committed Apr 8, 2019
1 parent 80ac894 commit 480d80b
Show file tree
Hide file tree
Showing 14 changed files with 771 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.SortedSet;
import java.util.TimeZone;
Expand Down Expand Up @@ -1339,11 +1340,29 @@ public int getYarnStatusCheckIntervalSeconds() {
return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
}


public boolean isUseLocalClasspathEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE));
}

// ============================================================================
// mr-hive dict
// ============================================================================
public String[] getMrHiveDictColumns() {
String columnStr = getOptional("kylin.dictionary.mr-hive.columns", "");
if (Objects.nonNull(columnStr) && columnStr.length()>0) {
return columnStr.split(",");
}
return null;
}
public String getMrHiveDictDB() {
return getOptional("kylin.dictionary.mr-hive.database", getHiveDatabaseForIntermediateTable());
}

public String getMrHiveDictTableSuffix() {
return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
}


// ============================================================================
// ENGINE.SPARK
// ============================================================================
Expand Down
17 changes: 17 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

Expand Down Expand Up @@ -1339,6 +1340,22 @@ public Set<TblColRef> getAllColumnsHaveDictionary() {
}
}

//mr - hive global dict
if (overrideKylinProps.containsKey("kylin.dictionary.mr-hive.columns")) {
String mrHiveDictColumns = overrideKylinProps.get("kylin.dictionary.mr-hive.columns");
if (Objects.nonNull(mrHiveDictColumns) && StringUtils.isNotEmpty(mrHiveDictColumns)) {
String[] mrHiveDictColumnArr = mrHiveDictColumns.split(",");
for (String dictColumn : mrHiveDictColumnArr) {
for (TblColRef colRef : result) {
String aliasCol = colRef.getTableAlias() + "_" + colRef.getName();
if (aliasCol.equalsIgnoreCase(dictColumn)) {
result.remove(colRef);
}
}
}
}
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBu
sql.append(whereBuilder.toString());
}

private static String colName(TblColRef col) {
public static String colName(TblColRef col) {
return colName(col, true);
}

private static String colName(TblColRef col, boolean useAlias) {
public static String colName(TblColRef col, boolean useAlias) {
return useAlias ? col.getTableAlias() + "_" + col.getName() : col.getName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ private ExecutableConstants() {
public static final String STEP_NAME_STREAMING_CREATE_DICTIONARY = "Build Dimension Dictionaries For Steaming Job";
public static final String STEP_NAME_STREAMING_BUILD_BASE_CUBOID = "Build Base Cuboid Data For Streaming Job";
public static final String STEP_NAME_STREAMING_SAVE_DICTS = "Save Cube Dictionaries";

// MR - Hive Dict
public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Global Dict Mr/Hive extract dict_val from Data";
public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Global Dict Mr/Hive build dict_val";
public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Global Dict Mr/Hive replace dict_val to Data";

}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) {
// In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary
private boolean needDictionaryColumn(FunctionDesc functionDesc) {
DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
if (functionDesc.isMrDict()) {
return false;
}
if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,15 @@ public static FunctionDesc newInstance(String expression, ParameterDesc param, S
private DataType returnDataType;
private MeasureType<?> measureType;
private boolean isDimensionAsMetric = false;
private boolean isMrDict = false;

public boolean isMrDict() {
return isMrDict;
}

public void setMrDict(boolean mrDict) {
isMrDict = mrDict;
}

public void init(DataModelDesc model) {
expression = expression.toUpperCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import com.google.common.collect.Sets;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeSegment;
Expand All @@ -30,8 +33,10 @@
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
import org.apache.kylin.cube.util.KeyValueBuilder;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
Expand Down Expand Up @@ -100,10 +105,37 @@ public ByteBuffer buildValue(String[] flatRow) {

public Object[] buildValueObjects(String[] flatRow) {
Object[] measures = new Object[cubeDesc.getMeasures().size()];

Set<String> mrDicts = null;
if (Objects.nonNull(kylinConfig.getMrHiveDictColumns())) {
mrDicts = Sets.newHashSet(kylinConfig.getMrHiveDictColumns());
logger.info("mr-dict ===1===" + mrDicts.toString());
}

for (int i = 0; i < measures.length; i++) {
String[] colValues = kvBuilder.buildValueOf(i, flatRow);

MeasureDesc measure = measureDescList.get(i);
logger.info("mr-dict ===2===" + measure.toString());

//mr dict
if (measure.getFunction().getExpression().equalsIgnoreCase(FunctionDesc.FUNC_COUNT_DISTINCT)) {
FunctionDesc functionDesc = measure.getFunction();
TblColRef colRef = functionDesc.getParameter().getColRefs().get(0);

logger.info("mr-dict ===3===" + colRef.getName());

if (Objects.nonNull(mrDicts) && mrDicts.contains(JoinedFlatTable.colName(colRef, true))) {
functionDesc.setMrDict(true);
measure.setFunction(functionDesc);
}
}
logger.info("mr-dict ===4===" + measure.getFunction().isMrDict());
logger.info("mr-dict ===5===" + aggrIngesters[i]);

measures[i] = aggrIngesters[i].valueOf(colValues, measure, dictionaryMap);

logger.info("mr-dict ===6===" + measures[i].toString());
}

return measures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,20 @@ public void doMap(KEYIN key, Object record, Context context) throws IOException,
Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);

for (String[] row : rowCollection) {
if (rowCount < 10) {
StringBuilder builder = new StringBuilder();
for (String item : row) {
builder.append(" | " + item + " | ");
}
logger.info("mr-dict ~ 1 : " + builder.toString());
}

context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
for (int i = 0; i < allCols.size(); i++) {
if (rowCount < 10) {
logger.info("mr-dict ~ 2 | " + allCols.get(i));
}

String fieldValue = row[columnIndex[i]];
if (fieldValue == null)
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -123,6 +124,28 @@ public long getHiveTableRows(String database, String tableName) throws Exception
return count;
}

@Override
public List<Object[]> getHiveResult(String hql) throws Exception {
ResultSet resultSet = null;
List<Object[]> datas = new ArrayList<>();
try {
resultSet = stmt.executeQuery(hql);
int columnCtn = resultSet.getMetaData().getColumnCount();
while (resultSet.next()) {
Object[] data = new Object[columnCtn];
for (int i = 0; i < columnCtn; i++) {
data[i] = resultSet.getObject(i + 1);
}
datas.add(data);
}
} catch (Exception e) {
throw new Exception(e);
} finally {
DBUtils.closeQuietly(resultSet);
}
return datas;
}

@Override
public void executeHQL(String hql) throws IOException {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.kylin.source.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -129,6 +130,36 @@ public long getHiveTableRows(String database, String tableName) throws Exception
return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.ROW_COUNT);
}

@Override
public List<Object[]> getHiveResult(String hql) throws Exception {
List<Object[]> data = new ArrayList<>();

final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.addStatement(hql);
Pair<Integer, String> response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor().execute(hiveCmdBuilder.toString());

String[] respData = response.getSecond().split("\n");

boolean isData = false;

for (String item : respData) {
if (item.trim().equalsIgnoreCase("OK")) {
isData = true;
continue;
}
if (item.trim().startsWith("Time taken")) {
isData = false;
}
if (isData) {
Object[] arr = item.split("\t");
data.add(arr);
}

}

return data;
}

private HiveMetaStoreClient getMetaStoreClient() throws Exception {
if (metaStoreClient == null) {
metaStoreClient = new HiveMetaStoreClient(hiveConf);
Expand Down
Loading

0 comments on commit 480d80b

Please sign in to comment.