Skip to content

Commit

Permalink
KYLIN-3375 Some improvements for lookup table - build change
Browse files Browse the repository at this point in the history
  • Loading branch information
allenma authored and shaofengshi committed Jun 1, 2018
1 parent 179eddb commit 777f8fa
Show file tree
Hide file tree
Showing 39 changed files with 2,147 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,19 @@ public int getTableSnapshotMaxMB() {
return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
}

/**
 * Reads the {@code kylin.snapshot.ext.shard-mb} setting, passing {@code "500"} to
 * {@code getOptional} as the default, and parses the result as an int.
 *
 * @return the parsed setting value (presumably a shard size in MB, going by the key name)
 * @throws NumberFormatException if the resolved value is not a valid integer
 */
public int getExtTableSnapshotShardingMB() {
    final String shardMb = getOptional("kylin.snapshot.ext.shard-mb", "500");
    return Integer.parseInt(shardMb);
}

/**
 * Reads the {@code kylin.snapshot.ext.local.cache.path} setting, passing
 * {@code "lookup_cache"} to {@code getOptional} as the default.
 *
 * @return the resolved setting value, unmodified
 */
public String getExtTableSnapshotLocalCachePath() {
    final String cachePath = getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
    return cachePath;
}

/**
 * Reads the {@code kylin.snapshot.ext.local.cache.max-size-gb} setting, passing
 * {@code "200"} to {@code getOptional} as the default, and parses the result as a double.
 *
 * @return the parsed setting value (presumably a size limit in GB, going by the key name)
 * @throws NumberFormatException if the resolved value is not a valid floating-point number
 */
public double getExtTableSnapshotLocalCacheMaxSizeGB() {
    final String maxSizeGb = getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200");
    return Double.parseDouble(maxSizeGb);
}


// ============================================================================
// CUBE
// ============================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,27 @@ public void clearCacheForCubeMigration(String cube, String project, String model
}
}

/**
 * Sends an HTTP PUT to
 * {@code <baseUrl>/tables/<project>/<lookupTableName>/<snapshotID>/snapshotLocalCache}.
 * The response body is read through {@code getContent} and discarded.
 *
 * @param project         project path segment of the request URL
 * @param lookupTableName lookup table path segment of the request URL
 * @param snapshotID      snapshot path segment of the request URL
 * @throws IOException if the request fails or the response status is not 200
 */
public void buildLookupSnapshotCache(String project, String lookupTableName, String snapshotID) throws IOException {
    final String tablePath = project + "/" + lookupTableName + "/" + snapshotID;
    final String url = baseUrl + "/tables/" + tablePath + "/snapshotLocalCache";

    HttpResponse response = client.execute(new HttpPut(url));
    // Read the body before inspecting the status, exactly as the original flow does.
    getContent(response);

    final int statusCode = response.getStatusLine().getStatusCode();
    if (statusCode == 200) {
        return;
    }
    throw new IOException("Invalid response " + statusCode + " with url " + url + "\n");
}

/**
 * Sends an HTTP GET to
 * {@code <baseUrl>/tables/<lookupTableName>/<snapshotID>/snapshotLocalCache/state}
 * and returns the response body as read by {@code getContent}.
 *
 * @param lookupTableName lookup table path segment of the request URL
 * @param snapshotID      snapshot path segment of the request URL
 * @return the response content
 * @throws IOException if the request fails or the response status is not 200
 */
public String getLookupSnapshotCacheState(String lookupTableName, String snapshotID) throws IOException {
    final String tablePath = lookupTableName + "/" + snapshotID;
    final String url = baseUrl + "/tables/" + tablePath + "/snapshotLocalCache/state";

    HttpResponse response = client.execute(new HttpGet(url));
    // The body is read first; the status check only decides whether it is returned.
    final String body = getContent(response);

    final int statusCode = response.getStatusLine().getStatusCode();
    if (statusCode == 200) {
        return body;
    }
    throw new IOException("Invalid response " + statusCode + " with url " + url + "\n");
}

