diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index bccb7a369e8..813eca39807 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -43,7 +43,7 @@ public class FileResourceStore extends ResourceStore { public FileResourceStore(KylinConfig kylinConfig) { super(kylinConfig); root = new File(getPath(kylinConfig)).getAbsoluteFile(); - if (root.exists() == false) + if (!root.exists()) throw new IllegalArgumentException( "File not exist by '" + kylinConfig.getMetadataUrl() + "': " + root.getAbsolutePath()); } @@ -60,7 +60,7 @@ protected boolean existsImpl(String resPath) throws IOException { @Override protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent, - Visitor visitor) throws IOException { + Visitor visitor) throws IOException { if (--failVisitFolderCountDown == 0) throw new IOException("for test"); @@ -177,6 +177,19 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT return f.lastModified(); } + @Override + protected void updateTimestampImpl(String resPath, long timestamp) throws IOException { + File f = file(resPath); + if (f.exists()) { + // note file timestamp may lose precision for last two digits of timestamp + boolean success = f.setLastModified(timestamp); + if (!success) { + throw new IOException( + "Update resource timestamp failed, resPath:" + resPath + ", timestamp: " + timestamp); + } + } + } + @Override protected void deleteResourceImpl(String resPath) throws IOException { @@ -189,6 +202,20 @@ protected void deleteResourceImpl(String resPath) throws IOException { } } + @Override + protected void deleteResourceImpl(String resPath, long timestamp) throws IOException { + File f = file(resPath); + 
try { + if (f.exists()) { + long origLastModified = getResourceTimestampImpl(resPath); + if (checkTimeStampBeforeDelete(origLastModified, timestamp)) + FileUtils.forceDelete(f); + } + } catch (FileNotFoundException e) { + // FileNotFoundException is not a problem in case of delete + } + } + @Override protected String getReadableResourcePathImpl(String resPath) { return file(resPath).toString(); diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java index c38a182f0ec..03cab1fb2e0 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java @@ -65,7 +65,7 @@ public HDFSResourceStore(KylinConfig kylinConfig, StorageURL metadataUrl) throws fs = HadoopUtil.getFileSystem(path); Path metadataPath = new Path(path); - if (fs.exists(metadataPath) == false) { + if (!fs.exists(metadataPath)) { logger.warn("Path not exist in HDFS, create it: {}. 
", path); createMetaFolder(metadataPath); } @@ -136,7 +136,7 @@ TreeSet getAllFilePath(Path filePath, String resPathPrefix) throws IOExc @Override protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent, - Visitor visitor) throws IOException { + Visitor visitor) throws IOException { Path p = getRealHDFSPath(folderPath); if (!fs.exists(p) || !fs.isDirectory(p)) { return; @@ -247,6 +247,18 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT return newTS; } + @Override + protected void updateTimestampImpl(String resPath, long timestamp) throws IOException { + try { + Path p = getRealHDFSPath(resPath); + if (fs.exists(p)) { + fs.setTimes(p, timestamp, -1); + } + } catch (Exception e) { + throw new IOException("Update resource timestamp fail", e); + } + } + @Override protected void deleteResourceImpl(String resPath) throws IOException { try { @@ -258,6 +270,21 @@ protected void deleteResourceImpl(String resPath) throws IOException { throw new IOException("Delete resource fail", e); } } + + @Override + protected void deleteResourceImpl(String resPath, long timestamp) throws IOException { + try { + Path p = getRealHDFSPath(resPath); + if (fs.exists(p)) { + long origLastModified = fs.getFileStatus(p).getModificationTime(); + if (checkTimeStampBeforeDelete(origLastModified, timestamp)) + fs.delete(p, true); + } + } catch (Exception e) { + throw new IOException("Delete resource fail", e); + } + } + @Override protected String getReadableResourcePathImpl(String resPath) { return getRealHDFSPath(resPath).toString(); diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java index 3dc7b65ecfb..54233ea0df2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java +++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceSQL.java @@ -30,7 +30,8 @@ public class JDBCResourceSQL { final private String metaTableTs; final private String metaTableContent; - public JDBCResourceSQL(String dialect, String tableName, String metaTableKey, String metaTableTs, String metaTableContent) { + public JDBCResourceSQL(String dialect, String tableName, String metaTableKey, String metaTableTs, + String metaTableContent) { this.format = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(dialect); this.tableName = tableName; this.metaTableKey = metaTableKey; @@ -96,8 +97,7 @@ public String getInsertSql() { return sql; } - @SuppressWarnings("unused") - private String getReplaceSqlWithoutContent() { + public String getReplaceSqlWithoutContent() { final String sql = new MessageFormat(format.getReplaceSqlWithoutContent(), Locale.ROOT) .format(new Object[] { tableName, metaTableTs, metaTableKey }, new StringBuffer(), new FieldPosition(0)) .toString(); diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java index 74c797f1479..ae491fbbc18 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java @@ -128,7 +128,7 @@ public void execute(Connection connection) throws SQLException { pstat = connection.prepareStatement(createIndexSql); pstat.executeUpdate(); } catch (SQLException ex) { - logger.error("Failed to create index on " + META_TABLE_TS, ex); + logger.error("Failed to create index on {}", META_TABLE_TS, ex); } } @@ -171,7 +171,7 @@ private boolean isJsonMetadata(String resourcePath) { @Override protected void visitFolderImpl(final String folderPath, final boolean recursive, final VisitFilter filter, - final boolean loadContent, final Visitor visitor) throws IOException { + 
final boolean loadContent, final Visitor visitor) throws IOException { try { executeSql(new SqlOperation() { @@ -184,8 +184,8 @@ public void execute(Connection connection) throws SQLException { lookForPrefix = filter.pathPrefix; } - if (isRootPath(folderPath)){ - for (int i=0; i cls = ClassUtil.forName(clsName, ResourceStore.class); @@ -144,7 +144,8 @@ public String getMetaStoreUUID() throws IOException { /** * Collect resources recursively under a folder, return empty list if folder does not exist */ - final public List collectResourceRecursively(final String folderPath, final String suffix) throws IOException { + final public List collectResourceRecursively(final String folderPath, final String suffix) + throws IOException { return new ExponentialBackoffRetry(this).doWithRetry(new Callable>() { @Override public List call() throws Exception { @@ -217,7 +218,7 @@ final public List getAllResources(String fol * NOTE: Exceptions thrown by ContentReader are swallowed in order to load every resource at best effort. 
*/ final public List getAllResources(final String folderPath, - final boolean recursive, final VisitFilter filter, final ContentReader reader) throws IOException { + final boolean recursive, final VisitFilter filter, final ContentReader reader) throws IOException { return new ExponentialBackoffRetry(this).doWithRetry(new Callable>() { @Override @@ -321,7 +322,7 @@ public Long call() throws IOException { * @return bytes written */ final public long putResource(String resPath, T obj, long ts, - Serializer serializer) throws IOException { + Serializer serializer) throws IOException { resPath = norm(resPath); obj.setLastModified(ts); ContentWriter writer = ContentWriter.create(obj, serializer); @@ -355,7 +356,7 @@ final public long putResource(String resPath, InputStream content, long ts) thro } private void putResourceCheckpoint(String resPath, ContentWriter content, long ts) throws IOException { - logger.trace("Directly saving resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); + logger.trace("Directly saving resource {} (Store {})", resPath, kylinConfig.getMetadataUrl()); beforeChange(resPath); putResourceWithRetry(resPath, content, ts); } @@ -377,8 +378,8 @@ public Object call() throws IOException { /** * check & set, overwrite a resource */ - final public void checkAndPutResource(String resPath, T obj, Serializer serializer) - throws IOException, WriteConflictException { + final public void checkAndPutResource(String resPath, T obj, + Serializer serializer) throws IOException, WriteConflictException { checkAndPutResource(resPath, obj, System.currentTimeMillis(), serializer); } @@ -386,9 +387,8 @@ final public void checkAndPutResource(String re * check & set, overwrite a resource */ final public void checkAndPutResource(String resPath, T obj, long newTS, - Serializer serializer) throws IOException, WriteConflictException { + Serializer serializer) throws IOException, WriteConflictException { resPath = norm(resPath); - //logger.debug("Saving 
resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); long oldTS = obj.getLastModified(); obj.setLastModified(newTS); @@ -404,10 +404,7 @@ final public void checkAndPutResource(String re obj.setLastModified(confirmedTS); // update again the confirmed TS //return confirmedTS; - } catch (IOException e) { - obj.setLastModified(oldTS); // roll back TS when write fail - throw e; - } catch (RuntimeException e) { + } catch (IOException | RuntimeException e) { obj.setLastModified(oldTS); // roll back TS when write fail throw e; } @@ -436,7 +433,7 @@ abstract protected long checkAndPutResourceImpl(String resPath, byte[] content, throws IOException, WriteConflictException; private long checkAndPutResourceWithRetry(final String resPath, final byte[] content, final long oldTS, - final long newTS) throws IOException, WriteConflictException { + final long newTS) throws IOException, WriteConflictException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); return retry.doWithRetry(new Callable() { @Override @@ -446,21 +443,61 @@ public Long call() throws IOException { }); } + /** + * update resource timestamp to timestamp + */ + final public void updateTimestamp(String resPath, long timestamp) throws IOException { + logger.trace("Updating resource: {} with timestamp {} (Store {})", resPath, timestamp, + kylinConfig.getMetadataUrl()); + updateTimestampCheckPoint(norm(resPath), timestamp); + } + + private void updateTimestampCheckPoint(String resPath, long timestamp) throws IOException { + beforeChange(resPath); + updateTimestampWithRetry(resPath, timestamp); + } + + private void updateTimestampWithRetry(final String resPath, final long timestamp) throws IOException { + ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); + retry.doWithRetry(new Callable() { + @Override + public Object call() throws IOException { + updateTimestampImpl(resPath, timestamp); + return null; + } + }); + } + + abstract protected void 
updateTimestampImpl(String resPath, long timestamp) throws IOException; + /** * delete a resource, does nothing on a folder */ final public void deleteResource(String resPath) throws IOException { - logger.trace("Deleting resource " + resPath + " (Store " + kylinConfig.getMetadataUrl() + ")"); + logger.trace("Deleting resource {} (Store {})", resPath, kylinConfig.getMetadataUrl()); deleteResourceCheckpoint(norm(resPath)); } + final public void deleteResource(String resPath, long timestamp) throws IOException { + logger.trace("Deleting resource {} within timestamp {} (Store {})", resPath, timestamp, + kylinConfig.getMetadataUrl()); + deleteResourceCheckpoint(norm(resPath), timestamp); + } + private void deleteResourceCheckpoint(String resPath) throws IOException { beforeChange(resPath); deleteResourceWithRetry(resPath); } + private void deleteResourceCheckpoint(String resPath, long timestamp) throws IOException { + beforeChange(resPath); + deleteResourceWithRetry(resPath, timestamp); + } + abstract protected void deleteResourceImpl(String resPath) throws IOException; + abstract protected void deleteResourceImpl(String resPath, long timestamp) throws IOException; + private void deleteResourceWithRetry(final String resPath) throws IOException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); retry.doWithRetry(new Callable() { @@ -472,6 +509,26 @@ public Object call() throws IOException { }); } + private void deleteResourceWithRetry(final String resPath, final long timestamp) throws IOException { + ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); + retry.doWithRetry(new Callable() { + @Override + public Object call() throws IOException { + deleteResourceImpl(resPath, timestamp); + return null; + } + }); + } + + protected boolean checkTimeStampBeforeDelete(long originLastModified, long timestamp) { + // note here is originLastModified may be 0 + // 0 means resource doesn't exists in general, it's safe to pass the check + boolean 
passCheck = originLastModified <= timestamp; + logger.trace("check timestamp before delete: {}, [originLastModified: {}, timestamp: {}]", passCheck, + originLastModified, timestamp); + return passCheck; + } + /** * called by ExponentialBackoffRetry, to check if an exception is due to unreachable server and worth retry */ @@ -516,7 +573,7 @@ private String norm(String resPath) { resPath = resPath.substring(1); while (resPath.endsWith("/")) resPath = resPath.substring(0, resPath.length() - 1); - if (resPath.startsWith("/") == false) + if (!resPath.startsWith("/")) resPath = "/" + resPath; return resPath; } @@ -572,7 +629,7 @@ public void rollback() { checkThread(); for (String resPath : origResData.keySet()) { - logger.debug("Rollbacking " + resPath); + logger.debug("Rollbacking {}", resPath); try { byte[] data = origResData.get(resPath); Long ts = origResTimestamp.get(resPath); @@ -665,7 +722,8 @@ final public void visitFolder(String folderPath, boolean recursive, Visitor visi * Visit all resource under a folder (optionally recursively), without loading the content of resource. * Low level API, DON'T support ExponentialBackoffRetry, caller should do necessary retry */ - final public void visitFolder(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) throws IOException { + final public void visitFolder(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) + throws IOException { visitFolderInner(folderPath, recursive, filter, false, visitor); } @@ -673,12 +731,14 @@ final public void visitFolder(String folderPath, boolean recursive, VisitFilter * Visit all resource and their content under a folder (optionally recursively). 
* Low level API, DON'T support ExponentialBackoffRetry, caller should do necessary retry */ - final public void visitFolderAndContent(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) throws IOException { + final public void visitFolderAndContent(String folderPath, boolean recursive, VisitFilter filter, Visitor visitor) + throws IOException { visitFolderInner(folderPath, recursive, filter, true, visitor); } // Low level API, DON'T support ExponentialBackoffRetry, caller should do necessary retry - private void visitFolderInner(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent, Visitor visitor) throws IOException { + private void visitFolderInner(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent, + Visitor visitor) throws IOException { if (filter == null) filter = new VisitFilter(); @@ -702,7 +762,7 @@ private void visitFolderInner(String folderPath, boolean recursive, VisitFilter * NOTE: Broken content exception should be wrapped by RawResource, and return to caller to decide how to handle. 
*/ abstract protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, - boolean loadContent, Visitor visitor) throws IOException; + boolean loadContent, Visitor visitor) throws IOException; public static String dumpResources(KylinConfig kylinConfig, Collection dumpList) throws IOException { File tmp = File.createTempFile("kylin_job_meta", ""); @@ -731,7 +791,7 @@ public static String dumpResources(KylinConfig kylinConfig, Collection d metaDirURI = "file://" + metaDirURI; else metaDirURI = "file:///" + metaDirURI; - logger.info("meta dir is: " + metaDirURI); + logger.info("meta dir is: {}", metaDirURI); return metaDirURI; } diff --git a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java index 828a9355f67..8cd181a6cc2 100644 --- a/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/ResourceStoreTest.java @@ -63,6 +63,7 @@ public static String mockUrl(String tag, KylinConfig kylinConfig) { private static void testAStore(ResourceStore store) throws IOException { testBasics(store); testGetAllResources(store); + testUpdateResourceTimestamp(store); } private static void testPerformance(ResourceStore store) throws IOException { @@ -113,13 +114,25 @@ private static void testBasics(ResourceStore store) throws IOException { String dir2 = "/table"; String path2 = "/table/_test.json"; StringEntity content2 = new StringEntity("something"); + String dir3 = "/model_desc"; + String path3 = "/model_desc/_test.json"; + StringEntity content3 = new StringEntity("test check timestamp before delete"); // cleanup legacy if any store.deleteResource(path1); store.deleteResource(path2); + store.deleteResource(path3); StringEntity t; + // get non-exist + assertNull(store.getResource(path1)); + assertNull(store.getResource(path1, 
StringEntity.serializer)); + assertNull(store.getResource(path2)); + assertNull(store.getResource(path2, StringEntity.serializer)); + assertNull(store.getResource(path3)); + assertNull(store.getResource(path3, StringEntity.serializer)); + // put/get store.checkAndPutResource(path1, content1, StringEntity.serializer); assertTrue(store.exists(path1)); @@ -144,39 +157,86 @@ private static void testBasics(ResourceStore store) throws IOException { // expected } + // put path3 + store.checkAndPutResource(path3, content3, StringEntity.serializer); + assertTrue(store.exists(path3)); + t = store.getResource(path3, StringEntity.serializer); + assertEquals(content3, t); + // list - NavigableSet list = null; + NavigableSet list; list = store.listResources(dir1); - System.out.println(list); assertTrue(list.contains(path1)); - assertTrue(list.contains(path2) == false); + assertTrue(!list.contains(path2)); + assertTrue(!list.contains(path3)); list = store.listResources(dir2); assertTrue(list.contains(path2)); - assertTrue(list.contains(path1) == false); + assertTrue(!list.contains(path1)); + assertTrue(!list.contains(path3)); + + list = store.listResources(dir3); + assertTrue(list.contains(path3)); + assertTrue(!list.contains(path1)); + assertTrue(!list.contains(path2)); list = store.listResources("/"); assertTrue(list.contains(dir1)); assertTrue(list.contains(dir2)); - assertTrue(list.contains(path1) == false); - assertTrue(list.contains(path2) == false); + assertTrue(list.contains(dir3)); + assertTrue(!list.contains(path1)); + assertTrue(!list.contains(path2)); + assertTrue(!list.contains(path3)); list = store.listResources(path1); assertNull(list); list = store.listResources(path2); assertNull(list); + list = store.listResources(path3); + assertNull(list); // delete/exist store.deleteResource(path1); - assertTrue(store.exists(path1) == false); + assertTrue(!store.exists(path1)); list = store.listResources(dir1); - assertTrue(list == null || list.contains(path1) == false); + 
assertTrue(list == null || !list.contains(path1)); store.deleteResource(path2); - assertTrue(store.exists(path2) == false); + assertTrue(!store.exists(path2)); list = store.listResources(dir2); - assertTrue(list == null || list.contains(path2) == false); + assertTrue(list == null || !list.contains(path2)); + + long origLastModified = store.getResourceTimestamp(path3); + long beforeLastModified = origLastModified - 100; + + // beforeLastModified < origLastModified ==> not delete expected + store.deleteResource(path3, beforeLastModified); + assertTrue(store.exists(path3)); + list = store.listResources(dir3); + assertTrue(list != null && list.contains(path3)); + + // beforeLastModified = origLastModified ==> delete expected + store.deleteResource(path3, origLastModified); + assertTrue(!store.exists(path3)); + list = store.listResources(dir3); + assertTrue(list == null || !list.contains(path3)); + + // put again + content3 = new StringEntity("test check timestamp before delete new"); + store.checkAndPutResource(path3, content3, StringEntity.serializer); + assertTrue(store.exists(path3)); + t = store.getResource(path3, StringEntity.serializer); + assertEquals(content3, t); + + origLastModified = store.getResourceTimestamp(path3); + long afterLastModified = origLastModified + 100; + + // afterLastModified > origLastModified ==> delete expected + store.deleteResource(path3, afterLastModified); + assertTrue(!store.exists(path3)); + list = store.listResources(dir3); + assertTrue(list == null || !list.contains(path3)); } private static long testWritePerformance(ResourceStore store) throws IOException { @@ -208,4 +268,44 @@ public static String replaceMetadataUrl(KylinConfig kylinConfig, String newUrl) return oldUrl; } + private static void testUpdateResourceTimestamp(ResourceStore store) throws IOException { + String dir1 = "/cube"; + String path1 = "/cube/_test.json"; + StringEntity content1 = new StringEntity("test update timestamp"); + // cleanup legacy if any + 
store.deleteResource(path1); + + // get non-exist + assertNull(store.getResource(path1)); + assertNull(store.getResource(path1, StringEntity.serializer)); + + // put/get + StringEntity t; + store.checkAndPutResource(path1, content1, StringEntity.serializer); + assertTrue(store.exists(path1)); + t = store.getResource(path1, StringEntity.serializer); + assertEquals(content1, t); + + long oldTS = store.getResourceTimestamp(path1); + long newTS = oldTS + 1000; + // update timestamp to newTS + store.updateTimestamp(path1, newTS); + + long updatedTS = store.getResourceTimestamp(path1); + assertEquals(updatedTS, newTS); + + newTS = 0L; + store.updateTimestamp(path1, newTS); + updatedTS = store.getResourceTimestamp(path1); + assertEquals(updatedTS, newTS); + + // delete/exist + NavigableSet list; + store.deleteResource(path1); + assertTrue(!store.exists(path1)); + list = store.listResources(dir1); + assertTrue(list == null || !list.contains(path1)); + + } + } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index b2af6561d1c..bd9832fda12 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -330,7 +330,8 @@ public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum statu } } - public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) throws IOException { + public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) + throws IOException { try (AutoLock lock = cubeMapLock.lockForWrite()) { cube = cube.latestCopyForWrite(); @@ -419,7 +420,7 @@ private void setCubeMember(CubeInstance cube, CubeUpdate update) { } if (update.getUpdateTableSnapshotPath() != null) { - for(Map.Entry lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) { + for (Map.Entry 
lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) { cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue()); } } @@ -444,7 +445,8 @@ private void processToUpdateSegments(CubeUpdate update, Segments ne } } - private void processToRemoveSegments(CubeUpdate update, Segments newSegs, List toRemoveResources) { + private void processToRemoveSegments(CubeUpdate update, Segments newSegs, + List toRemoveResources) { Iterator iterator = newSegs.iterator(); while (iterator.hasNext()) { CubeSegment currentSeg = iterator.next(); @@ -460,7 +462,7 @@ private void processToRemoveSegments(CubeUpdate update, Segments ne } // for test - CubeInstance reloadCube(String cubeName) { + public CubeInstance reloadCube(String cubeName) { try (AutoLock lock = cubeMapLock.lockForWrite()) { return crud.reload(cubeName); } @@ -522,7 +524,8 @@ public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) { } } - private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, SnapshotTableDesc snapshotTableDesc) { + private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, + SnapshotTableDesc snapshotTableDesc) { String tableName = join.getPKSide().getTableIdentity(); String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc); String[] pkCols = join.getPrimaryKey(); @@ -537,11 +540,12 @@ private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, } } - private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) { + private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, + SnapshotTableDesc snapshotTableDesc) { String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc); - ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot( - snapshotResPath); + ExtTableSnapshotInfo 
extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config) + .getSnapshot(snapshotResPath); TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject()); return LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot); } @@ -874,8 +878,7 @@ private SegmentRange getOffsetCubeSegRange(CubeInstance cubeCopy, TSRange tsRang if (pair == null) throw new IllegalArgumentException( "Find no segments to merge by " + tsRange + " for cube " + cubeCopy); - segRange = new SegmentRange(pair.getFirst().getSegRange().start, - pair.getSecond().getSegRange().end); + segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end); } return segRange; } @@ -931,9 +934,8 @@ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegCopy) cubeCopy.toString(), newSegCopy.toString())); if (StringUtils.isBlank(newSegCopy.getLastBuildJobID())) - throw new IllegalStateException( - String.format(Locale.ROOT, "For cube %s, segment %s missing LastBuildJobID", - cubeCopy.toString(), newSegCopy.toString())); + throw new IllegalStateException(String.format(Locale.ROOT, + "For cube %s, segment %s missing LastBuildJobID", cubeCopy.toString(), newSegCopy.toString())); if (isReady(newSegCopy) == true) { logger.warn("For cube {}, segment {} state should be NEW but is READY", cubeCopy, newSegCopy); @@ -985,9 +987,9 @@ public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map CubeSegment[] optSegCopy = cubeCopy.regetSegments(optimizedSegments); if (cubeCopy.getSegments().size() != optSegCopy.length * 2) { - throw new IllegalStateException( - String.format(Locale.ROOT, "For cube %s, every READY segment should be optimized and all segments should be READY before optimizing", - cubeCopy.toString())); + throw new IllegalStateException(String.format(Locale.ROOT, + "For cube %s, every READY segment should be optimized and all segments should be READY before optimizing", + cubeCopy.toString())); } 
CubeSegment[] originalSegments = new CubeSegment[optSegCopy.length]; @@ -1001,15 +1003,14 @@ public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map cubeCopy.toString(), seg.toString())); if (StringUtils.isBlank(seg.getLastBuildJobID())) - throw new IllegalStateException( - String.format(Locale.ROOT, "For cube %s, segment %s missing LastBuildJobID", - cubeCopy.toString(), seg.toString())); + throw new IllegalStateException(String.format(Locale.ROOT, + "For cube %s, segment %s missing LastBuildJobID", cubeCopy.toString(), seg.toString())); seg.setStatus(SegmentStatusEnum.READY); } - logger.info("Promoting cube {}, new segments {}, to remove segments {}", - cubeCopy, Arrays.toString(optSegCopy), originalSegments); + logger.info("Promoting cube {}, new segments {}, to remove segments {}", cubeCopy, + Arrays.toString(optSegCopy), originalSegments); CubeUpdate update = new CubeUpdate(cubeCopy); update.setToRemoveSegs(originalSegments) // @@ -1026,9 +1027,9 @@ private void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { List tobe = cube.calculateToBeSegments(newSegments); List newList = Arrays.asList(newSegments); if (tobe.containsAll(newList) == false) { - throw new IllegalStateException( - String.format(Locale.ROOT, "For cube %s, the new segments %s do not fit in its current %s; the resulted tobe is %s", - cube.toString(), newList.toString(), cube.getSegments().toString(), tobe.toString())); + throw new IllegalStateException(String.format(Locale.ROOT, + "For cube %s, the new segments %s do not fit in its current %s; the resulted tobe is %s", + cube.toString(), newList.toString(), cube.getSegments().toString(), tobe.toString())); } } @@ -1171,7 +1172,8 @@ public Dictionary getDictionary(CubeSegment cubeSeg, TblColRef col) { return (Dictionary) info.getDictionaryObject(); } - public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException { + public SnapshotTable 
buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) + throws IOException { // work on copy instead of cached objects CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 1bec02ae877..67b7bdb894b 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -20,18 +20,24 @@ import java.io.IOException; import java.util.Locale; +import java.util.Map; import java.util.Set; import org.apache.hadoop.io.IOUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.DimensionDesc; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryInfoSerializer; import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.dict.lookup.ILookupTable; +import org.apache.kylin.dict.lookup.SnapshotTable; +import org.apache.kylin.dict.lookup.SnapshotTableSerializer; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; @@ -43,7 +49,8 @@ public class DictionaryGeneratorCLI { - private DictionaryGeneratorCLI(){} + private DictionaryGeneratorCLI() { + } private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class); @@ -52,7 +59,26 @@ public static void processSegment(KylinConfig config, String cubeName, 
String se CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeSegment segment = cube.getSegmentById(segmentID); - processSegment(config, segment, uuid, factTableValueProvider, dictProvider); + int retryTime = 0; + while (retryTime < 3) { + if (retryTime > 0) { + logger.info("Rebuild dictionary and snapshot for Cube: {}, Segment: {}, {} times.", cubeName, segmentID, + retryTime); + } + + processSegment(config, segment, uuid, factTableValueProvider, dictProvider); + + if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) { + break; + } + retryTime++; + } + + if (retryTime >= 3) { + logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentID); + } else { + logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentID); + } } private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid, @@ -113,4 +139,51 @@ private static void processSegment(KylinConfig config, CubeSegment cubeSeg, Stri } } + private static boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) { + CubeInstance cube = CubeManager.getInstance(config).reloadCube(cubeName); + CubeSegment segment = cube.getSegmentById(segmentID); + ResourceStore store = ResourceStore.getStore(config); + + // check dicts + logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID); + Map dictionaries = segment.getDictionaries(); + for (Map.Entry entry : dictionaries.entrySet()) { + String dictResPath = entry.getValue(); + String dictKey = entry.getKey(); + try { + DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER); + if (dictInfo == null) { + logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey, + dictResPath); + return false; + } + } catch (IOException e) { + logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e); + 
return false; + } + } + + // check snapshots + logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID); + Map snapshots = segment.getSnapshots(); + for (Map.Entry entry : snapshots.entrySet()) { + String snapshotKey = entry.getKey(); + String snapshotResPath = entry.getValue(); + try { + SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER); + if (snapshot == null) { + logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store", + snapshotKey, snapshotResPath); + return false; + } + } catch (IOException e) { + logger.warn("SnapshotTable=[key: {}, resource path: {}] failed to check, details: {}", snapshotKey, + snapshotResPath, e); + return false; + } + } + + logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID); + return true; + } } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 016700f130d..ffee105c7ff 100755 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -18,12 +18,13 @@ package org.apache.kylin.dict; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; +import java.util.NavigableSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.ClassUtil; @@ -36,11 +37,12 @@ import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.NavigableSet; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Lists; public class DictionaryManager { @@ -69,8 +71,8 @@ private DictionaryManager(KylinConfig config) { .removalListener(new RemovalListener() { @Override public void onRemoval(RemovalNotification notification) { - DictionaryManager.logger.info("Dict with resource path " + notification.getKey() - + " is removed due to " + notification.getCause()); + DictionaryManager.logger.info("Dict with resource path {} is removed due to {}", + notification.getKey(), notification.getCause()); } })// .maximumSize(config.getCachedDictMaxEntrySize())// @@ -134,10 +136,11 @@ public DictionaryInfo trySaveNewDict(Dictionary newDict, DictionaryInfo largestDictInfo = getDictionaryInfo(largestDictInfo.getResourcePath()); Dictionary largestDictObject = largestDictInfo.getDictionaryObject(); if (largestDictObject.contains(newDict)) { - logger.info("dictionary content " + newDict + ", is contained by dictionary at " + largestDictInfo.getResourcePath()); + logger.info("dictionary content {}, is contained by dictionary at {}", newDict, + largestDictInfo.getResourcePath()); return largestDictInfo; } else if (newDict.contains(largestDictObject)) { - logger.info("dictionary content " + newDict + " is by far the largest, save it"); + logger.info("dictionary content {} is by far the largest, save it", newDict); return saveNewDict(newDictInfo); } else { logger.info("merge dict and save..."); @@ -148,17 +151,18 @@ public DictionaryInfo trySaveNewDict(Dictionary newDict, DictionaryInfo return saveNewDict(newDictInfo); } } else { 
- String dupDict = checkDupByContent(newDictInfo, newDict); + DictionaryInfo dupDict = checkDupByContent(newDictInfo, newDict); if (dupDict != null) { - logger.info("Identical dictionary content, reuse existing dictionary at " + dupDict); - return getDictionaryInfo(dupDict); + logger.info("Identical dictionary content, reuse existing dictionary at {}", dupDict.getResourcePath()); + dupDict = updateExistingDictLastModifiedTime(dupDict.getResourcePath()); + return dupDict; } return saveNewDict(newDictInfo); } } - private String checkDupByContent(DictionaryInfo dictInfo, Dictionary dict) throws IOException { + private DictionaryInfo checkDupByContent(DictionaryInfo dictInfo, Dictionary dict) throws IOException { ResourceStore store = getStore(); NavigableSet existings = store.listResources(dictInfo.getResourceDir()); if (existings == null) @@ -170,19 +174,33 @@ private String checkDupByContent(DictionaryInfo dictInfo, Dictionary dic } for (String existing : existings) { - DictionaryInfo existingInfo = getDictionaryInfo(existing); - if (existingInfo != null) { - if ((config.isDictResuable() && existingInfo.getDictionaryObject().contains(dict)) - || dict.equals(existingInfo.getDictionaryObject())) { - return existing; + try { + if (existing.endsWith(".dict")) { + DictionaryInfo existingInfo = getDictionaryInfo(existing); + if (existingInfo != null && dict.equals(existingInfo.getDictionaryObject())) { + return existingInfo; + } } - + } catch (Exception ex) { + logger.error("Tolerate exception checking dup dictionary " + existing, ex); } } return null; } + private DictionaryInfo updateExistingDictLastModifiedTime(String dictPath) throws IOException { + ResourceStore store = getStore(); + if (StringUtils.isBlank(dictPath)) + return NONE_INDICATOR; + long now = System.currentTimeMillis(); + store.updateTimestamp(dictPath, now); + logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now); + DictionaryInfo dictInfo = load(dictPath, true); + 
updateDictCache(dictInfo); + return dictInfo; + } + private void initDictInfo(Dictionary newDict, DictionaryInfo newDictInfo) { newDictInfo.setCardinality(newDict.getSize()); newDictInfo.setDictionaryObject(newDict); @@ -190,16 +208,14 @@ private void initDictInfo(Dictionary newDict, DictionaryInfo newDictInfo } private DictionaryInfo saveNewDict(DictionaryInfo newDictInfo) throws IOException { - save(newDictInfo); - dictCache.put(newDictInfo.getResourcePath(), newDictInfo); - + updateDictCache(newDictInfo); return newDictInfo; } public DictionaryInfo mergeDictionary(List dicts) throws IOException { - if (dicts.size() == 0) + if (dicts.isEmpty()) return null; if (dicts.size() == 1) @@ -209,7 +225,7 @@ public DictionaryInfo mergeDictionary(List dicts) throws IOExcep * AppendTrieDictionary needn't merge * more than one AppendTrieDictionary will generate when user use {@link SegmentAppendTrieDictBuilder} */ - for (DictionaryInfo dict: dicts) { + for (DictionaryInfo dict : dicts) { if (dict.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) { return dict; } @@ -224,7 +240,8 @@ public DictionaryInfo mergeDictionary(List dicts) throws IOExcep } else { if (!firstDictInfo.isDictOnSameColumn(info)) { // don't throw exception, just output warning as legacy cube segment may build dict on PK - logger.warn("Merging dictionaries are not structurally equal : " + firstDictInfo.getResourcePath() + " and " + info.getResourcePath()); + logger.warn("Merging dictionaries are not structurally equal : {} and {}", + firstDictInfo.getResourcePath(), info.getResourcePath()); } } totalSize += info.getInput().getSize(); @@ -241,12 +258,6 @@ public DictionaryInfo mergeDictionary(List dicts) throws IOExcep signature.setLastModifiedTime(System.currentTimeMillis()); signature.setPath("merged_with_no_original_path"); - // String dupDict = checkDupByInfo(newDictInfo); - // if (dupDict != null) { - // logger.info("Identical dictionary input " + newDictInfo.getInput() + ", reuse 
existing dictionary at " + dupDict); - // return getDictionaryInfo(dupDict); - // } - //check for cases where merging dicts are actually same boolean identicalSourceDicts = true; for (int i = 1; i < dicts.size(); ++i) { @@ -260,7 +271,8 @@ public DictionaryInfo mergeDictionary(List dicts) throws IOExcep logger.info("Use one of the merging dictionaries directly"); return dicts.get(0); } else { - Dictionary newDict = DictionaryGenerator.mergeDictionaries(DataType.getType(newDictInfo.getDataType()), dicts); + Dictionary newDict = DictionaryGenerator + .mergeDictionaries(DataType.getType(newDictInfo.getDataType()), dicts); return trySaveNewDict(newDict, newDictInfo); } } @@ -269,33 +281,38 @@ public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable) th return buildDictionary(col, inpTable, null); } - public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable, String builderClass) throws IOException { - if (inpTable.exists() == false) + public DictionaryInfo buildDictionary(TblColRef col, IReadableTable inpTable, String builderClass) + throws IOException { + if (!inpTable.exists()) return null; - logger.info("building dictionary for " + col); + logger.info("building dictionary for {}", col); DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable); String dupInfo = checkDupByInfo(dictInfo); if (dupInfo != null) { - logger.info("Identical dictionary input " + dictInfo.getInput() + ", reuse existing dictionary at " + dupInfo); - return getDictionaryInfo(dupInfo); + logger.info("Identical dictionary input {}, reuse existing dictionary at {}", dictInfo.getInput(), dupInfo); + DictionaryInfo dupDictInfo = updateExistingDictLastModifiedTime(dupInfo); + return dupDictInfo; } - logger.info("Building dictionary object " + JsonUtil.writeValueAsString(dictInfo)); + logger.info("Building dictionary object {}", JsonUtil.writeValueAsString(dictInfo)); Dictionary dictionary; dictionary = buildDictFromReadableTable(inpTable, dictInfo, 
builderClass, col); return trySaveNewDict(dictionary, dictInfo); } - private Dictionary buildDictFromReadableTable(IReadableTable inpTable, DictionaryInfo dictInfo, String builderClass, TblColRef col) throws IOException { + private Dictionary buildDictFromReadableTable(IReadableTable inpTable, DictionaryInfo dictInfo, + String builderClass, TblColRef col) throws IOException { Dictionary dictionary; IDictionaryValueEnumerator columnValueEnumerator = null; try { - columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), dictInfo.getSourceColumnIndex()); + columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), + dictInfo.getSourceColumnIndex()); if (builderClass == null) { - dictionary = DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()), columnValueEnumerator); + dictionary = DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()), + columnValueEnumerator); } else { IDictionaryBuilder builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass); dictionary = DictionaryGenerator.buildDictionary(builder, dictInfo, columnValueEnumerator); @@ -309,12 +326,14 @@ private Dictionary buildDictFromReadableTable(IReadableTable inpTable, D return dictionary; } - public DictionaryInfo saveDictionary(TblColRef col, IReadableTable inpTable, Dictionary dictionary) throws IOException { + public DictionaryInfo saveDictionary(TblColRef col, IReadableTable inpTable, Dictionary dictionary) + throws IOException { DictionaryInfo dictInfo = createDictionaryInfo(col, inpTable); String dupInfo = checkDupByInfo(dictInfo); if (dupInfo != null) { - logger.info("Identical dictionary input " + dictInfo.getInput() + ", reuse existing dictionary at " + dupInfo); - return getDictionaryInfo(dupInfo); + logger.info("Identical dictionary input {}, reuse existing dictionary at {}", dictInfo.getInput(), dupInfo); + DictionaryInfo dupDictInfo = updateExistingDictLastModifiedTime(dupInfo); + return dupDictInfo; } 
return trySaveNewDict(dictionary, dictInfo); @@ -331,7 +350,8 @@ private DictionaryInfo createDictionaryInfo(TblColRef col, IReadableTable inpTab private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException { final ResourceStore store = getStore(); - final List allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfoSerializer.INFO_SERIALIZER); + final List allResources = store.getAllResources(dictInfo.getResourceDir(), + DictionaryInfoSerializer.INFO_SERIALIZER); TableSignature input = dictInfo.getInput(); @@ -345,7 +365,8 @@ private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException { private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws IOException { final ResourceStore store = getStore(); - final List allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfoSerializer.INFO_SERIALIZER); + final List allResources = store.getAllResources(dictInfo.getResourceDir(), + DictionaryInfoSerializer.INFO_SERIALIZER); DictionaryInfo largestDict = null; for (DictionaryInfo dictionaryInfo : allResources) { @@ -362,7 +383,7 @@ private DictionaryInfo findLargestDictInfo(DictionaryInfo dictInfo) throws IOExc } public void removeDictionary(String resourcePath) throws IOException { - logger.info("Remvoing dict: " + resourcePath); + logger.info("Remvoing dict: {}", resourcePath); ResourceStore store = getStore(); store.deleteResource(resourcePath); dictCache.invalidate(resourcePath); @@ -385,7 +406,7 @@ public void removeDictionaries(String srcTable, String srcCol) throws IOExceptio void save(DictionaryInfo dict) throws IOException { ResourceStore store = getStore(); String path = dict.getResourcePath(); - logger.info("Saving dictionary at " + path); + logger.info("Saving dictionary at {}", path); store.putBigResource(path, dict, System.currentTimeMillis(), DictionaryInfoSerializer.FULL_SERIALIZER); } @@ -393,11 +414,19 @@ void save(DictionaryInfo dict) throws IOException { DictionaryInfo 
load(String resourcePath, boolean loadDictObj) throws IOException { ResourceStore store = getStore(); - logger.info("DictionaryManager(" + System.identityHashCode(this) + ") loading DictionaryInfo(loadDictObj:" + loadDictObj + ") at " + resourcePath); - DictionaryInfo info = store.getResource(resourcePath, loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : DictionaryInfoSerializer.INFO_SERIALIZER); + if (loadDictObj) { + logger.info("Loading dictionary at {}", resourcePath); + } + + DictionaryInfo info = store.getResource(resourcePath, + loadDictObj ? DictionaryInfoSerializer.FULL_SERIALIZER : DictionaryInfoSerializer.INFO_SERIALIZER); return info; } + private void updateDictCache(DictionaryInfo newDictInfo) { + dictCache.put(newDictInfo.getResourcePath(), newDictInfo); + } + private ResourceStore getStore() { return ResourceStore.getStore(config); } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index 8e63989706a..8f68fb0df0c 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Lists; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.metadata.TableMetadataManager; @@ -39,6 +38,9 @@ import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import com.google.common.collect.Lists; /** * @author yangli9 @@ -68,7 +70,8 @@ private SnapshotManager(KylinConfig config) { this.snapshotCache = 
CacheBuilder.newBuilder().removalListener(new RemovalListener() { @Override public void onRemoval(RemovalNotification notification) { - SnapshotManager.logger.info("Snapshot with resource path " + notification.getKey() + " is removed due to " + notification.getCause()); + SnapshotManager.logger.info("Snapshot with resource path {} is removed due to {}", + notification.getKey(), notification.getCause()); } }).maximumSize(config.getCachedSnapshotMaxEntrySize())// .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader() { @@ -88,8 +91,7 @@ public SnapshotTable getSnapshotTable(String resourcePath) throws IOException { try { SnapshotTable r = snapshotCache.get(resourcePath); if (r == null) { - r = load(resourcePath, true); - snapshotCache.put(resourcePath, r); + r = loadAndUpdateLocalCache(resourcePath); } return r; } catch (ExecutionException e) { @@ -97,6 +99,12 @@ public SnapshotTable getSnapshotTable(String resourcePath) throws IOException { } } + private SnapshotTable loadAndUpdateLocalCache(String snapshotResPath) throws IOException { + SnapshotTable snapshotTable = load(snapshotResPath, true); + snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable); + return snapshotTable; + } + public List getSnapshots(String tableName, TableSignature sourceTableSignature) throws IOException { List result = Lists.newArrayList(); String tableSnapshotsPath = SnapshotTable.getResourceDir(tableName); @@ -115,34 +123,34 @@ public void removeSnapshot(String resourcePath) throws IOException { snapshotCache.invalidate(resourcePath); } - public SnapshotTable buildSnapshot(IReadableTable table, TableDesc tableDesc, KylinConfig cubeConfig) throws IOException { + public SnapshotTable buildSnapshot(IReadableTable table, TableDesc tableDesc, KylinConfig cubeConfig) + throws IOException { SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); snapshot.updateRandomUuid(); + Interner pool = Interners.newWeakInterner(); - String dup = checkDupByInfo(snapshot); 
- if (dup != null) { - logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup); - return getSnapshotTable(dup); - } + synchronized (pool.intern(tableDesc.getIdentity())) { + SnapshotTable reusableSnapshot = getReusableSnapShot(table, snapshot, tableDesc, cubeConfig); + if (reusableSnapshot != null) + return updateDictLastModifiedTime(reusableSnapshot.getResourcePath()); - if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) { - throw new IllegalStateException("Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() // - + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize()); + snapshot.takeSnapshot(table, tableDesc); + return trySaveNewSnapshot(snapshot); } - - snapshot.takeSnapshot(table, tableDesc); - - return trySaveNewSnapshot(snapshot); } - public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException { + public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc tableDesc, String overwriteUUID) + throws IOException { SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); snapshot.setUuid(overwriteUUID); - snapshot.takeSnapshot(table, tableDesc); - SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath()); - snapshot.setLastModified(existing.getLastModified()); + try { + SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath()); + snapshot.setLastModified(existing.getLastModified()); + } catch (Exception ex) { + logger.error("Error reading {}, delete it and save rebuild", snapshot.getResourcePath(), ex); + } save(snapshot); snapshotCache.put(snapshot.getResourcePath(), snapshot); @@ -150,12 +158,30 @@ public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc tableDesc, return snapshot; } + private SnapshotTable getReusableSnapShot(IReadableTable table, SnapshotTable snapshot, TableDesc tableDesc, + KylinConfig 
cubeConfig) throws IOException { + String dup = checkDupByInfo(snapshot); + + if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) { + throw new IllegalStateException( + "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() // + + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize()); + } + + if (dup != null) { + logger.info("Identical input {}, reuse existing snapshot at {}", table.getSignature(), dup); + return getSnapshotTable(dup); + } else { + return null; + } + } + public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException { String dupTable = checkDupByContent(snapshotTable); if (dupTable != null) { - logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable); - return getSnapshotTable(dupTable); + logger.info("Identical snapshot content {}, reuse existing snapshot at {}", snapshotTable, dupTable); + return updateDictLastModifiedTime(dupTable); } save(snapshotTable); @@ -198,6 +224,16 @@ private String checkDupByContent(SnapshotTable snapshot) throws IOException { return null; } + private SnapshotTable updateDictLastModifiedTime(String snapshotPath) throws IOException { + ResourceStore store = getStore(); + long now = System.currentTimeMillis(); + store.updateTimestamp(snapshotPath, now); + logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now); + + // update cache + return loadAndUpdateLocalCache(snapshotPath); + } + private void save(SnapshotTable snapshot) throws IOException { ResourceStore store = getStore(); String path = snapshot.getResourcePath(); @@ -205,13 +241,14 @@ private void save(SnapshotTable snapshot) throws IOException { } private SnapshotTable load(String resourcePath, boolean loadData) throws IOException { - logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData); + logger.info("Loading snapshotTable from {}, with 
loadData: {}", resourcePath, loadData); ResourceStore store = getStore(); - SnapshotTable table = store.getResource(resourcePath, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER); + SnapshotTable table = store.getResource(resourcePath, + loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER); if (loadData) - logger.debug("Loaded snapshot at " + resourcePath); + logger.debug("Loaded snapshot at {}", resourcePath); return table; } diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java index 6a86e3351c8..bf97cc96d4f 100755 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryManagerTest.java @@ -19,6 +19,7 @@ package org.apache.kylin.dict; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -45,9 +46,8 @@ public void tearDown() { cleanupTestMetadata(); } - @Test - public void testBuildSaveDictionary() throws IOException { + public void testBuildSaveDictionary() throws IOException, InterruptedException { KylinConfig config = KylinConfig.getInstanceFromEnv(); DictionaryManager dictMgr = DictionaryManager.getInstance(config); DataModelManager metaMgr = DataModelManager.getInstance(config); @@ -57,25 +57,48 @@ public void testBuildSaveDictionary() throws IOException { // non-exist input returns null; DictionaryInfo nullInfo = dictMgr.buildDictionary(col, MockupReadableTable.newNonExistTable("/a/path")); assertEquals(null, nullInfo); - - DictionaryInfo info1 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); + + DictionaryInfo info1 = dictMgr.buildDictionary(col, + 
MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); assertEquals(3, info1.getDictionaryObject().getSize()); + long info1LastModified = info1.getLastModified(); + // same input returns same dict - DictionaryInfo info2 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); - assertTrue(info1 == info2); - + // sleep 1 second to avoid file resource store timestamp precision lost when update + Thread.sleep(1000); + DictionaryInfo info2 = dictMgr.buildDictionary(col, + MockupReadableTable.newSingleColumnTable("/a/path", "1", "2", "3")); + assertTrue(info1 != info2); + assertEquals(info1.getResourcePath(), info2.getResourcePath()); + + // update last modified when reused dict + long info2LastModified = info2.getLastModified(); + assertTrue(info2LastModified > info1LastModified); + // same input values (different path) returns same dict - DictionaryInfo info3 = dictMgr.buildDictionary(col, MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3")); - assertTrue(info1 == info3); - + // sleep 1 second to avoid file resource store timestamp precision lost when update + Thread.sleep(1000); + DictionaryInfo info3 = dictMgr.buildDictionary(col, + MockupReadableTable.newSingleColumnTable("/a/different/path", "1", "2", "3")); + assertTrue(info1 != info3); + assertTrue(info2 != info3); + assertEquals(info1.getResourcePath(), info3.getResourcePath()); + assertEquals(info2.getResourcePath(), info3.getResourcePath()); + + // update last modified when reused dict + long info3LastModified = info3.getLastModified(); + assertTrue(info3LastModified > info2LastModified); + // save dictionary works in spite of non-exist table - Dictionary dict = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator("1", "2", "3")); + Dictionary dict = DictionaryGenerator.buildDictionary(col.getType(), + new IterableDictionaryValueEnumerator("1", "2", "3")); DictionaryInfo info4 = 
dictMgr.saveDictionary(col, MockupReadableTable.newNonExistTable("/a/path"), dict); - assertTrue(info1 == info4); - - Dictionary dict2 = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator("1", "2", "3", "4")); + assertEquals(info1.getResourcePath(), info4.getResourcePath()); + + Dictionary dict2 = DictionaryGenerator.buildDictionary(col.getType(), + new IterableDictionaryValueEnumerator("1", "2", "3", "4")); DictionaryInfo info5 = dictMgr.saveDictionary(col, MockupReadableTable.newNonExistTable("/a/path"), dict2); - assertTrue(info1 != info5); + assertNotEquals(info1.getResourcePath(), info5.getResourcePath()); } } diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java new file mode 100644 index 00000000000..ab6ce13e5ba --- /dev/null +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/SnapshotManagerTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.dict.lookup; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.dict.MockupReadableTable; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class SnapshotManagerTest extends LocalFileMetadataTestCase { + + private KylinConfig kylinConfig; + private SnapshotManager snapshotManager; + List expect; + List dif; + // test data for scd1 + List contentAtTime1; + List contentAtTime2; + List contentAtTime3; + + @Before + public void setup() throws Exception { + this.createTestMetadata(); + String[] s1 = new String[] { "1", "CN" }; + String[] s2 = new String[] { "2", "NA" }; + String[] s3 = new String[] { "3", "NA" }; + String[] s4 = new String[] { "4", "KR" }; + String[] s5 = new String[] { "5", "JP" }; + String[] s6 = new String[] { "6", "CA" }; + expect = Lists.newArrayList(s1, s2, s3, s4, s5); + dif = Lists.newArrayList(s1, s2, s3, s4, s6); + + contentAtTime1 = Lists.newArrayList(s1, s2, s3, s4, s5, s6); + String[] s22 = new String[] { "2", "SP" }; + contentAtTime2 = Lists.newArrayList(s1, s22, s3, s4, s5); + String[] s23 = new String[] { "2", "US" }; + contentAtTime3 = Lists.newArrayList(s1, s23, s3, s4, s5); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + private TableDesc genTableDesc(String tableName) { + TableDesc table = TableDesc.mockup(tableName); + ColumnDesc desc1 = new ColumnDesc("1", "id", "string", null, 
null, null, null); + desc1.setId("1"); + desc1.setDatatype("long"); + ColumnDesc desc2 = new ColumnDesc("2", "country", "string", null, null, null, null); + desc2.setId("2"); + desc2.setDatatype("string"); + ColumnDesc[] columns = { desc1, desc2 }; + table.setColumns(columns); + table.init(kylinConfig, "default"); + return table; + } + + private IReadableTable genTable(String path, List content) { + IReadableTable.TableSignature signature = new IReadableTable.TableSignature(path, content.size(), 0); + return new MockupReadableTable(content, signature, true); + } + + @Test + public void testCheckByContent() throws IOException, InterruptedException { + runTestCase(); + } + + public void runTestCase() throws IOException, InterruptedException { + kylinConfig = KylinConfig.getInstanceFromEnv(); + snapshotManager = SnapshotManager.getInstance(kylinConfig); + SnapshotTable origin = snapshotManager.buildSnapshot(genTable("./origin", expect), genTableDesc("TEST_TABLE"), + kylinConfig); + + // sleep 1 second to avoid file resource store precision lost + Thread.sleep(1000); + SnapshotTable dup = snapshotManager.buildSnapshot(genTable("./dup", expect), genTableDesc("TEST_TABLE"), + kylinConfig); + // assert same snapshot file + Assert.assertEquals(origin.getUuid(), dup.getUuid()); + Assert.assertEquals(origin.getResourcePath(), dup.getResourcePath()); + + // assert the file has been updated + long originLastModified = origin.getLastModified(); + long dupLastModified = dup.getLastModified(); + Assert.assertTrue(dupLastModified > originLastModified); + + SnapshotTable actual = snapshotManager.getSnapshotTable(origin.getResourcePath()); + IReadableTable.TableReader reader = actual.getReader(); + Assert.assertEquals(expect.size(), actual.getRowCount()); + int i = 0; + while (reader.next()) { + Assert.assertEquals(stringJoin(expect.get(i++)), stringJoin(reader.getRow())); + } + + SnapshotTable difTable = snapshotManager.buildSnapshot(genTable("./dif", dif), 
genTableDesc("TEST_TABLE"), + kylinConfig); + Assert.assertNotEquals(origin.getUuid(), difTable.getUuid()); + } + + @Test + public void testBuildSameSnapshotSameTime() throws InterruptedException, IOException { + final int threadCount = 3; + final ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final CountDownLatch countDownLatch = new CountDownLatch(threadCount); + final TableDesc tableDesc = genTableDesc("TEST_TABLE"); + + kylinConfig = KylinConfig.getInstanceFromEnv(); + snapshotManager = SnapshotManager.getInstance(kylinConfig); + ResourceStore store = ResourceStore.getStore(kylinConfig); + + for (int i = 0; i < threadCount; ++i) { + executorService.submit(new Runnable() { + @Override + public void run() { + try { + snapshotManager.buildSnapshot(genTable("./origin", expect), tableDesc, kylinConfig); + } catch (IOException e) { + Assert.fail(); + } finally { + countDownLatch.countDown(); + } + } + }); + } + countDownLatch.await(); + Assert.assertEquals(1, store.listResources("/table_snapshot/NULL.TEST_TABLE").size()); + } + + private String stringJoin(String[] strings) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < strings.length; i++) { + builder.append(strings[i]); + if (i < strings.length - 1) { + builder.append(","); + } + } + return builder.toString(); + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index aeb7b120fcc..6ab4976c095 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -91,13 +91,15 @@ public Dictionary getDictionary(TblColRef col) throws IOException { } FileSystem fs = HadoopUtil.getWorkingFileSystem(); - Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); 
+ Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, + col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); if (dictFile == null) { - logger.info("Dict for '" + col.getName() + "' not pre-built."); + logger.info("Dict for '{}' not pre-built.", col.getName()); return null; } - try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) { + try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), + SequenceFile.Reader.file(dictFile))) { NullWritable key = NullWritable.get(); ArrayPrimitiveWritable value = new ArrayPrimitiveWritable(); reader.next(key, value); @@ -107,7 +109,7 @@ public Dictionary getDictionary(TblColRef col) throws IOException { String dictClassName = is.readUTF(); Dictionary dict = (Dictionary) ClassUtil.newInstance(dictClassName); dict.readFields(is); - logger.info("DictionaryProvider read dict from file: " + dictFile); + logger.info("DictionaryProvider read dict from file: {}", dictFile); return dict; } } diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java index dd84bd6c893..e432cdb978f 100755 --- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java @@ -19,7 +19,7 @@ package org.apache.kylin.cube; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotEquals; import java.io.BufferedWriter; import java.io.File; @@ -73,14 +73,17 @@ public void basic() throws Exception { DictionaryInfo info1 = dictMgr.buildDictionary(col, mockupData.getDistinctValuesFor(col)); System.out.println(JsonUtil.writeValueAsIndentString(info1)); + Thread.sleep(1000); + DictionaryInfo info2 = dictMgr.buildDictionary(col, mockupData.getDistinctValuesFor(col)); 
System.out.println(JsonUtil.writeValueAsIndentString(info2)); // test check duplicate - assertTrue(info1.getUuid() == info2.getUuid()); - assertTrue(info1 == dictMgr.getDictionaryInfo(info1.getResourcePath())); - assertTrue(info2 == dictMgr.getDictionaryInfo(info2.getResourcePath())); - assertTrue(info1.getDictionaryObject() == info2.getDictionaryObject()); + assertEquals(info1.getUuid(), info2.getUuid()); + assertEquals(info1.getResourcePath(), info2.getResourcePath()); + assertNotEquals(info1.getLastModified(), info2.getLastModified()); + assertNotEquals(info1, info2); + assertEquals(info1.getDictionaryObject(), info2.getDictionaryObject()); // verify dictionary entries @SuppressWarnings("unchecked") diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java index e11fe74c876..360818ba92c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java @@ -20,13 +20,14 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -39,12 +40,14 @@ import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryInfoSerializer; import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutableOutputPO; import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.exception.PersistentException; import org.apache.kylin.job.execution.ExecutableState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import
com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class MetadataCleanupJob { @@ -56,37 +59,39 @@ public class MetadataCleanupJob { // ============================================================================ final KylinConfig config; - - private List garbageResources = Collections.emptyList(); - + + private Map garbageResources = Maps.newHashMap(); + private ResourceStore store; + public MetadataCleanupJob() { this(KylinConfig.getInstanceFromEnv()); } - + public MetadataCleanupJob(KylinConfig config) { this.config = config; + this.store = ResourceStore.getStore(config); } - - public List getGarbageResources() { + + public Map getGarbageResources() { return garbageResources; } // function entrance - public List cleanup(boolean delete, int jobOutdatedDays) throws Exception { + public Map cleanup(boolean delete, int jobOutdatedDays) throws Exception { CubeManager cubeManager = CubeManager.getInstance(config); - ResourceStore store = ResourceStore.getStore(config); long newResourceTimeCut = System.currentTimeMillis() - NEW_RESOURCE_THREADSHOLD_MS; FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()); - List toDeleteCandidates = Lists.newArrayList(); + Map toDeleteCandidates = Maps.newHashMap(); // two level resources, snapshot tables and cube statistics for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT, - ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) { + ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT }) { for (String dir : noNull(store.listResources(resourceRoot))) { for (String res : noNull(store.listResources(dir))) { - if (store.getResourceTimestamp(res) < newResourceTimeCut) - toDeleteCandidates.add(res); + long timestamp = getTimestamp(res); + if (timestamp < newResourceTimeCut) + toDeleteCandidates.put(res, timestamp); } } } @@ -94,14 +99,18 @@ public List 
cleanup(boolean delete, int jobOutdatedDays) throws Exceptio // find all of the global dictionaries in HDFS try { FileStatus[] fStatus = new FileStatus[0]; - fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict"))); - fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict"))); + fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path( + KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict"))); + fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path( + KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict"))); for (FileStatus status : fStatus) { String path = status.getPath().toString(); FileStatus[] globalDicts = fs.listStatus(new Path(path)); for (FileStatus globalDict : globalDicts) { String globalDictPath = globalDict.getPath().toString(); - toDeleteCandidates.add(globalDictPath); + long timestamp = getTimestamp(globalDict); + if (timestamp < newResourceTimeCut) + toDeleteCandidates.put(globalDictPath, timestamp); } } } catch (FileNotFoundException e) { @@ -113,8 +122,9 @@ public List cleanup(boolean delete, int jobOutdatedDays) throws Exceptio for (String dir : noNull(store.listResources(resourceRoot))) { for (String dir2 : noNull(store.listResources(dir))) { for (String res : noNull(store.listResources(dir2))) { - if (store.getResourceTimestamp(res) < newResourceTimeCut) - toDeleteCandidates.add(res); + long timestamp = getTimestamp(res); + if (timestamp < newResourceTimeCut) + toDeleteCandidates.put(res, timestamp); } } } @@ -130,13 +140,16 @@ public List cleanup(boolean delete, int jobOutdatedDays) throws Exceptio activeResources.add(segment.getStatisticsResourcePath()); for (String dictPath : segment.getDictionaryPaths()) { DictionaryInfo dictInfo = store.getResource(dictPath, 
DictionaryInfoSerializer.FULL_SERIALIZER); - if ("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? dictInfo.getDictionaryClass() : null)){ + if ("org.apache.kylin.dict.AppendTrieDictionary" + .equals(dictInfo != null ? dictInfo.getDictionaryClass() : null)) { String dictObj = dictInfo.getDictionaryObject().toString(); String basedir = dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1); - if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) { + if (basedir.startsWith( + KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) { activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir()); - } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/SegmentDict")) { + } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + + "/resources/SegmentDict")) { activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict" + dictInfo.getResourceDir()); } @@ -144,7 +157,7 @@ public List cleanup(boolean delete, int jobOutdatedDays) throws Exceptio } } } - toDeleteCandidates.removeAll(activeResources); + toDeleteCandidates.keySet().removeAll(activeResources); // delete old and completed jobs long outdatedJobTimeCut = System.currentTimeMillis() - jobOutdatedDays * 24 * 3600 * 1000L; @@ -152,49 +165,77 @@ public List cleanup(boolean delete, int jobOutdatedDays) throws Exceptio List allExecutable = executableDao.getJobs(); for (ExecutablePO executable : allExecutable) { long lastModified = executable.getLastModified(); - String jobStatus = executableDao.getJobOutput(executable.getUuid()).getStatus(); - - if (lastModified < outdatedJobTimeCut && (ExecutableState.SUCCEED.toString().equals(jobStatus) - || ExecutableState.DISCARDED.toString().equals(jobStatus))) { - 
toDeleteCandidates.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid()); - toDeleteCandidates.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid()); + if (lastModified < outdatedJobTimeCut && isJobComplete(executableDao, executable)) { + String jobResPath = ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid(); + String jobOutputResPath = ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid(); + long outputLastModified = getTimestamp(jobOutputResPath); + toDeleteCandidates.put(jobResPath, lastModified); + toDeleteCandidates.put(jobOutputResPath, outputLastModified); - for (ExecutablePO task : executable.getTasks()) { - toDeleteCandidates.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid()); + List tasks = executable.getTasks(); + if (tasks != null && !tasks.isEmpty()) { + for (ExecutablePO task : executable.getTasks()) { + String taskId = task.getUuid(); + if (StringUtils.isNotBlank(taskId)) { + String resPath = ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid(); + long timestamp = getTimestamp(resPath); + toDeleteCandidates.put(resPath, timestamp); + } + } } } } - + garbageResources = cleanupConclude(delete, toDeleteCandidates); return garbageResources; } - private List cleanupConclude(boolean delete, List toDeleteResources) throws IOException { + private boolean isJobComplete(ExecutableDao executableDao, ExecutablePO job) { + String jobId = job.getUuid(); + boolean isComplete = false; + try { + ExecutableOutputPO output = executableDao.getJobOutput(jobId); + String status = output.getStatus(); + if (StringUtils.equals(status, ExecutableState.SUCCEED.toString()) + || StringUtils.equals(status, ExecutableState.DISCARDED.toString())) { + isComplete = true; + } + } catch (PersistentException e) { + logger.error("Get job output failed for job uuid: {}", jobId, e); + isComplete = true; // job output broken --> will be treat as complete + } + + return isComplete; + } + + 
private Map cleanupConclude(boolean delete, Map toDeleteResources) throws IOException { if (toDeleteResources.isEmpty()) { logger.info("No metadata resource to clean up"); return toDeleteResources; } - - logger.info(toDeleteResources.size() + " metadata resource to clean up"); + + logger.info("{} metadata resource to clean up", toDeleteResources.size()); if (delete) { ResourceStore store = ResourceStore.getStore(config); FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()); - for (String res : toDeleteResources) { - logger.info("Deleting metadata " + res); + for (String res : toDeleteResources.keySet()) { + long timestamp = toDeleteResources.get(res); + logger.info("Deleting metadata=[resource_path: {}, timestamp: {}]", res, timestamp); try { if (res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) { fs.delete(new Path(res), true); } else { - store.deleteResource(res); + store.deleteResource(res, timestamp); } } catch (IOException e) { - logger.error("Failed to delete resource " + res, e); + logger.error("Failed to delete metadata=[resource_path: {}, timestamp: {}] ", res, timestamp, e); } } } else { - for (String res : toDeleteResources) { - logger.info("Dry run, pending delete metadata " + res); + for (String res : toDeleteResources.keySet()) { + long timestamp = toDeleteResources.get(res); + logger.info("Dry run, pending delete metadata=[resource_path: {}, timestamp: {}] ", res, timestamp); } } return toDeleteResources; @@ -204,4 +245,17 @@ private NavigableSet noNull(NavigableSet list) { return (list == null) ? 
new TreeSet() : list; }  + private long getTimestamp(String resPath) { + long timestamp = Long.MAX_VALUE; + try { + timestamp = store.getResourceTimestamp(resPath); + } catch (IOException e) { + logger.warn("Failed to get resource timestamp from remote resource store, resPath: {}", resPath, e); + } + return timestamp; + } + + private long getTimestamp(FileStatus filestatus) { + return filestatus.getModificationTime(); + } } diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java index fa85a1e782a..ad819e6b6fc 100644 --- a/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java +++ b/server-base/src/test/java/org/apache/kylin/rest/job/MetadataCleanupJobTest.java @@ -25,7 +25,7 @@ import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.kylin.common.util.LocalFileMetadataTestCase; @@ -42,18 +42,22 @@ public void after() throws Exception { @Test public void testCleanUp() throws Exception { - staticCreateTestMetadata(false, new ResetTimeHook(1, "src/test/resources/test_meta")); + // file resource store may lose timestamp precision with millisecond, set last modified as 2000 + staticCreateTestMetadata(false, new ResetTimeHook(2000, "src/test/resources/test_meta")); MetadataCleanupJob metadataCleanupJob = new MetadataCleanupJob(); - List cleanupList = metadataCleanupJob.cleanup(false, 30); - Assert.assertEquals(7, cleanupList.size()); + Map cleanupMap = metadataCleanupJob.cleanup(false, 30); + Assert.assertEquals(7, cleanupMap.size()); + for (long timestamp : cleanupMap.values()) { + Assert.assertEquals(2000, timestamp); + } } @Test public void testNotCleanUp() throws Exception { staticCreateTestMetadata(false, new ResetTimeHook(System.currentTimeMillis(), "src/test/resources/test_meta")); MetadataCleanupJob
metadataCleanupJob = new MetadataCleanupJob(); - List cleanupList = metadataCleanupJob.cleanup(false, 30); - Assert.assertEquals(0, cleanupList.size()); + Map cleanupMap = metadataCleanupJob.cleanup(false, 30); + Assert.assertEquals(0, cleanupMap.size()); } private class ResetTimeHook extends LocalFileMetadataTestCase.OverlayMetaHook { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 14c5ea7cf0d..ecd698ff251 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -147,7 +147,7 @@ public String getMetaStoreUUID() throws IOException { @Override protected void visitFolderImpl(String folderPath, final boolean recursive, VisitFilter filter, - final boolean loadContent, final Visitor visitor) throws IOException { + final boolean loadContent, final Visitor visitor) throws IOException { visitFolder(folderPath, filter, loadContent, new FolderVisitor() { @Override @@ -167,7 +167,8 @@ public void visit(String childPath, String fullPath, Result hbaseResult) throws }); } - private void visitFolder(String folderPath, VisitFilter filter, boolean loadContent, FolderVisitor visitor) throws IOException { + private void visitFolder(String folderPath, VisitFilter filter, boolean loadContent, FolderVisitor visitor) + throws IOException { assert folderPath.startsWith("/"); String folderPrefix = folderPath.endsWith("/") ? folderPath : folderPath + "/"; @@ -244,7 +245,7 @@ private FilterList generateTimeFilterList(VisitFilter visitFilter) { CompareFilter.CompareOp.LESS, Bytes.toBytes(visitFilter.lastModEndExclusive)); filterList.addFilter(timeEndFilter); } - return filterList.getFilters().size() == 0 ? null : filterList; + return filterList.getFilters().isEmpty() ? 
null : filterList; } private InputStream getInputStream(String resPath, Result r) throws IOException { @@ -352,18 +353,40 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT } @Override - protected void deleteResourceImpl(String resPath) throws IOException { + protected void updateTimestampImpl(String resPath, long timestamp) throws IOException { Table table = getConnection().getTable(TableName.valueOf(tableName)); try { - boolean hdfsResourceExist = false; - Result result = internalGetFromHTable(table, resPath, true, false); - if (result != null) { - byte[] value = result.getValue(B_FAMILY, B_COLUMN); - if (value != null && value.length == 0) { - hdfsResourceExist = true; - } + boolean hdfsResourceExist = isHdfsResourceExist(table, resPath); + long oldTS = getResourceLastModified(table, resPath); + + byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); + byte[] row = Bytes.toBytes(resPath); + Put put = new Put(row); + put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(timestamp)); + + boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); + logger.trace("Update row {} from oldTs: {}, to newTs: {}, operation result: {}", resPath, oldTS, timestamp, + ok); + if (!ok) { + long real = getResourceTimestampImpl(resPath); + throw new WriteConflictException( + "Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); } + if (hdfsResourceExist) { // update timestamp in hdfs + updateTimestampPushdown(resPath, timestamp); + } + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected void deleteResourceImpl(String resPath) throws IOException { + Table table = getConnection().getTable(TableName.valueOf(tableName)); + try { + boolean hdfsResourceExist = isHdfsResourceExist(table, resPath); + Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); @@ -375,6 +398,43 @@ protected void deleteResourceImpl(String resPath) throws IOException { } } + @Override + 
protected void deleteResourceImpl(String resPath, long timestamp) throws IOException { + Table table = getConnection().getTable(TableName.valueOf(tableName)); + try { + boolean hdfsResourceExist = isHdfsResourceExist(table, resPath); + long origLastModified = getResourceLastModified(table, resPath); + if (checkTimeStampBeforeDelete(origLastModified, timestamp)) { + Delete del = new Delete(Bytes.toBytes(resPath)); + table.delete(del); + + if (hdfsResourceExist) { // remove hdfs cell value + deletePushdown(resPath); + } + } + } finally { + IOUtils.closeQuietly(table); + } + } + + // to avoid get Table twice time to improve delete performance + private long getResourceLastModified(Table table, String resPath) throws IOException { + return getTimestamp(internalGetFromHTable(table, resPath, false, true)); + } + + private boolean isHdfsResourceExist(Table table, String resPath) throws IOException { + boolean hdfsResourceExist = false; + Result result = internalGetFromHTable(table, resPath, true, false); + if (result != null) { + byte[] contentVal = result.getValue(B_FAMILY, B_COLUMN); + if (contentVal != null && contentVal.length == 0) { + hdfsResourceExist = true; + } + } + + return hdfsResourceExist; + } + @Override protected String getReadableResourcePathImpl(String resPath) { return tableName + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();