Skip to content

Commit

Permalink
Choose tablets in ConsistencyChecker in batch (apache#2736)
Browse files Browse the repository at this point in the history
  • Loading branch information
kangkaisen authored and imay committed Jan 11, 2020
1 parent 089b358 commit e00343b
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 48 deletions.
9 changes: 2 additions & 7 deletions fe/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,13 @@ public void dropTable(String tableName) {
}

public List<Table> getTables() {
List<Table> tables = new ArrayList<Table>(idToTable.values());
return tables;
return new ArrayList<Table>(idToTable.values());
}

public Set<String> getTableNamesWithLock() {
readLock();
try {
Set<String> tableNames = new HashSet<String>();
for (String name : this.nameToTable.keySet()) {
tableNames.add(name);
}
return tableNames;
return new HashSet<String>(this.nameToTable.keySet());
} finally {
readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,7 @@ public boolean sendTasks() {
if (maxDataSize > 0) {
timeoutMs = maxDataSize / 1000 / 1000 / 1000 * CHECK_CONSISTENCT_TIME_COST_PER_GIGABYTE_MS;
}
timeoutMs = timeoutMs < Config.check_consistency_default_timeout_second * 1000L
? Config.check_consistency_default_timeout_second * 1000L : timeoutMs;
timeoutMs = Math.max(timeoutMs, Config.check_consistency_default_timeout_second * 1000L);
state = JobState.RUNNING;
}

Expand Down Expand Up @@ -289,8 +288,8 @@ public synchronized int tryFinishJob() {

// check if schema has changed
if (checkedSchemaHash != olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId())) {
LOG.info("index[{}]'s schema hash has been changed. [{} -> {}]. retry", checkedSchemaHash,
olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId()));
LOG.info("index[{}]'s schema hash has been changed. [{} -> {}]. retry", tabletMeta.getIndexId(),
checkedSchemaHash, olapTable.getSchemaHashByIndexId(tabletMeta.getIndexId()));
return -1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.consistency;

import com.google.common.collect.Lists;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
Expand Down Expand Up @@ -54,12 +55,8 @@ public class ConsistencyChecker extends MasterDaemon {

private static final int MAX_JOB_NUM = 100;

public static final Comparator<MetaObject> COMPARATOR = new Comparator<MetaObject>() {
@Override
public int compare(MetaObject first, MetaObject second) {
return Long.signum(first.getLastCheckTime() - second.getLastCheckTime());
}
};
private static final Comparator<MetaObject> COMPARATOR =
(first, second) -> Long.signum(first.getLastCheckTime() - second.getLastCheckTime());

// tabletId -> job
private Map<Long, CheckConsistencyJob> jobs;
Expand Down Expand Up @@ -112,18 +109,13 @@ private boolean initWorkTime() {

@Override
protected void runAfterCatalogReady() {
if (itsTime()) {
// for each round. try chose enough new tablets to check
// only add new job when it's work time
while (getJobNum() < MAX_JOB_NUM) {
long chosenTabletId = chooseTablet();
if (chosenTabletId == -1L) {
LOG.info("no tablet is chosen to check consistency");
break;
} else {
CheckConsistencyJob job = new CheckConsistencyJob(chosenTabletId);
addJob(job);
}
// for each round. try chose enough new tablets to check
// only add new job when it's work time
if (itsTime() && getJobNum() == 0) {
List<Long> chosenTabletIds = chooseTablets();
for(Long tabletId: chosenTabletIds) {
CheckConsistencyJob job = new CheckConsistencyJob(tabletId);
addJob(job);
}
}

Expand Down Expand Up @@ -239,14 +231,16 @@ private int getJobNum() {
* we use a priority queue to sort db/table/partition/index/tablet by 'lastCheckTime'.
* chose a tablet which has the smallest 'lastCheckTime'.
*/
private long chooseTablet() {
private List<Long> chooseTablets() {
Catalog catalog = Catalog.getInstance();
MetaObject chosenOne = null;

List<Long> chosenTablets = Lists.newArrayList();

// sort dbs
List<Long> dbIds = catalog.getDbIds();
if (dbIds.isEmpty()) {
return -1L;
return chosenTablets;
}
Queue<MetaObject> dbQueue = new PriorityQueue<MetaObject>(dbIds.size(), COMPARATOR);
for (Long dbId : dbIds) {
Expand All @@ -269,8 +263,9 @@ private long chooseTablet() {
db.readLock();
try {
// sort tables
Queue<MetaObject> tableQueue = new PriorityQueue<MetaObject>(1, COMPARATOR);
for (Table table : db.getTables()) {
List<Table> tables = db.getTables();
Queue<MetaObject> tableQueue = new PriorityQueue<MetaObject>(tables.size(), COMPARATOR);
for (Table table : tables) {
if (table.getType() != TableType.OLAP) {
continue;
}
Expand All @@ -282,7 +277,7 @@ private long chooseTablet() {

// sort partitions
Queue<MetaObject> partitionQueue =
new PriorityQueue<MetaObject>(1, COMPARATOR);
new PriorityQueue<>(table.getPartitions().size(), COMPARATOR);
for (Partition partition : table.getPartitions()) {
// check partition's replication num. if 1 replication. skip
if (table.getPartitionInfo().getReplicationNum(partition.getId()) == (short) 1) {
Expand All @@ -303,21 +298,16 @@ private long chooseTablet() {
Partition partition = (Partition) chosenOne;

// sort materializedIndices
Queue<MetaObject> indexQueue =
new PriorityQueue<MetaObject>(1, COMPARATOR);
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
indexQueue.add(index);
}
List<MaterializedIndex> visibleIndexs = partition.getMaterializedIndices(IndexExtState.VISIBLE);
Queue<MetaObject> indexQueue = new PriorityQueue<MetaObject>(visibleIndexs.size(), COMPARATOR);
indexQueue.addAll(visibleIndexs);

while ((chosenOne = indexQueue.poll()) != null) {
MaterializedIndex index = (MaterializedIndex) chosenOne;

// sort tablets
Queue<MetaObject> tabletQueue =
new PriorityQueue<MetaObject>(1, COMPARATOR);
for (Tablet oneTablet : index.getTablets()) {
tabletQueue.add(oneTablet);
}
Queue<MetaObject> tabletQueue = new PriorityQueue<MetaObject>(index.getTablets().size(), COMPARATOR);
tabletQueue.addAll(index.getTablets());

while ((chosenOne = tabletQueue.poll()) != null) {
Tablet tablet = (Tablet) chosenOne;
Expand All @@ -334,17 +324,20 @@ private long chooseTablet() {
LOG.debug("tablet[{}]'s version[{}-{}] has been checked. ignore",
chosenTabletId, tablet.getCheckedVersion(),
tablet.getCheckedVersionHash());
continue;
}
} else {
LOG.info("chose tablet[{}-{}-{}-{}-{}] to check consistency", db.getId(),
table.getId(), partition.getId(), index.getId(), chosenTabletId);

return chosenTabletId;
chosenTablets.add(chosenTabletId);
}
} // end while tabletQueue
} // end while indexQueue
} // end while partitionQueue

if (chosenTablets.size() >= MAX_JOB_NUM) {
return chosenTablets;
}
} // end while partitionQueue
} // end while tableQueue
} finally {
db.readUnlock();
Expand All @@ -354,7 +347,7 @@ private long chooseTablet() {
jobsLock.readLock().unlock();
}

return -1L;
return chosenTablets;
}

public void handleFinishedConsistencyCheck(CheckConsistencyTask task, long checksum) {
Expand Down

0 comments on commit e00343b

Please sign in to comment.