private HashMap dealResponse(HttpResponse response) throws IOException {
if (response.getStatusLine().getStatusCode() != 200) {
throw new IOException("Invalid response " + response.getStatusLine().getStatusCode());
Expand Down
118 changes: 92 additions & 26 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
Expand All @@ -44,9 +45,13 @@
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.SnapshotTableDesc;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.dict.lookup.LookupStringTable;
import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
import org.apache.kylin.dict.lookup.ILookupTable;
import org.apache.kylin.dict.lookup.LookupProviderFactory;
import org.apache.kylin.dict.lookup.SnapshotManager;
import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.metadata.TableMetadataManager;
Expand Down Expand Up @@ -290,6 +295,18 @@ public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum statu
}
}

/**
 * Records a new snapshot resource path for one lookup table on the cube.
 * Runs under the {@code cubeMapLock} write lock, takes the latest writable copy of
 * the cube, and submits a {@link CubeUpdate} carrying a single table-to-path entry.
 *
 * @param cube               the cube to update; its latest copy for write is used
 * @param lookupTableName    key of the snapshot path entry
 * @param newSnapshotResPath value of the snapshot path entry
 * @return the cube instance returned by {@code updateCube}
 * @throws IOException if {@code updateCube} fails
 */
public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) throws IOException {
    try (AutoLock lock = cubeMapLock.lockForWrite()) {
        CubeInstance writableCube = cube.latestCopyForWrite();

        Map<String, String> snapshotPaths = Maps.newHashMap();
        snapshotPaths.put(lookupTableName, newSnapshotResPath);

        CubeUpdate snapshotUpdate = new CubeUpdate(writableCube);
        snapshotUpdate.setUpdateTableSnapshotPath(snapshotPaths);
        return updateCube(snapshotUpdate);
    }
}

private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
if (update == null || update.getCubeInstance() == null)
throw new IllegalStateException();
Expand Down Expand Up @@ -353,6 +370,12 @@ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IO
cube.setCuboidsRecommend(update.getCuboidsRecommend());
}

if (update.getUpdateTableSnapshotPath() != null) {
for(Map.Entry<String, String> lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) {
cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue());
}
}

try {
cube = crud.save(cube);
} catch (WriteConflictException ise) {
Expand Down Expand Up @@ -435,6 +458,55 @@ public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOExcep
}
}

/**
 * Resolves the lookup table for the primary-key side of the given join.
 * When the cube descriptor has a snapshot descriptor for the table and that descriptor
 * is flagged as an ext snapshot table, the ext lookup table is returned; in every other
 * case (including no descriptor at all) the in-memory lookup table is returned.
 *
 * @param cubeSegment segment whose cube descriptor and snapshots are consulted
 * @param join        join whose PK-side table is looked up
 * @return the lookup table implementation chosen for the table
 */
public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
    final String lookupTable = join.getPKSide().getTableIdentity();
    final SnapshotTableDesc snapshotDesc = cubeSegment.getCubeDesc().getSnapshotTableDesc(lookupTable);

    final boolean useExtSnapshot = snapshotDesc != null && snapshotDesc.isExtSnapshotTable();
    if (useExtSnapshot) {
        return getExtLookupTable(cubeSegment, lookupTable, snapshotDesc);
    }
    return getInMemLookupTable(cubeSegment, join, snapshotDesc);
}

/**
 * Loads the snapshot of the join's PK-side table and wraps it in an in-memory lookup
 * table built by {@link LookupProviderFactory}.
 *
 * @param cubeSegment       segment providing the project and the snapshot path
 * @param join              join supplying the table identity and primary key columns
 * @param snapshotTableDesc snapshot descriptor of the table, may be {@code null}
 * @return the in-memory lookup table
 * @throws IllegalStateException if no snapshot path is recorded, or loading fails with an
 *                               {@link IOException} (kept as the cause)
 */
