Skip to content

Commit

Permalink
[Alter] Allow submitting alter jobs when table is unstable
Browse files Browse the repository at this point in the history
Alter job will wait table to be stable before running.
  • Loading branch information
WingsGo authored and morningman committed Jan 18, 2020
1 parent ae01804 commit 92d8f6a
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 94 deletions.
5 changes: 3 additions & 2 deletions fe/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
if (olapTable.getState() != OlapTableState.NORMAL) {
throw new DdlException("Table[" + table.getName() + "]'s state is not NORMAL. Do not allow doing ALTER ops");
}

if (needTableStable) {

// schema change job will wait until table become stable
if (needTableStable && !hasSchemaChange && !hasAddMaterializedView) {
// check if all tablets are healthy, and no tablet is in tablet scheduler
boolean isStable = olapTable.isStable(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentCatalog().getTabletScheduler(),
Expand Down
11 changes: 10 additions & 1 deletion fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,17 @@ protected void runPendingJob() throws AlterCancelException {
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not exist");
}
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);

boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentCatalog().getTabletScheduler(),
db.getClusterName());
if (!isStable) {
errMsg = "table is unstable";
LOG.warn("doing rollup job: " + jobId + " while table is not stable.");
return;
}

Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1310,12 +1310,12 @@ private void getOldAlterJobInfos(Database db, List<List<Comparable>> schemaChang
public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
throws UserException {
// index id -> index schema
Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<Long, LinkedList<Column>>();
Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();
for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema().entrySet()) {
indexSchemaMap.put(entry.getKey(), new LinkedList<Column>(entry.getValue()));
indexSchemaMap.put(entry.getKey(), new LinkedList<>(entry.getValue()));
}
List<Index> newIndexes = olapTable.getCopiedIndexes();
Map<String, String> propertyMap = new HashMap<String, String>();
Map<String, String> propertyMap = new HashMap<>();
for (AlterClause alterClause : alterClauses) {
// get properties
Map<String, String> properties = alterClause.getProperties();
Expand Down
14 changes: 12 additions & 2 deletions fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,23 @@ protected void runPendingJob() throws AlterCancelException {
totalReplicaNum += tablet.getReplicas().size();
}
}
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalReplicaNum);
MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<>(totalReplicaNum);
db.readLock();
try {
OlapTable tbl = (OlapTable) db.getTable(tableId);
if (tbl == null) {
throw new AlterCancelException("Table " + tableId + " does not exist");
}
}

boolean isStable = tbl.isStable(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentCatalog().getTabletScheduler(),
db.getClusterName());
if (!isStable) {
errMsg = "table is unstable";
LOG.warn("doing schema change job: " + jobId + " while table is not stable.");
return;
}

Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Expand Down
153 changes: 70 additions & 83 deletions fe/src/main/java/org/apache/doris/clone/TabletChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ public int hashCode() {
}
}

public static class RepairTabletInfo {
public long dbId;
public long tblId;
public List<Long> partIds;

public RepairTabletInfo(Long dbId, Long tblId, List<Long> partIds) {
this.dbId = dbId;
this.tblId = tblId;
this.partIds = partIds;
}
}

