From 777f8fa5d3b1f24f59354909de1ca3d28d5bc137 Mon Sep 17 00:00:00 2001 From: "Ma,Gang" Date: Thu, 10 May 2018 13:00:10 +0800 Subject: [PATCH] KYLIN-3375 Some improvements for lookup table - build change --- .../apache/kylin/common/KylinConfigBase.java | 13 + .../kylin/common/restclient/RestClient.java | 21 ++ .../org/apache/kylin/cube/CubeManager.java | 118 +++++-- .../org/apache/kylin/cube/CubeUpdate.java | 9 + .../cube/cli/DictionaryGeneratorCLI.java | 7 +- .../dict/lookup/AbstractLookupRowEncoder.java | 122 +++++++ .../kylin/dict/lookup/IExtLookupProvider.java | 36 +++ .../dict/lookup/IExtLookupTableCache.java | 39 +++ .../kylin/dict/lookup/ILookupTable.java | 32 ++ .../dict/lookup/LookupProviderFactory.java | 112 +++++++ .../job/constant/ExecutableConstants.java | 5 + .../job/execution/AbstractExecutable.java | 6 +- .../execution/DefaultChainedExecutable.java | 26 ++ .../realization/IRealizationConstants.java | 2 + .../engine/mr/BatchCubingJobBuilder2.java | 17 + .../org/apache/kylin/engine/mr/CubingJob.java | 26 -- .../kylin/engine/mr/ILookupMaterializer.java | 28 ++ .../org/apache/kylin/engine/mr/MRUtil.java | 5 + .../engine/mr/common/AbstractHadoopJob.java | 4 + .../engine/mr/common/BatchConstants.java | 11 + .../steps/UpdateCubeInfoAfterBuildStep.java | 23 ++ .../cube_desc/ci_left_join_cube.json | 8 + .../kylin/rest/job/MetadataCleanupJob.java | 3 +- .../rest/job/StorageCleanJobHbaseUtil.java | 37 +++ server/src/main/resources/kylinSecurity.xml | 2 + storage-hbase/pom.xml | 6 + .../hbase/lookup/HBaseLookupMRSteps.java | 175 ++++++++++ .../hbase/lookup/HBaseLookupMaterializer.java | 40 +++ .../hbase/lookup/HBaseLookupProvider.java | 58 ++++ .../hbase/lookup/HBaseLookupRowEncoder.java | 134 ++++++++ .../hbase/lookup/HBaseLookupTable.java | 130 ++++++++ .../lookup/KVSortReducerWithDupKeyCheck.java | 62 ++++ .../lookup/LookupTableHFilesBulkLoadJob.java | 106 ++++++ .../hbase/lookup/LookupTableToHFileJob.java | 302 ++++++++++++++++++ .../lookup/LookupTableToHFileMapper.java | 109 +++++++ ...pdateSnapshotCacheForQueryServersStep.java | 106 ++++++ .../lookup/HBaseLookupRowEncoderTest.java | 98 ++++++ .../lookup/LookupTableToHFileJobTest.java | 110 +++++++ ...eSnapshotCacheForQueryServersStepTest.java | 55 ++++ 39 files changed, 2147 insertions(+), 56 deletions(-) create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java create mode 100644 
storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java create mode 100644 storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java create mode 100644 storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java create mode 100644 storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java create mode 100644 storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 689d08fc792..b2898911363 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -414,6 +414,19 @@ public int getTableSnapshotMaxMB() { return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300")); } + public int getExtTableSnapshotShardingMB() { + return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500")); + } + + public String getExtTableSnapshotLocalCachePath() { + return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache"); + } + + public double getExtTableSnapshotLocalCacheMaxSizeGB() { + return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200")); + } + + // ============================================================================ // CUBE // ============================================================================ diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index 93f5e19adbf..11284f64017 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -303,6 +303,27 @@ public void clearCacheForCubeMigration(String cube, String project, String model } } + public void buildLookupSnapshotCache(String project, String lookupTableName, String snapshotID) throws IOException { + String url = baseUrl + "/tables/" + project + "/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache"; + HttpPut put = new HttpPut(url); + HttpResponse response = client.execute(put); + getContent(response); + if (response.getStatusLine().getStatusCode() != 200) { + throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n"); + } + } + + public String getLookupSnapshotCacheState(String lookupTableName, String snapshotID) throws IOException { + String url = baseUrl + "/tables/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache/state"; + HttpGet get = new HttpGet(url); + HttpResponse response = client.execute(get); + String content = getContent(response); + if (response.getStatusLine().getStatusCode() != 200) { + throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n"); + } + return content; + } + 
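The two new RestClient endpoints let the job server trigger a local snapshot cache build on a query server and then poll its state. A minimal usage sketch, assuming a reachable query server; the host, project, and table names below are placeholders, and the state strings come from the IExtLookupTableCache.CacheState enum introduced later in this patch:

    // Ask a query server to warm its local cache for a newly built ext snapshot,
    // then poll until the cache is AVAILABLE. All names here are illustrative.
    RestClient restClient = new RestClient("ADMIN:KYLIN@localhost:7070");
    restClient.buildLookupSnapshotCache("default", "DEFAULT.TEST_CATEGORY_GROUPINGS", snapshotID);
    String state = restClient.getLookupSnapshotCacheState("DEFAULT.TEST_CATEGORY_GROUPINGS", snapshotID);
    if ("AVAILABLE".equals(state)) {
        // the query server can now serve this lookup from its local cache
    }
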
private HashMap dealResponse(HttpResponse response) throws IOException { if (response.getStatusLine().getStatusCode() != 200) { throw new IOException("Invalid response " + response.getStatusLine().getStatusCode()); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 84fc9206729..3f4c5760530 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; @@ -44,9 +45,13 @@ import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; +import org.apache.kylin.dict.lookup.ILookupTable; +import org.apache.kylin.dict.lookup.LookupProviderFactory; import org.apache.kylin.dict.lookup.SnapshotManager; import org.apache.kylin.dict.lookup.SnapshotTable; import org.apache.kylin.metadata.TableMetadataManager; @@ -290,6 +295,18 @@ public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum statu } } + public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) throws IOException { + try (AutoLock lock = cubeMapLock.lockForWrite()) { + cube = cube.latestCopyForWrite(); + + CubeUpdate update = new CubeUpdate(cube); + Map map = Maps.newHashMap(); + map.put(lookupTableName, newSnapshotResPath); + update.setUpdateTableSnapshotPath(map); + return updateCube(update); + } + } + private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException { if (update == null || update.getCubeInstance() == null) throw new IllegalStateException(); @@ -353,6 +370,12 @@ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IO cube.setCuboidsRecommend(update.getCuboidsRecommend()); } + if (update.getUpdateTableSnapshotPath() != null) { + for(Map.Entry lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) { + cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue()); + } + } + try { cube = crud.save(cube); } catch (WriteConflictException ise) { @@ -435,6 +458,55 @@ public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOExcep } } + public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { + String tableName = join.getPKSide().getTableIdentity(); + CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName); + if (snapshotTableDesc == null || !snapshotTableDesc.isExtSnapshotTable()) { + return getInMemLookupTable(cubeSegment, join, snapshotTableDesc); + } else { + return getExtLookupTable(cubeSegment, tableName, snapshotTableDesc); + } + } + + private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, SnapshotTableDesc snapshotTableDesc) { + String tableName = 
join.getPKSide().getTableIdentity(); + String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc); + String[] pkCols = join.getPrimaryKey(); + + try { + SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); + TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject()); + return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot); + } catch (IOException e) { + throw new IllegalStateException( + "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e); + } + } + + private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) { + String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc); + + ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot( + snapshotResPath); + TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject()); + return LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot); + } + + private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) { + String snapshotResPath; + if (snapshotTableDesc == null || !snapshotTableDesc.isGlobal()) { + snapshotResPath = cubeSegment.getSnapshotResPath(tableName); + } else { + snapshotResPath = cubeSegment.getCubeInstance().getSnapshotResPath(tableName); + } + if (snapshotResPath == null) { + throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment" + + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); + } + return snapshotResPath; + } + @VisibleForTesting /*private*/ String generateStorageLocation() { String namePrefix = config.getHBaseTableNamePrefix(); @@ -974,8 +1046,8 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) return dictAssist.buildSnapshotTable(cubeSeg, lookupTable); } - public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { - return dictAssist.getLookupTable(cubeSegment, join); + private TableMetadataManager getMetadataManager() { + return TableMetadataManager.getInstance(config); } private class DictionaryAssist { @@ -1057,31 +1129,25 @@ public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); - segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); - CubeUpdate update = new CubeUpdate(cubeCopy); - update.setToUpdateSegs(segCopy); - updateCube(update); - - return snapshot; - } - - public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { - - String tableName = join.getPKSide().getTableIdentity(); - String[] pkCols = join.getPrimaryKey(); - String snapshotResPath = cubeSegment.getSnapshotResPath(tableName); - if (snapshotResPath == null) - throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment" - + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment); + CubeDesc cubeDesc = cubeSeg.getCubeDesc(); + if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) { + segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath()); + CubeUpdate update = new CubeUpdate(cubeCopy); + update.setToUpdateSegs(segCopy); + updateCube(update); + + // Update the input cubeSeg after the resource store updated + 
cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable)); + } else { + CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy); + Map map = Maps.newHashMap(); + map.put(lookupTable, snapshot.getResourcePath()); + cubeUpdate.setUpdateTableSnapshotPath(map); + updateCube(cubeUpdate); - try { - SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath); - TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject()); - return new LookupStringTable(tableDesc, pkCols, snapshot); - } catch (IOException e) { - throw new IllegalStateException( - "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e); + cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath()); } + return snapshot; } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java index 378d0822743..62b46a9d22a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java @@ -36,6 +36,7 @@ public class CubeUpdate { private int cost = -1; private Map cuboids = null; private Set cuboidsRecommend = null; + private Map updateTableSnapshotPath = null; public CubeUpdate(CubeInstance cubeInstance) { setCubeInstance(cubeInstance); @@ -124,4 +125,12 @@ public CubeUpdate setCuboidsRecommend(Set cuboidsRecommend) { this.cuboidsRecommend = cuboidsRecommend; return this; } + + public Map getUpdateTableSnapshotPath() { + return updateTableSnapshotPath; + } + + public void setUpdateTableSnapshotPath(Map updateTableSnapshotPath) { + this.updateTableSnapshotPath = updateTableSnapshotPath; + } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 36c06b79221..2a243702cb8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -77,8 +77,11 @@ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, Dist for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) { TableRef table = dim.getTableRef(); if (cubeSeg.getModel().isLookupTable(table)) { - toSnapshot.add(table.getTableIdentity()); - toCheckLookup.add(table); + // only the snapshot desc is not ext type, need to take snapshot + if (!cubeSeg.getCubeDesc().isExtSnapshotTable(table.getTableIdentity())) { + toSnapshot.add(table.getTableIdentity()); + toCheckLookup.add(table); + } } } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java new file mode 100644 index 00000000000..5efe129febc --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.lookup; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.dimension.DimensionEncoding; +import org.apache.kylin.metadata.model.TableDesc; + +/** + * Abstract encoder/decoder + * + */ +abstract public class AbstractLookupRowEncoder { + protected ByteBuffer keyByteBuffer = ByteBuffer.allocate(1024 * 1024); + + protected int columnsNum; + protected int[] keyIndexes; + protected int[] valueIndexes; + + public AbstractLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) { + this.columnsNum = tableDesc.getColumns().length; + this.keyIndexes = new int[keyColumns.length]; + this.valueIndexes = new int[columnsNum - keyColumns.length]; + int keyIdx = 0; + int valIdx = 0; + for (int i = 0; i < columnsNum; i++) { + boolean isKeyColumn = false; + for (String keyColumn : keyColumns) { + if (keyColumn.equals(tableDesc.getColumns()[i].getName())) { + isKeyColumn = true; + break; + } + } + if (isKeyColumn) { + keyIndexes[keyIdx] = i; + keyIdx++; + } else { + valueIndexes[valIdx] = i; + valIdx++; + } + } + } + + abstract public R encode(String[] row); + + abstract public String[] decode(R result); + + public String[] getKeyData(String[] row) { + return extractColValues(row, keyIndexes); + } + + public String[] getValueData(String[] row) { + return extractColValues(row, valueIndexes); + } + + public byte[] encodeStringsWithLenPfx(String[] keys, boolean allowNull) { + keyByteBuffer.clear(); + for (String key : keys) { + if (key == null && !allowNull) { + throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys)); + } + byte[] byteKey = toBytes(key); + keyByteBuffer.putShort((short) byteKey.length); + keyByteBuffer.put(byteKey); + } + byte[] result = new byte[keyByteBuffer.position()]; + System.arraycopy(keyByteBuffer.array(), 0, result, 0, keyByteBuffer.position()); + return result; + } + + protected void decodeFromLenPfxBytes(byte[] rowKey, int[] valueIdx, String[] result) { + ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey); + for (int i = 0; i < valueIdx.length; i++) { + short keyLen = byteBuffer.getShort(); + byte[] keyBytes = new byte[keyLen]; + byteBuffer.get(keyBytes); + result[valueIdx[i]] = fromBytes(keyBytes); + } + } + + protected String[] extractColValues(String[] row, int[] indexes) { + String[] result = new String[indexes.length]; + int i = 0; + for (int idx : indexes) { + result[i++] = row[idx]; + } + return result; + } + + protected byte[] toBytes(String str) { + if (str == null) { + return new byte[] { DimensionEncoding.NULL }; + } + return Bytes.toBytes(str); + } + + protected String fromBytes(byte[] bytes) { + if (DimensionEncoding.isNull(bytes, 0, bytes.length)) { + return null; + } + return Bytes.toString(bytes); + } +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java new file mode 100644 index 00000000000..a09a439bf87 --- /dev/null +++ 
b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupProvider {
+    ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot);
+
+    /**
+     * @return the local cache if the provider has one; return null if no local cache exists
+     */
+    IExtLookupTableCache getLocalCache();
+
+    /**
+     * Return an adaptor that implements the specified interface, as requested by the build engine.
+     * The ILookupMaterializer, in particular, is required by the MR build engine.
+     */
+    <I> I adaptToBuildEngine(Class<I> engineInterface);
+}
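IExtLookupProvider is the extension point that ties a storage engine into the ext lookup framework: a storage registers an implementation class through LookupProviderFactory (below) and supplies the lookup table, the optional local cache, and a build-engine adaptor. Only the HBase provider is part of this patch; as a sketch, a hypothetical provider for some other store could look like this (class, package, and storage type name are made up):

    // Hypothetical skeleton of a non-HBase provider; illustrative only.
    public class MyStoreLookupProvider implements IExtLookupProvider {
        @Override
        public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
            return new MyStoreLookupTable(tableDesc, extTableSnapshot); // hypothetical ILookupTable impl
        }

        @Override
        public IExtLookupTableCache getLocalCache() {
            return null; // no local cache; callers read the remote table directly
        }

        @Override
        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
            return null; // return an ILookupMaterializer here to support the MR build engine
        }
    }

    // Registration, e.g. during server startup:
    LookupProviderFactory.registerLookupProvider("mystore", "com.example.MyStoreLookupProvider");
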
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
new file mode 100644
index 00000000000..f473059002a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupTableCache {
+    enum CacheState {NONE, IN_BUILDING, AVAILABLE}
+
+    /**
+     * @param tableDesc
+     * @param extTableSnapshotInfo
+     * @param buildIfNotExist if true, build the cached lookup table when it does not exist
+     * @return null if no cached lookup table exists
+     */
+    ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist);
+
+    void buildSnapshotCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, ILookupTable sourceTable);
+
+    void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo);
+
+    CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo);
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
new file mode 100644
index 00000000000..dccb7c4ff03
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import org.apache.kylin.common.util.Array;
+
+import java.io.Closeable;
+
+public interface ILookupTable extends Iterable<String[]>, Closeable {
+    /**
+     * get row according to the key
+     * @param key
+     * @return
+     */
+    String[] getRow(Array<String> key);
+}
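Both the in-memory snapshot and the external snapshot are consumed through this one interface, so query-side code no longer cares where the lookup data lives. A minimal usage sketch, assuming a cube segment and join descriptor are already in scope, using the CubeManager.getLookupTable API added earlier in this patch:

    // Iterate a lookup table obtained from CubeManager and release it when done.
    CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
    ILookupTable lookup = cubeManager.getLookupTable(cubeSegment, joinDesc);
    try {
        for (String[] row : lookup) { // Iterable<String[]>
            // process one lookup row
        }
    } finally {
        lookup.close(); // Closeable: frees HBase connections or local cache handles
    }
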
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
new file mode 100644
index 00000000000..64ccef557c8
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class LookupProviderFactory {
+    private static final Logger logger = LoggerFactory.getLogger(LookupProviderFactory.class);
+    private static Map<String, String> lookupProviderImplClassMap = Maps.newConcurrentMap();
+
+    static {
+        registerLookupProvider(ExtTableSnapshotInfo.STORAGE_TYPE_HBASE,
+                "org.apache.kylin.storage.hbase.lookup.HBaseLookupProvider");
+    }
+
+    public static void registerLookupProvider(String storageType, String implClassName) {
+        lookupProviderImplClassMap.put(storageType, implClassName);
+    }
+
+    public static IExtLookupProvider getExtLookupProvider(String storageType) {
+        String className = lookupProviderImplClassMap.get(storageType);
+        if (className == null) {
+            throw new IllegalStateException("no implementation class found for storage type:" + storageType);
+        }
+        try {
+            Class<?> clazz = Class.forName(className);
+            Constructor<?> constructor = clazz.getConstructor();
+            return (IExtLookupProvider) constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("the lookup implementation class is invalid for storage type:"
+                    + storageType, e);
+        }
+    }
+
+    public static ILookupTable getInMemLookupTable(TableDesc tableDesc, String[] pkCols, IReadableTable readableTable)
+            throws IOException {
+        return new LookupStringTable(tableDesc, pkCols, readableTable);
+    }
+
+    public static ILookupTable getExtLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupTableCache extLookupTableCache = getExtLookupProvider(extTableSnapshot.getStorageType()).getLocalCache();
+        if (extLookupTableCache == null) {
+            return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+        }
+        ILookupTable cachedLookupTable = extLookupTableCache.getCachedLookupTable(tableDesc, extTableSnapshot, true);
+        if (cachedLookupTable != null) {
+            logger.info("use cached lookup table:{}", extTableSnapshot.getResourcePath());
+            return cachedLookupTable;
+        }
+        logger.info("use ext lookup table:{}", extTableSnapshot.getResourcePath());
+        return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+    }
+
+    public static ILookupTable getExtLookupTableWithoutCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupProvider provider = getExtLookupProvider(extTableSnapshot.getStorageType());
+        return provider.getLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    public static <T> T createEngineAdapter(String lookupStorageType, Class<T> engineInterface) {
+        IExtLookupProvider provider = getExtLookupProvider(lookupStorageType);
+        return provider.adaptToBuildEngine(engineInterface);
+    }
+
+    public static void rebuildLocalCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.buildSnapshotCache(tableDesc, extTableSnapshotInfo, getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo));
+        }
+    }
+
+    public static void removeLocalCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = 
getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache(); + if (tablesCache != null) { + tablesCache.removeSnapshotCache(extTableSnapshotInfo); + } + } + + public static CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) { + IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache(); + if (tablesCache != null) { + return tablesCache.getCacheState(extTableSnapshotInfo); + } + return CacheState.NONE; + } + +} diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index b9a36518373..7b3c5a37368 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -62,4 +62,9 @@ private ExecutableConstants() { public static final String STEP_NAME_GARBAGE_COLLECTION_HBASE = "Garbage Collection on HBase"; public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS"; public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table"; + public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE = "Convert Lookup Table to HFile"; + public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD = "Load HFile to HBase Table"; + public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE = "Update Lookup Snapshot Cache to Query Engine"; + public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE = "Take Snapshot to Metadata Store"; + public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info"; } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index dbe11c2ea3d..1a534e110b9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -399,10 +399,14 @@ public static long getExtraInfoAsLong(Output output, String key, long defaultVal } } - protected final void addExtraInfo(String key, String value) { + public final void addExtraInfo(String key, String value) { getManager().addJobInfo(getId(), key, value); } + public final String getExtraInfo(String key) { + return getExtraInfo().get(key); + } + protected final Map getExtraInfo() { return getOutput().getExtra(); } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 404db541106..2297be78558 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -18,6 +18,8 @@ package org.apache.kylin.job.execution; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -203,4 +205,28 @@ private boolean retryFetchTaskStatus(Executable task) { public int getDefaultPriority() { return DEFAULT_PRIORITY; } + + public String findExtraInfo(String key, String dft) { + return findExtraInfo(key, dft, false); + } + + public String findExtraInfoBackward(String key, String dft) { + return findExtraInfo(key, dft, true); + } + + private String findExtraInfo(String key, String dft, 
boolean backward) { + ArrayList tasks = new ArrayList(getTasks()); + + if (backward) { + Collections.reverse(tasks); + } + + for (AbstractExecutable child : tasks) { + Output output = getManager().getOutput(child.getId()); + String value = output.getExtra().get(key); + if (value != null) + return value; + } + return dft; + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java index 2e4f4008e48..ad311d99be3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java @@ -22,6 +22,8 @@ */ public class IRealizationConstants { + public final static String LookupHbaseStorageLocationPrefix = "LOOKUP_"; + /** * For each cube htable, we leverage htable's metadata to keep track of * which kylin server(represented by its kylin_metadata prefix) owns this htable diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index faac724443b..a840bf7113d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -22,6 +22,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.CuboidUtil; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; @@ -69,6 +70,10 @@ public CubingJob build() { result.addTask(createBuildDictionaryStep(jobId)); result.addTask(createSaveStatisticsStep(jobId)); + + // add materialize lookup tables if needed + addMaterializeLookupTableSteps(result); + outputSide.addStepPhase2_BuildDictionary(result); // Phase 3: Build Cube @@ -97,6 +102,18 @@ private boolean isEnableUHCDictStep() { return true; } + private void addMaterializeLookupTableSteps(final CubingJob result) { + CubeDesc cubeDesc = seg.getCubeDesc(); + List allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes(); + if (allSnapshotTypes.isEmpty()) { + return; + } + for (String snapshotType : allSnapshotTypes) { + ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType); + materializer.materializeLookupTablesForCube(result, seg.getCubeInstance()); + } + } + protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { // Don't know statistics so that tree cuboid scheduler is not determined. 
Determine the maxLevel at runtime final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index c9ed3594fa0..7f7191d0476 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -19,8 +19,6 @@ package org.apache.kylin.engine.mr; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; @@ -350,28 +348,4 @@ public long findCubeSizeBytes() { return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0")); } - public String findExtraInfo(String key, String dft) { - return findExtraInfo(key, dft, false); - } - - public String findExtraInfoBackward(String key, String dft) { - return findExtraInfo(key, dft, true); - } - - private String findExtraInfo(String key, String dft, boolean backward) { - ArrayList tasks = new ArrayList(getTasks()); - - if (backward) { - Collections.reverse(tasks); - } - - for (AbstractExecutable child : tasks) { - Output output = getManager().getOutput(child.getId()); - String value = output.getExtra().get(key); - if (value != null) - return value; - } - return dft; - } - } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java new file mode 100644 index 00000000000..f103da299e3 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.job.execution.DefaultChainedExecutable; + +public interface ILookupMaterializer { + void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName); + + void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube); +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index b98608f2463..85a425cb71a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -23,6 +23,7 @@ import org.apache.hadoop.util.Tool; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.dict.lookup.LookupProviderFactory; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; @@ -69,6 +70,10 @@ public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) { return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg); } + + public static ILookupMaterializer getExtLookupMaterializer(String lookupStorageType) { + return LookupProviderFactory.createEngineAdapter(lookupStorageType, ILookupMaterializer.class); + } // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-sale // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 7b253549ac3..fa3c22e19de 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -120,6 +120,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_NEED_UPDATE_BASE_CUBOID_SHARD = OptionBuilder .withArgName(BatchConstants.ARG_UPDATE_SHARD).hasArg().isRequired(false) .withDescription("If need to update base cuboid shard").create(BatchConstants.ARG_UPDATE_SHARD); + protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg().isRequired(true).withDescription("Table name. 
For exmaple, default.table1").create(BatchConstants.ARG_TABLE_NAME); + protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg() + .isRequired(true).withDescription("Lookup table snapshotID") + .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID); private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 64163adb8a5..36f2566f8b3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -72,6 +72,8 @@ public interface BatchConstants { String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum"; + String CFG_SHARD_NUM = "shard.num"; + /** * command line ARGuments */ @@ -95,6 +97,8 @@ public interface BatchConstants { String ARG_LEVEL = "level"; String ARG_CONF = "conf"; String ARG_DICT_PATH = "dictPath"; + String ARG_TABLE_NAME = "tableName"; + String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID"; /** * logger and counter @@ -106,4 +110,11 @@ public interface BatchConstants { * dictionaries builder class */ String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; + + /** + * the prefix of ext lookup table snapshot resource path that stored in the build context + */ + String LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX = "lookup.ext.snapshot.res.path."; + + String LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX = "lookup.ext.snapshot.src.record.cnt."; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 7262383ba13..3167bcafd3b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -21,6 +21,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; @@ -30,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; @@ -71,6 +73,7 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio segment.setInputRecordsSize(sourceSizeBytes); try { + saveExtSnapshotIfNeeded(cubeManager, cubingJob, cube, segment); if (segment.isOffsetCube()) { updateTimeRange(segment); } @@ -83,6 +86,26 @@ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteExceptio } } + private void saveExtSnapshotIfNeeded(CubeManager cubeManager, CubingJob cubingJob, CubeInstance cube, CubeSegment segment) throws IOException { + List snapshotTableDescList = cube.getDescriptor().getSnapshotTableDescList(); + for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) { + String tableName = snapshotTableDesc.getTableName(); + if (snapshotTableDesc.isExtSnapshotTable()) { + String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + tableName; + String newSnapshotResPath = 
cubingJob.getExtraInfo(contextKey); + if (newSnapshotResPath == null) { + continue; + } + + if (snapshotTableDesc.isGlobal()) { + cubeManager.updateCubeLookupSnapshot(cube, tableName, newSnapshotResPath); + } else { + segment.putSnapshotResPath(tableName, newSnapshotResPath); + } + } + } + } + private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); diff --git a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json index f1a42b11ab7..e42c522caa2 100644 --- a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json +++ b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json @@ -582,5 +582,13 @@ "override_kylin_properties": { "kylin.cube.algorithm": "INMEM" }, + "snapshot_table_desc_list": [ + { + "table_name": "DEFAULT.TEST_CATEGORY_GROUPINGS", + "storage_type": "hbase", + "local_cache_enable": true, + "global": true + } + ], "partition_date_start": 0 } diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java index 1aec4290641..5ee5c7a370b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java @@ -73,7 +73,7 @@ public List cleanup(boolean delete, int jobOutdatedDays) throws Exceptio // two level resources, snapshot tables and cube statistics for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT, - ResourceStore.CUBE_STATISTICS_ROOT }) { + ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) { for (String dir : noNull(store.listResources(resourceRoot))) { for (String res : noNull(store.listResources(dir))) { if (store.getResourceTimestamp(res) < newResourceTimeCut) @@ -97,6 +97,7 @@ public List cleanup(boolean delete, int jobOutdatedDays) throws Exceptio // exclude resources in use Set activeResources = Sets.newHashSet(); for (CubeInstance cube : cubeManager.listAllCubes()) { + activeResources.addAll(cube.getSnapshots().values()); for (CubeSegment segment : cube.getSegments()) { activeResources.addAll(segment.getSnapshotPaths()); activeResources.addAll(segment.getDictionaryPaths()); diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java index 126c5987e7b..4c8c42643be 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.collect.Lists; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -35,6 +37,8 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; 
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +64,17 @@ static List<String> cleanUnusedHBaseTables(HBaseAdmin hbaseAdmin, boolean delete
                 ? config.getHBaseTableNamePrefix()
                 : (namespace + ":" + config.getHBaseTableNamePrefix());
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+        List<String> allTablesNeedToBeDropped = new ArrayList<>();
+
+        boolean hasExtLookupTable = false;
         for (HTableDescriptor desc : tableDescriptors) {
             String host = desc.getValue(IRealizationConstants.HTableTag);
             if (config.getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+                // check if there are hbase lookup tables
+                if (desc.getTableName().getNameAsString().contains(IRealizationConstants.LookupHbaseStorageLocationPrefix)) {
+                    hasExtLookupTable = true;
+                }
                 //only take care htables that belongs to self, and created more than 2 days
                 allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
             }
@@ -88,6 +99,11 @@ static List<String> cleanUnusedHBaseTables(HBaseAdmin hbaseAdmin, boolean delete
 
         logger.info(allTablesNeedToBeDropped.size() + " HTable(s) to clean up");
 
+        if (hasExtLookupTable) {
+            List<String> usedExtLookupTables = getAllUsedExtLookupTables();
+            logger.info("Exclude tables:{}, as they are referenced by snapshots.", usedExtLookupTables);
+            allTablesNeedToBeDropped.removeAll(usedExtLookupTables);
+        }
         if (delete) {
             // drop tables
             ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -115,6 +131,27 @@ static List<String> cleanUnusedHBaseTables(HBaseAdmin hbaseAdmin, boolean delete
         return allTablesNeedToBeDropped;
     }
 
+    private static List<String> getAllUsedExtLookupTables() throws IOException {
+        List<String> result = Lists.newArrayList();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+        for (String extSnapshotResource : activeSnapshotSet) {
+            try {
+                ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                        extSnapshotResource);
+                if (extTableSnapshot != null) {
+                    if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extTableSnapshot.getStorageType())) {
+                        result.add(extTableSnapshot.getStorageLocationIdentifier());
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("error fetching ext table snapshot:" + extSnapshotResource, e);
+            }
+        }
+        return result;
+    }
+
     static class DeleteHTableRunnable implements Callable {
         HBaseAdmin hbaseAdmin;
         String htableName;
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index f9c0d71d2e7..c08ae709c60 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -250,6 +250,7 @@
+
@@ -295,6 +296,7 @@
+
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 9bf62f00343..4709c08f979 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -96,6 +96,12 @@
             junit
             test
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
new file mode 100644
index 00000000000..757f6d0d9d8
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.lookup; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.DimensionDesc; +import org.apache.kylin.cube.model.SnapshotTableDesc; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.HadoopShellExecutable; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.SourceManager; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +public class HBaseLookupMRSteps { + protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupMRSteps.class); + private CubeInstance cube; + private JobEngineConfig config; + + public HBaseLookupMRSteps(CubeInstance cube) { + this.cube = cube; + this.config = new JobEngineConfig(cube.getConfig()); + } + + public void addMaterializeLookupTablesSteps(DefaultChainedExecutable jobFlow) { + CubeDesc cubeDesc = cube.getDescriptor(); + Set allLookupTables = Sets.newHashSet(); + for (DimensionDesc dim : cubeDesc.getDimensions()) { + TableRef table = dim.getTableRef(); + if (cubeDesc.getModel().isLookupTable(table)) { + allLookupTables.add(table.getTableIdentity()); + } + } + List snapshotTableDescs = cubeDesc.getSnapshotTableDescList(); + for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescs) { + if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(snapshotTableDesc.getStorageType()) + && allLookupTables.contains(snapshotTableDesc.getTableName())) { + addMaterializeLookupTableSteps(jobFlow, snapshotTableDesc.getTableName(), snapshotTableDesc); + } + } + } + + public void addMaterializeLookupTableSteps(DefaultChainedExecutable jobFlow, String tableName, SnapshotTableDesc snapshotTableDesc) { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + ExtTableSnapshotInfoManager extTableSnapshotInfoManager = 
ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject());
+        IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+        try {
+            if (extTableSnapshotInfoManager.hasLatestSnapshot(sourceTable.getSignature(), tableName)) {
+                logger.info("the latest snapshot already exists for table:{}, skip the build snapshot step.", tableName);
+                return;
+            }
+        } catch (IOException ioException) {
+            throw new RuntimeException(ioException);
+        }
+        logger.info("add build snapshot steps for table:{}", tableName);
+        String snapshotID = genLookupSnapshotID();
+        addLookupTableConvertToHFilesStep(jobFlow, tableName, snapshotID);
+        addLookupTableHFilesBulkLoadStep(jobFlow, tableName, snapshotID);
+        if (snapshotTableDesc != null && snapshotTableDesc.isEnableLocalCache()) {
+            addUpdateSnapshotQueryCacheStep(jobFlow, tableName, snapshotID);
+        }
+    }
+
+    private String genLookupSnapshotID() {
+        return UUID.randomUUID().toString();
+    }
+
+    private void addLookupTableConvertToHFilesStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep
+                .setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE + ":" + tableName);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cube.getName());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_LookupTable_HFile_Generator_" + tableName + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(LookupTableToHFileJob.class);
+        createHFilesStep.setCounterSaveAs(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName);
+
+        jobFlow.addTask(createHFilesStep);
+    }
+
+    private void addLookupTableHFilesBulkLoadStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD + ":" + tableName);
+
+        StringBuilder cmd = new StringBuilder();
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(LookupTableHFilesBulkLoadJob.class);
+        jobFlow.addTask(bulkLoadStep);
+    }
+
+    private void addUpdateSnapshotQueryCacheStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        UpdateSnapshotCacheForQueryServersStep updateSnapshotCacheStep = new UpdateSnapshotCacheForQueryServersStep();
+ 
updateSnapshotCacheStep.setName(ExecutableConstants.STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE + ":" + tableName); + + LookupExecutableUtil.setProjectName(cube.getProject(), updateSnapshotCacheStep.getParams()); + LookupExecutableUtil.setLookupTableName(tableName, updateSnapshotCacheStep.getParams()); + LookupExecutableUtil.setLookupSnapshotID(snapshotID, updateSnapshotCacheStep.getParams()); + jobFlow.addTask(updateSnapshotCacheStep); + } + + private String getLookupTableHFilePath(String tableName, String jobId) { + return HBaseConnection.makeQualifiedPathInHBaseCluster(JobBuilderSupport.getJobWorkingDir(config, jobId) + "/" + + tableName + "/hfile/"); + } + + public void appendMapReduceParameters(StringBuilder buf) { + appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX); + } + + public void appendMapReduceParameters(StringBuilder buf, String jobType) { + try { + String jobConf = config.getHadoopJobConfFilePath(jobType); + if (jobConf != null && jobConf.length() > 0) { + buf.append(" -conf ").append(jobConf); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java new file mode 100644 index 00000000000..cf28ed6d131 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.storage.hbase.lookup; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.model.SnapshotTableDesc; +import org.apache.kylin.engine.mr.ILookupMaterializer; +import org.apache.kylin.job.execution.DefaultChainedExecutable; + +public class HBaseLookupMaterializer implements ILookupMaterializer { + + @Override + public void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName) { + HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube); + SnapshotTableDesc snapshotTableDesc = cube.getDescriptor().getSnapshotTableDesc(lookupTableName); + lookupMRSteps.addMaterializeLookupTableSteps(jobFlow, lookupTableName, snapshotTableDesc); + } + + @Override + public void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube) { + HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube); + lookupMRSteps.addMaterializeLookupTablesSteps(jobFlow); + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java new file mode 100644 index 00000000000..3e8c2c5d6f3 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+*/ + +package org.apache.kylin.storage.hbase.lookup; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.IExtLookupProvider; +import org.apache.kylin.dict.lookup.IExtLookupTableCache; +import org.apache.kylin.dict.lookup.ILookupTable; +import org.apache.kylin.dict.lookup.cache.RocksDBLookupTableCache; +import org.apache.kylin.engine.mr.ILookupMaterializer; +import org.apache.kylin.metadata.model.TableDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Use HBase as lookup table storage + */ +public class HBaseLookupProvider implements IExtLookupProvider { + protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupProvider.class); + + @Override + public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) { + return new HBaseLookupTable(tableDesc, extTableSnapshot); + } + + @Override + public IExtLookupTableCache getLocalCache() { + return RocksDBLookupTableCache.getInstance(KylinConfig.getInstanceFromEnv()); + } + + @SuppressWarnings("unchecked") + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + if (engineInterface == ILookupMaterializer.class) { + return (I) new HBaseLookupMaterializer(); + } else { + throw new RuntimeException("Cannot adapt to " + engineInterface); + } + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java new file mode 100644 index 00000000000..9ceabd2721d --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; + +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.ShardingHash; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder; +import org.apache.kylin.metadata.model.TableDesc; + +import com.google.common.collect.Maps; + +/** + * Encode/decode an original table row to/from an HBase row + */ +public class HBaseLookupRowEncoder extends AbstractLookupRowEncoder<HBaseRow> { + public static final String CF_STRING = "F"; + public static final byte[] CF = Bytes.toBytes(CF_STRING); + + private int shardNum; + + public HBaseLookupRowEncoder(TableDesc tableDesc, String[] keyColumns, int shardNum) { + super(tableDesc, keyColumns); + this.shardNum = shardNum; + } + + @Override + public HBaseRow encode(String[] row) { + String[] keys = getKeyData(row); + String[] values = getValueData(row); + byte[] rowKey = encodeRowKey(keys); + NavigableMap<byte[], byte[]> qualifierValMap = Maps + .newTreeMap(org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR); + for (int i = 0; i < values.length; i++) { + // the qualifier is the index of the value column in the original row + byte[] qualifier = Bytes.toBytes(String.valueOf(valueIndexes[i])); + byte[] byteValue = toBytes(values[i]); + qualifierValMap.put(qualifier, byteValue); + } + return new HBaseRow(rowKey, qualifierValMap); + } + + @Override + public String[] decode(HBaseRow hBaseRow) { + if (hBaseRow == null) { + return null; + } + String[] result = new String[columnsNum]; + fillKeys(hBaseRow.rowKey, result); + fillValues(hBaseRow.qualifierValMap, result); + + return result; + } + + public byte[] encodeRowKey(String[] keys) { + // rowkey layout: 2-byte shard id, then each key column as (2-byte length, key bytes) + keyByteBuffer.clear(); + for (String key : keys) { + if (key == null) { + throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys)); + } + byte[] byteKey = Bytes.toBytes(key); + keyByteBuffer.putShort((short) byteKey.length); + keyByteBuffer.put(byteKey); + } + byte[] result = new byte[RowConstants.ROWKEY_SHARDID_LEN + keyByteBuffer.position()]; + System.arraycopy(keyByteBuffer.array(), 0, result, RowConstants.ROWKEY_SHARDID_LEN, keyByteBuffer.position()); + short shard = ShardingHash.getShard(result, RowConstants.ROWKEY_SHARDID_LEN, result.length, shardNum); + BytesUtil.writeShort(shard, result, 0, RowConstants.ROWKEY_SHARDID_LEN); + return result; + } + + private void fillKeys(byte[] rowKey, String[] result) { + int keyNum = keyIndexes.length; + ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey); + byteBuffer.getShort(); // skip the shard id + for (int i = 0; i < keyNum; i++) { + short keyLen = byteBuffer.getShort(); + byte[] keyBytes = new byte[keyLen]; + byteBuffer.get(keyBytes); + result[keyIndexes[i]] = Bytes.toString(keyBytes); + } + } + + private void fillValues(Map<byte[], byte[]> qualifierValMap, String[] result) { + for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) { + byte[] qualifier = qualifierValEntry.getKey(); + byte[] value = qualifierValEntry.getValue(); + int valIdx = Integer.parseInt(Bytes.toString(qualifier)); + result[valIdx] = fromBytes(value); + } + } + + public static class HBaseRow { + private byte[] rowKey; + private NavigableMap<byte[], byte[]> qualifierValMap; + + public HBaseRow(byte[] rowKey, NavigableMap<byte[], byte[]> qualifierValMap) { + this.rowKey = rowKey; + this.qualifierValMap = qualifierValMap; + } + + public byte[] getRowKey() {
return rowKey; + } + + public NavigableMap<byte[], byte[]> getQualifierValMap() { + return qualifierValMap; + } + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java new file mode 100644 index 00000000000..78877f7954b --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Array; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ILookupTable; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Use HBase as lookup table storage + */ +public class HBaseLookupTable implements ILookupTable { + protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupTable.class); + + private TableName lookupTableName; + private Table table; + + private HBaseLookupRowEncoder encoder; + + public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) { + String tableName = extTableSnapshot.getStorageLocationIdentifier(); + this.lookupTableName = TableName.valueOf(tableName); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl()); + try { + table = connection.getTable(lookupTableName); + } catch (IOException e) { + throw new RuntimeException("error when connecting to HBase", e); + } + + String[] keyColumns = extTableSnapshot.getKeyColumns(); + encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum()); + } + + @Override + public String[] getRow(Array<String> key) { + byte[] encodedKey = encoder.encodeRowKey(key.data); + Get get = new Get(encodedKey); + try { + Result result = table.get(get); + if (result.isEmpty()) { + return null; + } + return encoder.decode(new HBaseRow(result.getRow(), result.getFamilyMap(HBaseLookupRowEncoder.CF))); + } catch (IOException e) { + throw new
RuntimeException("error when getting row from HBase", e); + } + } + + @Override + public Iterator<String[]> iterator() { + return new HBaseScanBasedIterator(table); + } + + @Override + public void close() throws IOException { + table.close(); + } + + private class HBaseScanBasedIterator implements Iterator<String[]> { + private Iterator<Result> scannerIterator; + private long counter; + + public HBaseScanBasedIterator(Table table) { + try { + Scan scan = new Scan(); + scan.setCaching(1000); + scan.addFamily(HBaseLookupRowEncoder.CF); + ResultScanner scanner = table.getScanner(scan); + scannerIterator = scanner.iterator(); + } catch (IOException e) { + logger.error("error when scanning HBase", e); + } + } + + @Override + public boolean hasNext() { + return scannerIterator.hasNext(); + } + + @Override + public String[] next() { + counter++; + if (counter % 100000 == 0) { + logger.info("scanned {} rows from HBase", counter); + } + Result result = scannerIterator.next(); + byte[] rowKey = result.getRow(); + NavigableMap<byte[], byte[]> qualifierValMap = result.getFamilyMap(HBaseLookupRowEncoder.CF); + return encoder.decode(new HBaseRow(rowKey, qualifierValMap)); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove is not supported"); + } + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java new file mode 100644 index 00000000000..3daffa3cd06 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.util.TreeSet; + +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Most code is from {@link KeyValueSortReducer}, with added logic that checks whether a row key + * carries duplicate qualifiers; if a duplicate is found, an IllegalStateException is thrown + */ +public class KVSortReducerWithDupKeyCheck extends KeyValueSortReducer { + @Override + protected void reduce( + ImmutableBytesWritable row, + java.lang.Iterable<KeyValue> kvs, + org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context) + throws java.io.IOException, InterruptedException { + TreeSet<KeyValue> map = new TreeSet<>(KeyValue.COMPARATOR); + + TreeSet<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (KeyValue kv : kvs) { + byte[] qualifier = CellUtil.cloneQualifier(kv); + if (qualifierSet.contains(qualifier)) { + throw new IllegalStateException("duplicate qualifier found for row key:" + row); + } + qualifierSet.add(qualifier); + try { + map.add(kv.clone()); + } catch (CloneNotSupportedException e) { + throw new java.io.IOException(e); + } + } + context.setStatus("Read " + map.getClass()); + int index = 0; + for (KeyValue kv : map) { + context.write(row, kv); + if (++index % 100 == 0) + context.setStatus("Wrote " + index); + } + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java new file mode 100644 index 00000000000..5598ed9ed5e --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.io.IOException; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LookupTableHFilesBulkLoadJob extends AbstractHadoopJob { + + protected static final Logger logger = LoggerFactory.getLogger(LookupTableHFilesBulkLoadJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_TABLE_NAME); + options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_LOOKUP_SNAPSHOT_ID); + parseOptions(options, args); + + String tableName = getOptionValue(OPTION_TABLE_NAME); + String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID); + String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig); + DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID); + + ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); + ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID); + long srcTableRowCnt = Long.parseLong(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1")); + logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt); + snapshot.setRowCnt(srcTableRowCnt); + snapshot.setLastBuildTime(System.currentTimeMillis()); + extTableSnapshotInfoManager.updateSnapshot(snapshot); + + String hTableName = snapshot.getStorageLocationIdentifier(); + // input is the HFile directory produced by LookupTableToHFileJob, e.g. + // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/ + // note the path ends with "/" + String input = getOptionValue(OPTION_INPUT_PATH); + + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + FsShell shell = new FsShell(conf); + + // open up the HFile permissions so the HBase region servers can move the files during bulk load + int exitCode = -1; + int retryCount = 10; + while (exitCode != 0 && retryCount >= 1) { + exitCode = shell.run(new String[] { "-chmod", "-R", "777", input }); + retryCount--; + if (exitCode != 0) { + Thread.sleep(5000); + } + } + + if (exitCode != 0) { + logger.error("Failed to change the file permissions: " + input); + throw new IOException("Failed to change the file permissions: " + input); + } + + String[] newArgs = new String[2]; + newArgs[0] = input; + newArgs[1] = hTableName; + + logger.debug("Starting LoadIncrementalHFiles"); + int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs); + logger.debug("Finished LoadIncrementalHFiles"); + return ret; + } + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new LookupTableHFilesBulkLoadJob(), args); + System.exit(exitCode); + } +}
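[Illustrative note, not part of the patch] At query time, the materialized snapshot above is consumed through the new ILookupTable API. A minimal usage sketch, assuming a tableDesc obtained from TableMetadataManager and a snapshotInfo from ExtTableSnapshotInfoManager as in the job code above:

    // hypothetical wiring, for illustration only
    ILookupTable lookup = new HBaseLookupTable(tableDesc, snapshotInfo);
    try {
        // point lookup by the snapshot's key column(s), e.g. COUNTRY
        String[] row = lookup.getRow(new Array<>(new String[] { "AD" }));
    } finally {
        lookup.close(); // releases the underlying HBase Table handle
    }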
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java new file mode 100644 index 00000000000..aac0108d7f6 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinVersion; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; +import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; +import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.JoinDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.IReadableTable.TableSignature; +import org.apache.kylin.source.SourceManager; +import 
org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hbase.steps.CubeHTableUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LookupTableToHFileJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(LookupTableToHFileJob.class); + + private static final String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + private static final int HBASE_TABLE_LENGTH = 10; + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_TABLE_NAME); + options.addOption(OPTION_CUBING_JOB_ID); + options.addOption(OPTION_LOOKUP_SNAPSHOT_ID); + parseOptions(options, args); + + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); + String tableName = getOptionValue(OPTION_TABLE_NAME); + String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID); + String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeMgr = CubeManager.getInstance(kylinConfig); + CubeInstance cube = cubeMgr.getCube(cubeName); + + TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, + cube.getProject()); + + ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); + removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID); + + IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc); + + logger.info("create HTable for source table snapshot:{}", tableName); + Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig); + String[] keyColumns = getLookupKeyColumns(cube, tableName); + ExtTableSnapshotInfo snapshot = createSnapshotResource(extSnapshotInfoManager, tableName, lookupSnapshotID, + keyColumns, hTableNameAndShard.getFirst(), hTableNameAndShard.getSecond(), sourceTable); + logger.info("created snapshot information at:{}", snapshot.getResourcePath()); + saveSnapshotInfoToJobContext(kylinConfig, cubingJobID, snapshot); + + job = Job.getInstance(HBaseConfiguration.create(getConf()), getOptionValue(OPTION_JOB_NAME)); + + setJobClasspath(job, cube.getConfig()); + // For separate HBase cluster, note the output is a qualified HDFS path if "kylin.storage.hbase.cluster-fs" is configured, ref HBaseMRSteps.getHFilePath() + HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration()); + + FileOutputFormat.setOutputPath(job, output); + + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc); + tableInputFormat.configureJob(job); + job.setMapperClass(LookupTableToHFileMapper.class); + + // set job configuration + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, tableName); + // set block replication to 3 for hfiles + job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); + job.getConfiguration().set(BatchConstants.CFG_SHARD_NUM, String.valueOf(hTableNameAndShard.getSecond())); + // add metadata to distributed cache + attachCubeMetadata(cube, job.getConfiguration()); + + Connection conn = getHBaseConnection(kylinConfig); + HTable htable = (HTable) conn.getTable(TableName.valueOf(hTableNameAndShard.getFirst())); + + // Automatic config!
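+ // configureIncrementalLoad() wires the job for HFile output: it inspects the target
+ // table's regions, installs a TotalOrderPartitioner over the region boundaries and
+ // sets one reducer per region; the KeyValueSortReducer it installs is replaced just
+ // below by the duplicate-key-checking variant added in this patch.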
+ HFileOutputFormat2.configureIncrementalLoad(job, htable, htable.getRegionLocator()); + + job.setReducerClass(KVSortReducerWithDupKeyCheck.class); + + this.deletePath(job.getConfiguration(), output); + + return waitForCompletion(job); + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } + + private void removeSnapshotIfExist(ExtTableSnapshotInfoManager extSnapshotInfoManager, KylinConfig kylinConfig, + String tableName, String lookupSnapshotID) throws IOException { + ExtTableSnapshotInfo snapshotInfo = null; + try { + snapshotInfo = extSnapshotInfoManager.getSnapshot(tableName, lookupSnapshotID); + } catch (Exception e) { + // swallow the exception; it means no snapshot exists for this snapshot id + } + if (snapshotInfo == null) { + return; + } + logger.info("the table:{} snapshot:{} already exists, removing it", tableName, lookupSnapshotID); + extSnapshotInfoManager.removeSnapshot(tableName, lookupSnapshotID); + String hTableName = snapshotInfo.getStorageLocationIdentifier(); + logger.info("remove related HBase table:{} for snapshot:{}", hTableName, lookupSnapshotID); + Connection conn = getHBaseConnection(kylinConfig); + Admin admin = conn.getAdmin(); + try { + // an HBase table must be disabled before it can be deleted + admin.disableTable(TableName.valueOf(hTableName)); + admin.deleteTable(TableName.valueOf(hTableName)); + } finally { + IOUtils.closeQuietly(admin); + } + } + + private String[] getLookupKeyColumns(CubeInstance cube, String tableName) { + CubeDesc cubeDesc = cube.getDescriptor(); + DataModelDesc modelDesc = cubeDesc.getModel(); + TableRef lookupTableRef = null; + for (TableRef tableRef : modelDesc.getLookupTables()) { + if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) { + lookupTableRef = tableRef; + break; + } + } + if (lookupTableRef == null) { + throw new IllegalStateException("cannot find lookup table in model:" + tableName); + } + JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef); + TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns(); + String[] result = new String[keyColRefs.length]; + for (int i = 0; i < keyColRefs.length; i++) { + result[i] = keyColRefs[i].getName(); + } + return result; + } + + private void saveSnapshotInfoToJobContext(KylinConfig kylinConfig, String jobID, ExtTableSnapshotInfo snapshot) { + ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig); + DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(jobID); + job.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + snapshot.getTableName(), + snapshot.getResourcePath()); + } + + /** + * @param sourceTableName name of the source lookup table + * @param sourceTable readable handle of the source table + * @param kylinConfig kylin configuration + * @return Pair of HTable name and shard number + * @throws IOException if the HTable cannot be created + */ + private Pair<String, Integer> createHTable(String sourceTableName, IReadableTable sourceTable, + KylinConfig kylinConfig) throws IOException { + TableSignature signature = sourceTable.getSignature(); + int shardNum = calculateShardNum(kylinConfig, signature.getSize()); + Connection conn = getHBaseConnection(kylinConfig); + Admin admin = conn.getAdmin(); + String hTableName = genHTableName(kylinConfig, admin, sourceTableName); + + TableName tableName = TableName.valueOf(hTableName); + HTableDescriptor hTableDesc = new HTableDescriptor(tableName); + hTableDesc.setCompactionEnabled(false); + hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName()); + hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix()); + hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis())); + String commitInfo = KylinVersion.getGitCommitInfo(); + if
(!StringUtils.isEmpty(commitInfo)) { + hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo); + } + + HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false); + hTableDesc.addFamily(cf); + + try { + if (shardNum > 1) { + admin.createTable(hTableDesc, getSplitsByShardNum(shardNum)); + } else { + admin.createTable(hTableDesc); + } + } finally { + IOUtils.closeQuietly(admin); + } + return new Pair<>(hTableName, shardNum); + } + + private int calculateShardNum(KylinConfig kylinConfig, long dataSize) { + long shardSize = kylinConfig.getExtTableSnapshotShardingMB() * 1024L * 1024L; + // divide as double, otherwise ceil() is applied to an already-truncated integer quotient + return dataSize < shardSize ? 1 : (int) Math.ceil((double) dataSize / shardSize); + } + + private byte[][] getSplitsByShardNum(int shardNum) { + byte[][] result = new byte[shardNum - 1][]; + for (int i = 1; i < shardNum; ++i) { + byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN]; + BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN); + result[i - 1] = split; + } + return result; + } + + private ExtTableSnapshotInfo createSnapshotResource(ExtTableSnapshotInfoManager extSnapshotInfoManager, + String tableName, String snapshotID, String[] keyColumns, String hTableName, int shardNum, + IReadableTable sourceTable) throws IOException { + return extSnapshotInfoManager.createSnapshot(sourceTable.getSignature(), tableName, snapshotID, keyColumns, + shardNum, ExtTableSnapshotInfo.STORAGE_TYPE_HBASE, hTableName); + } + + private String genHTableName(KylinConfig kylinConfig, Admin admin, String tableName) throws IOException { + String namePrefix = kylinConfig.getHBaseTableNamePrefix() + + IRealizationConstants.LookupHbaseStorageLocationPrefix + tableName + "_"; + String namespace = kylinConfig.getHBaseStorageNameSpace(); + String hTableName; + Random ran = new Random(); + do { + StringBuilder sb = new StringBuilder(); + if (!namespace.equals("default") && !namespace.isEmpty()) { + sb.append(namespace).append(":"); + } + sb.append(namePrefix); + for (int i = 0; i < HBASE_TABLE_LENGTH; i++) { + sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length()))); + } + hTableName = sb.toString(); + } while (hTableExists(admin, hTableName)); + + return hTableName; + } + + private boolean hTableExists(Admin admin, String hTableName) throws IOException { + return admin.tableExists(TableName.valueOf(hTableName)); + } + + private Connection getHBaseConnection(KylinConfig kylinConfig) throws IOException { + return HBaseConnection.get(kylinConfig.getStorageUrl()); + } + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new LookupTableToHFileJob(), args); + System.exit(exitCode); + } + +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java new file mode 100644 index 00000000000..4be9533b08a --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.JoinDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow; + +public class LookupTableToHFileMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, KeyValue> { + ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + + private String cubeName; + private CubeDesc cubeDesc; + private String tableName; + private int shardNum; + private IMRTableInputFormat lookupTableInputFormat; + private long timestamp = 0; + private HBaseLookupRowEncoder encoder; + + @Override + protected void doSetup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME); + tableName = context.getConfiguration().get(BatchConstants.CFG_TABLE_NAME); + shardNum = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_SHARD_NUM)); + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + CubeManager cubeMgr = CubeManager.getInstance(config); + cubeDesc = cubeMgr.getCube(cubeName).getDescriptor(); + DataModelDesc modelDesc = cubeDesc.getModel(); + TableDesc tableDesc = TableMetadataManager.getInstance(config).getTableDesc( + tableName, cubeDesc.getProject()); + TableRef lookupTableRef = null; + for (TableRef tableRef : modelDesc.getLookupTables()) { + if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) { + lookupTableRef = tableRef; + break; + } + } + if (lookupTableRef == null) { + throw new IllegalStateException("cannot find lookup table in model:" + tableName); + } + JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef); + TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns(); + String[] keyColumns = new String[keyColRefs.length]; + for (int i = 0; i < keyColRefs.length; i++) { + keyColumns[i] = keyColRefs[i].getName(); + } + encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum); + lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc); + } + + @Override + public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException { + Collection<String[]> rowCollection = lookupTableInputFormat.parseMapperInput(value); + for (String[] row : rowCollection) { + HBaseRow hBaseRow = encoder.encode(row); + + byte[] rowKey =
hBaseRow.getRowKey(); + Map<byte[], byte[]> qualifierValMap = hBaseRow.getQualifierValMap(); + outputKey.set(rowKey); + for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) { + KeyValue outputValue = createKeyValue(rowKey, qualifierValEntry.getKey(), qualifierValEntry.getValue()); + context.write(outputKey, outputValue); + } + } + } + + private KeyValue createKeyValue(byte[] keyBytes, byte[] qualifier, byte[] value) { + return new KeyValue(keyBytes, 0, keyBytes.length, // + HBaseLookupRowEncoder.CF, 0, HBaseLookupRowEncoder.CF.length, // + qualifier, 0, qualifier.length, // + timestamp, KeyValue.Type.Put, // + value, 0, value.length); + } + +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java new file mode 100644 index 00000000000..409116d08f8 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+*/ + +package org.apache.kylin.storage.hbase.lookup; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.List; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.restclient.RestClient; +import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState; +import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.job.execution.ExecuteResult.State; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class UpdateSnapshotCacheForQueryServersStep extends AbstractExecutable { + private static final Logger logger = LoggerFactory.getLogger(UpdateSnapshotCacheForQueryServersStep.class); + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final String tableName = LookupExecutableUtil.getLookupTableName(this.getParams()); + final String snapshotID = LookupExecutableUtil.getLookupSnapshotID(this.getParams()); + final String projectName = LookupExecutableUtil.getProjectName(this.getParams()); + + final KylinConfig config = KylinConfig.getInstanceFromEnv(); + int checkInterval = 10 * 1000; + int maxCheckTime = 10 * 60 * 1000; + + StringWriter outputWriter = new StringWriter(); + PrintWriter pw = new PrintWriter(outputWriter); + String[] restServers = config.getRestServers(); + List<String> serversNeedCheck = Lists.newArrayList(); + for (String restServer : restServers) { + logger.info("send build lookup table cache request to server: " + restServer); + try { + RestClient restClient = new RestClient(restServer); + restClient.buildLookupSnapshotCache(projectName, tableName, snapshotID); + serversNeedCheck.add(restServer); + } catch (IOException e) { + logger.error("error when sending build cache request to rest server:" + restServer, e); + pw.println("cache build failed for rest server:" + restServer); + } + } + if (serversNeedCheck.isEmpty()) { + return new ExecuteResult(State.SUCCEED, outputWriter.toString()); + } + + List<String> completeServers = Lists.newArrayList(); + long startTime = System.currentTimeMillis(); + while ((System.currentTimeMillis() - startTime) < maxCheckTime) { + serversNeedCheck.removeAll(completeServers); + if (serversNeedCheck.isEmpty()) { + break; + } + for (String restServer : serversNeedCheck) { + logger.info("check lookup table cache build status for server: " + restServer); + try { + RestClient restClient = new RestClient(restServer); + String stateName = restClient.getLookupSnapshotCacheState(tableName, snapshotID); + if (!stateName.equals(CacheState.IN_BUILDING.name())) { + completeServers.add(restServer); + pw.println("cache build complete for rest server:" + restServer + " cache state:" + stateName); + } + } catch (IOException e) { + logger.error("error when checking cache build status for rest server:" + restServer, e); + } + } + try { + Thread.sleep(checkInterval); + } catch (InterruptedException e) { + logger.error("interrupted", e); + Thread.currentThread().interrupt(); // restore the interrupt flag + } + } + serversNeedCheck.removeAll(completeServers); + if (!serversNeedCheck.isEmpty()) { + pw.println(); + pw.println("check timeout!"); + pw.println("servers not complete:" + serversNeedCheck); + } + // cache warm-up is best-effort: the build job proceeds even if some servers time out + return new ExecuteResult(State.SUCCEED, outputWriter.toString()); + } + +} diff --git
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java new file mode 100644 index 00000000000..2b21ae36167 --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.lookup; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.NavigableMap; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.TableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class HBaseLookupRowEncoderTest extends LocalFileMetadataTestCase { + private TableDesc tableDesc; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default"); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testEnDeCode() { + HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1); + String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" }; + HBaseRow hBaseRow = lookupRowEncoder.encode(row); + + assertEquals(6, hBaseRow.getRowKey().length); + assertEquals(3, hBaseRow.getQualifierValMap().size()); + NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap(); + assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1")))); + assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2")))); + String[] decodeRow = lookupRowEncoder.decode(hBaseRow); + assertArrayEquals(row, decodeRow); + } + + @Test + public void testEnDeCodeWithNullValue() { + HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1); + String[] row = new String[] { "AD", "42.546245", "1.601554", null }; + HBaseRow hBaseRow = lookupRowEncoder.encode(row); + + assertEquals(6, hBaseRow.getRowKey().length); + assertEquals(3, hBaseRow.getQualifierValMap().size()); + NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap(); + assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
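+ // note: the null NAME value still occupies a qualifier entry (the encoder represents
+ // null distinctly rather than dropping the column), which is why the map size above
+ // is 3 and decode restores the null below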
+ assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2")))); + String[] decodeRow = lookupRowEncoder.decode(hBaseRow); + assertNull(decodeRow[3]); + assertArrayEquals(row, decodeRow); + } + + @Test + public void testEnDeCodeWithMultiKeys() { + HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, + new String[] { "COUNTRY", "NAME" }, 1); + String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" }; + HBaseRow hBaseRow = lookupRowEncoder.encode(row); + + assertEquals(2, hBaseRow.getQualifierValMap().size()); + NavigableMap qualifierMap = hBaseRow.getQualifierValMap(); + assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1")))); + assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2")))); + String[] decodeRow = lookupRowEncoder.decode(hBaseRow); + assertArrayEquals(row, decodeRow); + } + +} diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java new file mode 100644 index 00000000000..cbd046103f5 --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +//package org.apache.kylin.storage.hbase.lookup; +// +//import org.apache.hadoop.conf.Configuration; +//import org.apache.hadoop.mapreduce.Job; +//import org.apache.kylin.common.util.LocalFileMetadataTestCase; +//import org.apache.kylin.cube.CubeSegment; +//import org.apache.kylin.engine.mr.CubingJob; +//import org.apache.kylin.job.engine.JobEngineConfig; +//import org.junit.After; +//import org.junit.Before; +//import org.junit.Rule; +//import org.junit.Test; +//import org.powermock.api.mockito.PowerMockito; +//import org.powermock.core.classloader.annotations.PrepareForTest; +//import org.powermock.modules.junit4.rule.PowerMockRule; +// +//@PrepareForTest({ LookupTableToHFileJob.class, Job.class}) +//public class LookupTableToHFileJobTest extends LocalFileMetadataTestCase { +// +// @Rule +// public PowerMockRule rule = new PowerMockRule(); +// +// @Before +// public void setup() throws Exception { +// createTestMetadata(); +// } +// +// @After +// public void after() throws Exception { +// cleanupTestMetadata(); +// } +// +// @Test +// public void testRun() throws Exception { +// String cubeName = "test_kylin_cube_with_slr_1_new_segment"; +// String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b"; +// CubeInstance cubeInstance = CubeManager.getInstance(getTestConfig()).getCube(cubeName); +// CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentID); +// +// Configuration conf = HadoopUtil.getCurrentConfiguration(); +// conf.set("fs.defaultFS", "file:///"); +// conf.set("mapreduce.framework.name", "local"); +// conf.set("mapreduce.application.framework.path", ""); +// conf.set("fs.file.impl.disable.cache", "true"); +// +// FileSystem localFileSystem = new LocalFileSystem(); +// URI uri = URI.create("file:///"); +// localFileSystem.initialize(uri, conf); +// +// Job mockedJob = createMockMRJob(conf); +// PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class)) +// .toReturn(mockedJob); +// PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class)) +// .toReturn(mockedJob); +// +// StringBuilder cmd = new StringBuilder(); +// JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, +// "Build_Lookup_Table_For_Segment_20130331080000_20131212080000_Step"); +// JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, +// cubeName); +// JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, +// segmentID); +// JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getOutputPath()); +// JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, "EDW.TEST_SITES"); +// +// LookupTableToHFileJob job = new LookupTableToHFileJob(); +// job.setConf(conf); +// job.setAsync(true); +// +// String[] args = cmd.toString().trim().split("\\s+"); +// job.run(args); +// } +// +// private String getOutputPath() { +// return "_tmp_output"; +// } +// +// private CubingJob createMockCubingJob(CubeSegment cubeSeg) { +// JobEngineConfig jobEngineConfig = new JobEngineConfig(getTestConfig()); +// CubingJob cubingJob = CubingJob.createBuildJob(cubeSeg, "unitTest", jobEngineConfig); +// +// return cubingJob; +// } +// +// private Job createMockMRJob(Configuration conf) throws Exception { +// Job job = PowerMockito.mock(Job.class); +// PowerMockito.when(job.getConfiguration()).thenReturn(conf); +// PowerMockito.doNothing().when(job).submit(); +// return job; +// } + +//} diff --git 
a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java new file mode 100644 index 00000000000..e98762dbabb --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.lookup; + +import static org.junit.Assert.assertTrue; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.job.impl.threadpool.DefaultContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadataTestCase { + private KylinConfig kylinConfig; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + kylinConfig = KylinConfig.getInstanceFromEnv(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testExecute() throws ExecuteException { + UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep(); + ExecuteResult result = step.doWork(new DefaultContext(Maps.newConcurrentMap(), kylinConfig)); + System.out.println(result.output()); + assertTrue(result.succeed()); + } +}
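[Illustrative note, not part of the patch] The cache-update step is driven entirely by its serialized params, which is what the test above relies on. A sketch of wiring it by hand, mirroring addUpdateSnapshotQueryCacheStep in HBaseLookupMRSteps (the values here are hypothetical):

    // hypothetical values, for illustration only
    UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep();
    LookupExecutableUtil.setProjectName("default", step.getParams());
    LookupExecutableUtil.setLookupTableName("DEFAULT.TEST_COUNTRY", step.getParams());
    LookupExecutableUtil.setLookupSnapshotID(UUID.randomUUID().toString(), step.getParams());
    jobFlow.addTask(step); // jobFlow: the DefaultChainedExecutable being assembled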