private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, SnapshotTableDesc snapshotTableDesc) {
    final String lookupTable = join.getPKSide().getTableIdentity();
    final String resPath = getSnapshotResPath(cubeSegment, lookupTable, snapshotTableDesc);
    final String[] primaryKey = join.getPrimaryKey();

    try {
        SnapshotTable loadedSnapshot = getSnapshotManager().getSnapshotTable(resPath);
        TableDesc lookupDesc = getMetadataManager().getTableDesc(lookupTable, cubeSegment.getProject());
        return LookupProviderFactory.getInMemLookupTable(lookupDesc, primaryKey, loadedSnapshot);
    } catch (IOException ioe) {
        String message = "Failed to load lookup table " + lookupTable + " from snapshot " + resPath;
        throw new IllegalStateException(message, ioe);
    }
}

/**
 * Builds the ext lookup table for a table: resolves its snapshot resource path, fetches
 * the matching {@link ExtTableSnapshotInfo} and hands it, together with the table
 * descriptor, to {@link LookupProviderFactory}.
 *
 * @param cubeSegment       segment providing the project and the snapshot path
 * @param tableName         identity of the lookup table
 * @param snapshotTableDesc snapshot descriptor of the table
 * @return the ext lookup table
 * @throws IllegalStateException if no snapshot path is recorded for the table
 */
private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
    final String resPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);

    ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(config);
    ExtTableSnapshotInfo snapshotInfo = snapshotInfoManager.getSnapshot(resPath);

    TableDesc lookupDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
    return LookupProviderFactory.getExtLookupTable(lookupDesc, snapshotInfo);
}

/**
 * Returns the snapshot resource path recorded for a lookup table.
 * When the snapshot descriptor is present and flagged global, the path is read from the
 * cube instance; otherwise (including a {@code null} descriptor) it is read from the
 * cube segment.
 *
 * @param cubeSegment       segment (and, through it, cube instance) holding the paths
 * @param tableName         identity of the lookup table
 * @param snapshotTableDesc snapshot descriptor of the table, may be {@code null}
 * @return the non-null snapshot resource path
 * @throws IllegalStateException if no path is recorded for the table
 */
private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
    final boolean globalSnapshot = snapshotTableDesc != null && snapshotTableDesc.isGlobal();
    final String snapshotResPath = globalSnapshot
            ? cubeSegment.getCubeInstance().getSnapshotResPath(tableName)
            : cubeSegment.getSnapshotResPath(tableName);

    if (snapshotResPath == null) {
        // The separating space after "cube segment" was missing, which glued the cube
        // name onto the word "segment" in the reported message.
        throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment "
                + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
    }
    return snapshotResPath;
}

@VisibleForTesting
/*private*/ String generateStorageLocation() {
String namePrefix = config.getHBaseTableNamePrefix();
Expand Down Expand Up @@ -974,8 +1046,8 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable)
return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
}

public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
return dictAssist.getLookupTable(cubeSegment, join);
/**
 * Returns the {@link TableMetadataManager} obtained for this manager's {@code config}.
 */
private TableMetadataManager getMetadataManager() {
    final TableMetadataManager tableMetadataManager = TableMetadataManager.getInstance(config);
    return tableMetadataManager;
}

private class DictionaryAssist {
Expand Down Expand Up @@ -1057,31 +1129,25 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable)
IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);

segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
CubeUpdate update = new CubeUpdate(cubeCopy);
update.setToUpdateSegs(segCopy);
updateCube(update);

return snapshot;
}

public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {

String tableName = join.getPKSide().getTableIdentity();
String[] pkCols = join.getPrimaryKey();
String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
if (snapshotResPath == null)
throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
+ cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
CubeUpdate update = new CubeUpdate(cubeCopy);
update.setToUpdateSegs(segCopy);
updateCube(update);

// Update the input cubeSeg after the resource store updated
cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
} else {
CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
Map<String, String> map = Maps.newHashMap();
map.put(lookupTable, snapshot.getResourcePath());
cubeUpdate.setUpdateTableSnapshotPath(map);
updateCube(cubeUpdate);

try {
SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
return new LookupStringTable(tableDesc, pkCols, snapshot);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
}
return snapshot;
}
}

