From ce000fda5bc550b83241489818a2ce9dd657470e Mon Sep 17 00:00:00 2001 From: "yaqian.zhang" <598593183@qq.com> Date: Mon, 13 Jan 2020 17:36:35 +0800 Subject: [PATCH] KYLIN-4291 Parallel segment building may causes WriteConflictException --- .../apache/kylin/dict/DictionaryManager.java | 31 +++++++++++++++---- .../kylin/dict/lookup/SnapshotManager.java | 20 ++++++++++-- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index ffee105c7ff..6b23a9ec51a 100755 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.WriteConflictException; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.JsonUtil; @@ -193,12 +194,24 @@ private DictionaryInfo updateExistingDictLastModifiedTime(String dictPath) throw ResourceStore store = getStore(); if (StringUtils.isBlank(dictPath)) return NONE_INDICATOR; - long now = System.currentTimeMillis(); - store.updateTimestamp(dictPath, now); - logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now); - DictionaryInfo dictInfo = load(dictPath, true); - updateDictCache(dictInfo); - return dictInfo; + + int retry = 7; + while (retry-- > 0) { + try { + long now = System.currentTimeMillis(); + store.updateTimestamp(dictPath, now); + logger.info("Update dictionary {} lastModifiedTime to {}", dictPath, now); + return loadAndUpdateLocalCache(dictPath); + } catch (WriteConflictException e) { + if (retry <= 0) { + logger.error("Retry is out, till got error, abandoning...", e); + throw e; + } + logger.warn("Write conflict to update dictionary " + dictPath + " retry remaining " + retry + + ", will retry..."); + } + } + return loadAndUpdateLocalCache(dictPath); } private void initDictInfo(Dictionary newDict, DictionaryInfo newDictInfo) { @@ -411,6 +424,12 @@ void save(DictionaryInfo dict) throws IOException { store.putBigResource(path, dict, System.currentTimeMillis(), DictionaryInfoSerializer.FULL_SERIALIZER); } + private DictionaryInfo loadAndUpdateLocalCache(String dictPath) throws IOException { + DictionaryInfo dictInfo = load(dictPath, true); + updateDictCache(dictInfo); + return dictInfo; + } + DictionaryInfo load(String resourcePath, boolean loadDictObj) throws IOException { ResourceStore store = getStore(); diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index 76a3df9a1f1..9d591b503a9 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.WriteConflictException; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.IReadableTable; @@ -236,10 +237,23 @@ private String checkDupByContent(SnapshotTable snapshot) throws IOException { private SnapshotTable updateDictLastModifiedTime(String snapshotPath) throws IOException { ResourceStore store = getStore(); - long now = System.currentTimeMillis(); - store.updateTimestamp(snapshotPath, now); - logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now); + int retry = 7; + while (retry-- > 0) { + try { + long now = System.currentTimeMillis(); + store.updateTimestamp(snapshotPath, now); + logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now); + return loadAndUpdateLocalCache(snapshotPath); + } catch (WriteConflictException e) { + if (retry <= 0) { + logger.error("Retry is out, till got error, abandoning...", e); + throw e; + } + logger.warn("Write conflict to update snapshotTable " + snapshotPath + " retry remaining " + retry + + ", will retry..."); + } + } // update cache return loadAndUpdateLocalCache(snapshotPath); }