Skip to content

Commit

Permalink
KYLIN-4291 Parallel segment building may causes WriteConflictException
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangayqian authored and hit-lacus committed Jun 15, 2020
1 parent 7d0abb1 commit ce000fd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
Expand Down Expand Up @@ -193,12 +194,24 @@ private DictionaryInfo updateExistingDictLastModifiedTime(String dictPath) throw
ResourceStore store = getStore();
if (StringUtils.isBlank(dictPath))
return NONE_INDICATOR;
long now = System.currentTimeMillis();
store.updateTimestamp(dictPath, now);
logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now);
DictionaryInfo dictInfo = load(dictPath, true);
updateDictCache(dictInfo);
return dictInfo;

int retry = 7;
while (retry-- > 0) {
try {
long now = System.currentTimeMillis();
store.updateTimestamp(dictPath, now);
logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now);
return loadAndUpdateLocalCache(dictPath);
} catch (WriteConflictException e) {
if (retry <= 0) {
logger.error("Retry is out, till got error, abandoning...", e);
throw e;
}
logger.warn("Write conflict to update dictionary " + dictPath + " retry remaining " + retry
+ ", will retry...");
}
}
return loadAndUpdateLocalCache(dictPath);
}

private void initDictInfo(Dictionary<String> newDict, DictionaryInfo newDictInfo) {
Expand Down Expand Up @@ -411,6 +424,12 @@ void save(DictionaryInfo dict) throws IOException {
store.putBigResource(path, dict, System.currentTimeMillis(), DictionaryInfoSerializer.FULL_SERIALIZER);
}

private DictionaryInfo loadAndUpdateLocalCache(String dictPath) throws IOException {
DictionaryInfo dictInfo = load(dictPath, true);
updateDictCache(dictInfo);
return dictInfo;
}

DictionaryInfo load(String resourcePath, boolean loadDictObj) throws IOException {
ResourceStore store = getStore();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
Expand Down Expand Up @@ -236,10 +237,23 @@ private String checkDupByContent(SnapshotTable snapshot) throws IOException {

private SnapshotTable updateDictLastModifiedTime(String snapshotPath) throws IOException {
ResourceStore store = getStore();
long now = System.currentTimeMillis();
store.updateTimestamp(snapshotPath, now);
logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now);

int retry = 7;
while (retry-- > 0) {
try {
long now = System.currentTimeMillis();
store.updateTimestamp(snapshotPath, now);
logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now);
return loadAndUpdateLocalCache(snapshotPath);
} catch (WriteConflictException e) {
if (retry <= 0) {
logger.error("Retry is out, till got error, abandoning...", e);
throw e;
}
logger.warn("Write conflict to update snapshotTable " + snapshotPath + " retry remaining " + retry
+ ", will retry...");
}
}
// update cache
return loadAndUpdateLocalCache(snapshotPath);
}
Expand Down

0 comments on commit ce000fd

Please sign in to comment.