Expand Down
9 changes: 9 additions & 0 deletions core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CubeUpdate {
private int cost = -1;
private Map<Long, Long> cuboids = null;
private Set<Long> cuboidsRecommend = null;
private Map<String, String> updateTableSnapshotPath = null;

public CubeUpdate(CubeInstance cubeInstance) {
setCubeInstance(cubeInstance);
Expand Down Expand Up @@ -124,4 +125,12 @@ public CubeUpdate setCuboidsRecommend(Set<Long> cuboidsRecommend) {
this.cuboidsRecommend = cuboidsRecommend;
return this;
}

/**
 * Returns the pending table snapshot path entries, or {@code null} when none were set
 * (the field is initialised to {@code null}). The stored map itself is returned, not a copy.
 */
public Map<String, String> getUpdateTableSnapshotPath() {
    return this.updateTableSnapshotPath;
}

/**
 * Stores the table snapshot path entries to carry in this update. The given map is kept
 * by reference, not copied.
 *
 * @param updateTableSnapshotPath map to store, may be {@code null}
 */
public void setUpdateTableSnapshotPath(Map<String, String> updateTableSnapshotPath) {
    final Map<String, String> snapshotPaths = updateTableSnapshotPath;
    this.updateTableSnapshotPath = snapshotPaths;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, Dist
for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
TableRef table = dim.getTableRef();
if (cubeSeg.getModel().isLookupTable(table)) {
toSnapshot.add(table.getTableIdentity());
toCheckLookup.add(table);
// only the snapshot desc is not ext type, need to take snapshot
if (!cubeSeg.getCubeDesc().isExtSnapshotTable(table.getTableIdentity())) {
toSnapshot.add(table.getTableIdentity());
toCheckLookup.add(table);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.dict.lookup;

import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.metadata.model.TableDesc;

/**
 * Base class for lookup-table row encoders/decoders.
 *
 * <p>The constructor splits the table's column positions into key column indexes and
 * value column indexes, based on the key column names it is given. Subclasses implement
 * {@link #encode(String[])} and {@link #decode(Object)} for a concrete encoded form
 * {@code R}, and can reuse the length-prefixed byte helpers provided here.
 *
 * <p>{@link #encodeStringsWithLenPfx(String[], boolean)} writes into the instance-level
 * {@link #keyByteBuffer} without any synchronization, so one instance must not be used
 * for encoding by several threads at once.
 *
 * @param <R> the encoded representation of a row
 */
public abstract class AbstractLookupRowEncoder<R> {
    /** Scratch buffer reused by every {@link #encodeStringsWithLenPfx} call (1 MB). */
    protected ByteBuffer keyByteBuffer = ByteBuffer.allocate(1024 * 1024);

    /** Number of columns in the table descriptor. */
    protected int columnsNum;
    /** Positions of the key columns, in table column order. */
    protected int[] keyIndexes;
    /** Positions of the non-key columns, in table column order. */
    protected int[] valueIndexes;

    /**
     * Computes the key and value column indexes for the table.
     *
     * @param tableDesc  descriptor supplying the table's columns
     * @param keyColumns names of the key columns; every name is expected to match exactly
     *                   one table column, since the index arrays are sized on that basis
     */
    public AbstractLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
        this.columnsNum = tableDesc.getColumns().length;
        this.keyIndexes = new int[keyColumns.length];
        this.valueIndexes = new int[columnsNum - keyColumns.length];
        int keyIdx = 0;
        int valIdx = 0;
        for (int i = 0; i < columnsNum; i++) {
            // Resolve the column name once per column rather than once per key comparison.
            String columnName = tableDesc.getColumns()[i].getName();
            if (isKeyColumn(columnName, keyColumns)) {
                keyIndexes[keyIdx++] = i;
            } else {
                valueIndexes[valIdx++] = i;
            }
        }
    }

    /** Returns whether {@code columnName} equals one of the given key column names. */
    private static boolean isKeyColumn(String columnName, String[] keyColumns) {
        for (String keyColumn : keyColumns) {
            if (keyColumn.equals(columnName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Encodes a full table row.
     *
     * @param row the row's column values
     * @return the encoded form
     */
    public abstract R encode(String[] row);

    /**
     * Decodes an encoded row back to column values.
     *
     * @param result the encoded form
     * @return the row's column values
     */
    public abstract String[] decode(R result);

    /**
     * Picks the key column values out of a full row.
     *
     * @param row full row, indexed by table column position
     * @return the values at {@link #keyIndexes}, in that order
     */
    public String[] getKeyData(String[] row) {
        return extractColValues(row, keyIndexes);
    }

    /**
     * Picks the non-key column values out of a full row.
     *
     * @param row full row, indexed by table column position
     * @return the values at {@link #valueIndexes}, in that order
     */
    public String[] getValueData(String[] row) {
        return extractColValues(row, valueIndexes);
    }

    /**
     * Serializes the strings one after another, each as a 2-byte length followed by its
     * bytes. A {@code null} string is written as the single {@code DimensionEncoding.NULL}
     * byte (see {@link #toBytes(String)}).
     *
     * @param keys      the strings to encode
     * @param allowNull whether {@code null} elements are accepted
     * @return a fresh array holding the encoded bytes
     * @throws IllegalArgumentException if a {@code null} element is found while
     *                                  {@code allowNull} is false, or an element's byte
     *                                  form is longer than {@link Short#MAX_VALUE}
     * @throws java.nio.BufferOverflowException if the encoded form exceeds the scratch
     *                                          buffer's capacity
     */
    public byte[] encodeStringsWithLenPfx(String[] keys, boolean allowNull) {
        keyByteBuffer.clear();
        for (String key : keys) {
            if (key == null && !allowNull) {
                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
            }
            byte[] byteKey = toBytes(key);
            // The prefix is a signed short: a longer value would wrap silently on the cast
            // and make decodeFromLenPfxBytes read a negative or wrong length.
            if (byteKey.length > Short.MAX_VALUE) {
                throw new IllegalArgumentException("value of " + byteKey.length
                        + " bytes is too long for a 2-byte length prefix, max is " + Short.MAX_VALUE);
            }
            keyByteBuffer.putShort((short) byteKey.length);
            keyByteBuffer.put(byteKey);
        }
        return Arrays.copyOf(keyByteBuffer.array(), keyByteBuffer.position());
    }

    /**
     * Reads {@code valueIdx.length} length-prefixed strings from {@code rowKey} and stores
     * the i-th one at {@code result[valueIdx[i]]}.
     *
     * @param rowKey   bytes in the layout produced by {@link #encodeStringsWithLenPfx}
     * @param valueIdx target positions in {@code result}, one per encoded string
     * @param result   row array that receives the decoded values
     */
    protected void decodeFromLenPfxBytes(byte[] rowKey, int[] valueIdx, String[] result) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
        for (int i = 0; i < valueIdx.length; i++) {
            short keyLen = byteBuffer.getShort();
            byte[] keyBytes = new byte[keyLen];
            byteBuffer.get(keyBytes);
            result[valueIdx[i]] = fromBytes(keyBytes);
        }
    }

    /**
     * Copies the row values found at the given positions into a new array.
     *
     * @param row     full row, indexed by table column position
     * @param indexes positions to pick, in output order
     * @return the picked values
     */
    protected String[] extractColValues(String[] row, int[] indexes) {
        String[] result = new String[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            result[i] = row[indexes[i]];
        }
        return result;
    }

    /**
     * Converts a string to bytes; {@code null} becomes the single
     * {@code DimensionEncoding.NULL} byte.
     */
    protected byte[] toBytes(String str) {
        if (str == null) {
            return new byte[] { DimensionEncoding.NULL };
        }
        return Bytes.toBytes(str);
    }

    /**
     * Converts bytes back to a string; bytes that {@code DimensionEncoding.isNull}
     * recognises as null yield {@code null}.
     */
    protected String fromBytes(byte[] bytes) {
        if (DimensionEncoding.isNull(bytes, 0, bytes.length)) {
            return null;
        }
        return Bytes.toString(bytes);
    }
}
Loading

0 comments on commit 777f8fa

Please sign in to comment.