Skip to content

Commit

Permalink
#KYLIN-3977 Aviod mistaken deleting dicts by storage cleanup while bu…
Browse files Browse the repository at this point in the history
…ilding jobs are running

* #KYLIN-3977, Aviod mistaken deleting dicts by storage cleanup while building jobs are running
  • Loading branch information
ZhengshuaiPENG authored and nichunen committed May 15, 2019
1 parent 6efcd79 commit bb44678
Show file tree
Hide file tree
Showing 18 changed files with 959 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class FileResourceStore extends ResourceStore {
public FileResourceStore(KylinConfig kylinConfig) {
super(kylinConfig);
root = new File(getPath(kylinConfig)).getAbsoluteFile();
if (root.exists() == false)
if (!root.exists())
throw new IllegalArgumentException(
"File not exist by '" + kylinConfig.getMetadataUrl() + "': " + root.getAbsolutePath());
}
Expand All @@ -60,7 +60,7 @@ protected boolean existsImpl(String resPath) throws IOException {

@Override
protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent,
Visitor visitor) throws IOException {
Visitor visitor) throws IOException {
if (--failVisitFolderCountDown == 0)
throw new IOException("for test");

Expand Down Expand Up @@ -177,6 +177,19 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT
return f.lastModified();
}

@Override
protected void updateTimestampImpl(String resPath, long timestamp) throws IOException {
File f = file(resPath);
if (f.exists()) {
// note file timestamp may lose precision for last two digits of timestamp
boolean success = f.setLastModified(timestamp);
if (!success) {
throw new IOException(
"Update resource timestamp failed, resPath:" + resPath + ", timestamp: " + timestamp);
}
}
}

@Override
protected void deleteResourceImpl(String resPath) throws IOException {

Expand All @@ -189,6 +202,20 @@ protected void deleteResourceImpl(String resPath) throws IOException {
}
}

@Override
protected void deleteResourceImpl(String resPath, long timestamp) throws IOException {
File f = file(resPath);
try {
if (f.exists()) {
long origLastModified = getResourceTimestampImpl(resPath);
if (checkTimeStampBeforeDelete(origLastModified, timestamp))
FileUtils.forceDelete(f);
}
} catch (FileNotFoundException e) {
// FileNotFoundException is not a problem in case of delete
}
}

@Override
protected String getReadableResourcePathImpl(String resPath) {
return file(resPath).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public HDFSResourceStore(KylinConfig kylinConfig, StorageURL metadataUrl) throws

fs = HadoopUtil.getFileSystem(path);
Path metadataPath = new Path(path);
if (fs.exists(metadataPath) == false) {
if (!fs.exists(metadataPath)) {
logger.warn("Path not exist in HDFS, create it: {}. ", path);
createMetaFolder(metadataPath);
}
Expand Down Expand Up @@ -136,7 +136,7 @@ TreeSet<String> getAllFilePath(Path filePath, String resPathPrefix) throws IOExc

@Override
protected void visitFolderImpl(String folderPath, boolean recursive, VisitFilter filter, boolean loadContent,
Visitor visitor) throws IOException {
Visitor visitor) throws IOException {
Path p = getRealHDFSPath(folderPath);
if (!fs.exists(p) || !fs.isDirectory(p)) {
return;
Expand Down Expand Up @@ -247,6 +247,18 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT
return newTS;
}

@Override
protected void updateTimestampImpl(String resPath, long timestamp) throws IOException {
try {
Path p = getRealHDFSPath(resPath);
if (fs.exists(p)) {
fs.setTimes(p, timestamp, -1);
}
} catch (Exception e) {
throw new IOException("Update resource timestamp fail", e);
}
}

@Override
protected void deleteResourceImpl(String resPath) throws IOException {
try {
Expand All @@ -258,6 +270,21 @@ protected void deleteResourceImpl(String resPath) throws IOException {
throw new IOException("Delete resource fail", e);
}
}

@Override
protected void deleteResourceImpl(String resPath, long timestamp) throws IOException {
try {
Path p = getRealHDFSPath(resPath);
if (fs.exists(p)) {
long origLastModified = fs.getFileStatus(p).getModificationTime();
if (checkTimeStampBeforeDelete(origLastModified, timestamp))
fs.delete(p, true);
}
} catch (Exception e) {
throw new IOException("Delete resource fail", e);
}
}

@Override
protected String getReadableResourcePathImpl(String resPath) {
return getRealHDFSPath(resPath).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public class JDBCResourceSQL {
final private String metaTableTs;
final private String metaTableContent;

public JDBCResourceSQL(String dialect, String tableName, String metaTableKey, String metaTableTs, String metaTableContent) {
public JDBCResourceSQL(String dialect, String tableName, String metaTableKey, String metaTableTs,
String metaTableContent) {
this.format = JDBCSqlQueryFormatProvider.createJDBCSqlQueriesFormat(dialect);
this.tableName = tableName;
this.metaTableKey = metaTableKey;
Expand Down Expand Up @@ -96,8 +97,7 @@ public String getInsertSql() {
return sql;
}

@SuppressWarnings("unused")
private String getReplaceSqlWithoutContent() {
public String getReplaceSqlWithoutContent() {
final String sql = new MessageFormat(format.getReplaceSqlWithoutContent(), Locale.ROOT)
.format(new Object[] { tableName, metaTableTs, metaTableKey }, new StringBuffer(), new FieldPosition(0))
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void execute(Connection connection) throws SQLException {
pstat = connection.prepareStatement(createIndexSql);
pstat.executeUpdate();
} catch (SQLException ex) {
logger.error("Failed to create index on " + META_TABLE_TS, ex);
logger.error("Failed to create index on {}", META_TABLE_TS, ex);
}
}

Expand Down Expand Up @@ -171,7 +171,7 @@ private boolean isJsonMetadata(String resourcePath) {

@Override
protected void visitFolderImpl(final String folderPath, final boolean recursive, final VisitFilter filter,
final boolean loadContent, final Visitor visitor) throws IOException {
final boolean loadContent, final Visitor visitor) throws IOException {

try {
executeSql(new SqlOperation() {
Expand All @@ -184,8 +184,8 @@ public void execute(Connection connection) throws SQLException {
lookForPrefix = filter.pathPrefix;
}

if (isRootPath(folderPath)){
for (int i=0; i<tableNames.length; i++){
if (isRootPath(folderPath)) {
for (int i = 0; i < tableNames.length; i++) {
final String tableName = tableNames[i];
JDBCResourceSQL sqls = getJDBCResourceSQL(tableName);
String sql = sqls.getAllResourceSqlString(loadContent);
Expand All @@ -212,7 +212,7 @@ public void execute(Connection connection) throws SQLException {
}
}
}
}else{
} else {
JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(folderPath));
String sql = sqls.getAllResourceSqlString(loadContent);
pstat = connection.prepareStatement(sql);
Expand Down Expand Up @@ -502,8 +502,7 @@ public void execute(Connection connection) throws SQLException, IOException {
pushdown.close();
}
} else {
pstat2.setBinaryStream(1,
new BufferedInputStream(new ByteArrayInputStream(content)));
pstat2.setBinaryStream(1, new BufferedInputStream(new ByteArrayInputStream(content)));
pstat2.setString(2, resPath);
pstat2.executeUpdate();
}
Expand All @@ -516,6 +515,33 @@ public void execute(Connection connection) throws SQLException, IOException {
});
}

@Override
protected void updateTimestampImpl(final String resPath, final long timestamp) throws IOException {
try {
boolean skipHdfs = isJsonMetadata(resPath);
JDBCResourceSQL sqls = getJDBCResourceSQL(getMetaTableName(resPath));
executeSql(new SqlOperation() {
@Override
public void execute(Connection connection) throws SQLException {
pstat = connection.prepareStatement(sqls.getReplaceSqlWithoutContent());
pstat.setLong(1, timestamp);
pstat.setString(2, resPath);
pstat.executeUpdate();
}
});

if (!skipHdfs) {
try {
updateTimestampPushdown(resPath, timestamp);
} catch (Throwable e) {
throw new SQLException(e);
}
}
} catch (SQLException e) {
throw new IOException(e);
}
}

@Override
protected void deleteResourceImpl(final String resPath) throws IOException {
try {
Expand Down Expand Up @@ -543,6 +569,14 @@ public void execute(Connection connection) throws SQLException {
}
}

@Override
protected void deleteResourceImpl(String resPath, long timestamp) throws IOException {
// considering deletePushDown operation, check timestamp at the beginning
long origLastModified = getResourceTimestampImpl(resPath);
if (checkTimeStampBeforeDelete(origLastModified, timestamp))
deleteResourceImpl(resPath);
}

@Override
protected String getReadableResourcePathImpl(String resPath) {
return metadataIdentifier + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,19 @@ private void deletePushdownFile(Path path) throws IOException {
logger.debug("{} is not exists in the file system.", path);
}
}

protected void updateTimestampPushdown(String resPath, long timestamp) throws IOException {
updateTimestampPushdownFile(pushdownPath(resPath), timestamp);
}

private void updateTimestampPushdownFile(Path path, long timestamp) throws IOException {
FileSystem fileSystem = pushdownFS();

if (fileSystem.exists(path)) {
fileSystem.setTimes(path, timestamp, -1);
logger.debug("Update temp file timestamp success. Temp file: {} .", path);
} else {
logger.debug("{} is not exists in the file system.", path);
}
}
}
Loading

0 comments on commit bb44678

Please sign in to comment.