public TabletChecker(Catalog catalog, SystemInfoService infoService, TabletScheduler tabletScheduler,
TabletSchedulerStat stat) {
super("tablet checker", CHECK_INTERVAL_MS);
Expand All @@ -109,42 +121,42 @@ public TabletChecker(Catalog catalog, SystemInfoService infoService, TabletSched
this.stat = stat;
}

public void addPrios(long dbId, long tblId, List<Long> partitionIds, long timeoutMs) {
Preconditions.checkArgument(!partitionIds.isEmpty());
private void addPrios(RepairTabletInfo repairTabletInfo, long timeoutMs) {
Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
long currentTime = System.currentTimeMillis();
synchronized (prios) {
Set<PrioPart> parts = prios.get(dbId, tblId);
Set<PrioPart> parts = prios.get(repairTabletInfo.dbId, repairTabletInfo.tblId);
if (parts == null) {
parts = Sets.newHashSet();
prios.put(dbId, tblId, parts);
prios.put(repairTabletInfo.dbId, repairTabletInfo.tblId, parts);
}

for (long partId : partitionIds) {
for (long partId : repairTabletInfo.partIds) {
PrioPart prioPart = new PrioPart(partId, currentTime, timeoutMs);
parts.add(prioPart);
}
}

// we also need to change the priority of tablets which are already in
tabletScheduler.changeTabletsPriorityToVeryHigh(dbId, tblId, partitionIds);
tabletScheduler.changeTabletsPriorityToVeryHigh(repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
}

private void removePrios(long dbId, long tblId, List<Long> partitionIds) {
Preconditions.checkArgument(!partitionIds.isEmpty());
private void removePrios(RepairTabletInfo repairTabletInfo) {
Preconditions.checkArgument(!repairTabletInfo.partIds.isEmpty());
synchronized (prios) {
Map<Long, Set<PrioPart>> tblMap = prios.row(dbId);
Map<Long, Set<PrioPart>> tblMap = prios.row(repairTabletInfo.dbId);
if (tblMap == null) {
return;
}
Set<PrioPart> parts = tblMap.get(tblId);
Set<PrioPart> parts = tblMap.get(repairTabletInfo.tblId);
if (parts == null) {
return;
}
for (long partId : partitionIds) {
for (long partId : repairTabletInfo.partIds) {
parts.remove(new PrioPart(partId, -1, -1));
}
if (parts.isEmpty()) {
tblMap.remove(tblId);
tblMap.remove(repairTabletInfo.tblId);
}
}

Expand Down Expand Up @@ -271,7 +283,8 @@ private void checkTablets() {
// priorities.
LOG.debug("partition is healthy, remove from prios: {}-{}-{}",
db.getId(), olapTbl.getId(), partition.getId());
removePrios(db.getId(), olapTbl.getId(), Lists.newArrayList(partition.getId()));
removePrios(new RepairTabletInfo(db.getId(),
olapTbl.getId(), Lists.newArrayList(partition.getId())));
}
} // partitions
} // tables
Expand Down Expand Up @@ -356,71 +369,72 @@ private void removePriosIfNecessary() {
* when being scheduled.
*/
public void repairTable(AdminRepairTableStmt stmt) throws DdlException {
Catalog catalog = Catalog.getCurrentCatalog();
Database db = catalog.getDb(stmt.getDbName());
if (db == null) {
throw new DdlException("Database " + stmt.getDbName() + " does not exist");
}
RepairTabletInfo repairTabletInfo = getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
addPrios(repairTabletInfo, stmt.getTimeoutS());
LOG.info("repair database: {}, table: {}, partition: {}", repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
}

long dbId = db.getId();
long tblId = -1;
List<Long> partIds = Lists.newArrayList();
db.readLock();
try {
Table tbl = db.getTable(stmt.getTblName());
if (tbl == null || tbl.getType() != TableType.OLAP) {
throw new DdlException("Table does not exist or is not OLAP table: " + stmt.getTblName());
/*
* handle ADMIN CANCEL REPAIR TABLE stmt send by user.
* This operation will remove the specified partitions from 'prios'
*/
public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlException {
RepairTabletInfo repairTabletInfo = getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
removePrios(repairTabletInfo);
LOG.info("cancel repair database: {}, table: {}, partition: {}", repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
}

public int getPrioPartitionNum() {
int count = 0;
synchronized (prios) {
for (Set<PrioPart> set : prios.values()) {
count += set.size();
}
}
return count;
}

tblId = tbl.getId();
OlapTable olapTable = (OlapTable) tbl;
if (stmt.getPartitions().isEmpty()) {
partIds = olapTable.getPartitions().stream().map(p -> p.getId()).collect(Collectors.toList());
} else {
for (String partName : stmt.getPartitions()) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
throw new DdlException("Partition does not exist: " + partName);
}
partIds.add(partition.getId());
public List<List<String>> getPriosInfo() {
List<List<String>> infos = Lists.newArrayList();
synchronized (prios) {
for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
for (PrioPart part : cell.getValue()) {
List<String> row = Lists.newArrayList();
row.add(cell.getRowKey().toString());
row.add(cell.getColumnKey().toString());
row.add(String.valueOf(part.partId));
row.add(String.valueOf(part.timeoutMs - (System.currentTimeMillis() - part.addTime)));
infos.add(row);
}
}
} finally {
db.readUnlock();
}

Preconditions.checkState(tblId != -1);
addPrios(dbId, tblId, partIds, stmt.getTimeoutS() * 1000);
LOG.info("repair database: {}, table: {}, partition: {}", dbId, tblId, partIds);
return infos;
}

/*
* handle ADMIN CANCEL REPAIR TABLE stmt send by user.
* This operation will remove the specified partitions from 'prios'
*/
public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlException {
public static RepairTabletInfo getRepairTabletInfo(String dbName, String tblName, List<String> partitions) throws DdlException {
Catalog catalog = Catalog.getCurrentCatalog();
Database db = catalog.getDb(stmt.getDbName());
Database db = catalog.getDb(dbName);
if (db == null) {
throw new DdlException("Database " + stmt.getDbName() + " does not exist");
throw new DdlException("Database " + dbName + " does not exist");
}

long dbId = db.getId();
long tblId = -1;
List<Long> partIds = Lists.newArrayList();
db.readLock();
try {
Table tbl = db.getTable(stmt.getTblName());
Table tbl = db.getTable(tblName);
if (tbl == null || tbl.getType() != TableType.OLAP) {
throw new DdlException("Table does not exist or is not OLAP table: " + stmt.getTblName());
throw new DdlException("Table does not exist or is not OLAP table: " + tblName);
}

tblId = tbl.getId();
OlapTable olapTable = (OlapTable) tbl;
if (stmt.getPartitions().isEmpty()) {
partIds = olapTable.getPartitions().stream().map(p -> p.getId()).collect(Collectors.toList());

if (partitions == null || partitions.isEmpty()) {
partIds = olapTable.getPartitions().stream().map(Partition::getId).collect(Collectors.toList());
} else {
for (String partName : stmt.getPartitions()) {
for (String partName : partitions) {
Partition partition = olapTable.getPartition(partName);
if (partition == null) {
throw new DdlException("Partition does not exist: " + partName);
Expand All @@ -433,34 +447,7 @@ public void cancelRepairTable(AdminCancelRepairTableStmt stmt) throws DdlExcepti
}

Preconditions.checkState(tblId != -1);
removePrios(dbId, tblId, partIds);
LOG.info("cancel repair database: {}, table: {}, partition: {}", dbId, tblId, partIds);
}

public int getPrioPartitionNum() {
int count = 0;
synchronized (prios) {
for (Set<PrioPart> set : prios.values()) {
count += set.size();
}
}
return count;
}

public List<List<String>> getPriosInfo() {
List<List<String>> infos = Lists.newArrayList();
synchronized (prios) {
for (Cell<Long, Long, Set<PrioPart>> cell : prios.cellSet()) {
for (PrioPart part : cell.getValue()) {
List<String> row = Lists.newArrayList();
row.add(cell.getRowKey().toString());
row.add(cell.getColumnKey().toString());
row.add(String.valueOf(part.partId));
row.add(String.valueOf(part.timeoutMs - (System.currentTimeMillis() - part.addTime)));
infos.add(row);
}
}
}
return infos;
return new RepairTabletInfo(dbId, tblId, partIds);
}
}
Loading

0 comments on commit 92d8f6a

Please sign in to comment.