diff --git a/fe/src/main/java/org/apache/doris/catalog/Database.java b/fe/src/main/java/org/apache/doris/catalog/Database.java index 4444568b9a7036..69fc072ca04ece 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/src/main/java/org/apache/doris/catalog/Database.java @@ -311,18 +311,13 @@ public void dropTable(String tableName) { } public List getTables() { - List
tables = new ArrayList
(idToTable.values()); - return tables; + return new ArrayList
(idToTable.values()); } public Set getTableNamesWithLock() { readLock(); try { - Set tableNames = new HashSet(); - for (String name : this.nameToTable.keySet()) { - tableNames.add(name); - } - return tableNames; + return new HashSet(this.nameToTable.keySet()); } finally { readUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java b/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java index 8d9f9fb70fc63a..b033c9942ea0f2 100644 --- a/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java +++ b/fe/src/main/java/org/apache/doris/consistency/CheckConsistencyJob.java @@ -204,8 +204,7 @@ public boolean sendTasks() { if (maxDataSize > 0) { timeoutMs = maxDataSize / 1000 / 1000 / 1000 * CHECK_CONSISTENCT_TIME_COST_PER_GIGABYTE_MS; } - timeoutMs = timeoutMs < Config.check_consistency_default_timeout_second * 1000L - ? Config.check_consistency_default_timeout_second * 1000L : timeoutMs; + timeoutMs = Math.max(timeoutMs, Config.check_consistency_default_timeout_second * 1000L); state = JobState.RUNNING; } @@ -289,8 +288,8 @@ public synchronized int tryFinishJob() { // check if schema has changed if (checkedSchemaHash != olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId())) { - LOG.info("index[{}]'s schema hash has been changed. [{} -> {}]. retry", checkedSchemaHash, - olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId())); + LOG.info("index[{}]'s schema hash has been changed. [{} -> {}]. retry", tabletMeta.getIndexId(), + checkedSchemaHash, olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId())); return -1; } diff --git a/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java b/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java index 023958a92514c9..27610afd3f75ae 100644 --- a/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java +++ b/fe/src/main/java/org/apache/doris/consistency/ConsistencyChecker.java @@ -17,6 +17,7 @@ package org.apache.doris.consistency; +import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -54,12 +55,8 @@ public class ConsistencyChecker extends MasterDaemon { private static final int MAX_JOB_NUM = 100; - public static final Comparator COMPARATOR = new Comparator() { - @Override - public int compare(MetaObject first, MetaObject second) { - return Long.signum(first.getLastCheckTime() - second.getLastCheckTime()); - } - }; + private static final Comparator COMPARATOR = + (first, second) -> Long.signum(first.getLastCheckTime() - second.getLastCheckTime()); // tabletId -> job private Map jobs; @@ -112,18 +109,13 @@ private boolean initWorkTime() { @Override protected void runAfterCatalogReady() { - if (itsTime()) { - // for each round. try chose enough new tablets to check - // only add new job when it's work time - while (getJobNum() < MAX_JOB_NUM) { - long chosenTabletId = chooseTablet(); - if (chosenTabletId == -1L) { - LOG.info("no tablet is chosen to check consistency"); - break; - } else { - CheckConsistencyJob job = new CheckConsistencyJob(chosenTabletId); - addJob(job); - } + // for each round. try chose enough new tablets to check + // only add new job when it's work time + if (itsTime() && getJobNum() == 0) { + List chosenTabletIds = chooseTablets(); + for(Long tabletId: chosenTabletIds) { + CheckConsistencyJob job = new CheckConsistencyJob(tabletId); + addJob(job); } } @@ -239,14 +231,16 @@ private int getJobNum() { * we use a priority queue to sort db/table/partition/index/tablet by 'lastCheckTime'. * chose a tablet which has the smallest 'lastCheckTime'. */ - private long chooseTablet() { + private List chooseTablets() { Catalog catalog = Catalog.getInstance(); MetaObject chosenOne = null; + List chosenTablets = Lists.newArrayList(); + // sort dbs List dbIds = catalog.getDbIds(); if (dbIds.isEmpty()) { - return -1L; + return chosenTablets; } Queue dbQueue = new PriorityQueue(dbIds.size(), COMPARATOR); for (Long dbId : dbIds) { @@ -269,8 +263,9 @@ private long chooseTablet() { db.readLock(); try { // sort tables - Queue tableQueue = new PriorityQueue(1, COMPARATOR); - for (Table table : db.getTables()) { + List
tables = db.getTables(); + Queue tableQueue = new PriorityQueue(tables.size(), COMPARATOR); + for (Table table : tables) { if (table.getType() != TableType.OLAP) { continue; } @@ -282,7 +277,7 @@ private long chooseTablet() { // sort partitions Queue partitionQueue = - new PriorityQueue(1, COMPARATOR); + new PriorityQueue<>(table.getPartitions().size(), COMPARATOR); for (Partition partition : table.getPartitions()) { // check partition's replication num. if 1 replication. skip if (table.getPartitionInfo().getReplicationNum(partition.getId()) == (short) 1) { @@ -303,21 +298,16 @@ private long chooseTablet() { Partition partition = (Partition) chosenOne; // sort materializedIndices - Queue indexQueue = - new PriorityQueue(1, COMPARATOR); - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { - indexQueue.add(index); - } + List visibleIndexs = partition.getMaterializedIndices(IndexExtState.VISIBLE); + Queue indexQueue = new PriorityQueue(visibleIndexs.size(), COMPARATOR); + indexQueue.addAll(visibleIndexs); while ((chosenOne = indexQueue.poll()) != null) { MaterializedIndex index = (MaterializedIndex) chosenOne; // sort tablets - Queue tabletQueue = - new PriorityQueue(1, COMPARATOR); - for (Tablet oneTablet : index.getTablets()) { - tabletQueue.add(oneTablet); - } + Queue tabletQueue = new PriorityQueue(index.getTablets().size(), COMPARATOR); + tabletQueue.addAll(index.getTablets()); while ((chosenOne = tabletQueue.poll()) != null) { Tablet tablet = (Tablet) chosenOne; @@ -334,17 +324,20 @@ private long chooseTablet() { LOG.debug("tablet[{}]'s version[{}-{}] has been checked. ignore", chosenTabletId, tablet.getCheckedVersion(), tablet.getCheckedVersionHash()); - continue; } } else { LOG.info("chose tablet[{}-{}-{}-{}-{}] to check consistency", db.getId(), table.getId(), partition.getId(), index.getId(), chosenTabletId); - return chosenTabletId; + chosenTablets.add(chosenTabletId); } } // end while tabletQueue } // end while indexQueue - } // end while partitionQueue + + if (chosenTablets.size() >= MAX_JOB_NUM) { + return chosenTablets; + } + } // end while partitionQueue } // end while tableQueue } finally { db.readUnlock(); @@ -354,7 +347,7 @@ private long chooseTablet() { jobsLock.readLock().unlock(); } - return -1L; + return chosenTablets; } public void handleFinishedConsistencyCheck(CheckConsistencyTask task, long checksum) {