Skip to content

Commit

Permalink
[improvement](disk balance) Prevent duplicate disk balance tasks afte… (
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored Nov 11, 2023
1 parent 557693c commit 76a8b59
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
10 changes: 7 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = _env->storage_engine()->execute_task(&engine_task);
}
// fe should ignore this err
if (status.is<FILE_ALREADY_EXIST>()) {
status = Status::OK();
}
if (!status.ok()) {
LOG_WARNING("failed to migrate storage medium")
.tag("signature", agent_task_req.signature)
Expand Down Expand Up @@ -1986,8 +1990,9 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
return Status::InternalError("tablet is already on specified path {}",
tablet->data_dir()->path());
LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
tablet->data_dir()->path());
}

// check local disk capacity
Expand All @@ -1996,7 +2001,6 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
(*dest_store)->path(), tablet_size);
}

return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
Set<Long> pathHigh = Sets.newHashSet();
// we only select tablets from available high load path
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
// check if BE has low and high paths for balance after reclassification
if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
continue;
}
Expand Down Expand Up @@ -382,5 +381,6 @@ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws SchedException {
if (!setDest) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path");
}
LOG.info("dx test out completeSchedCtx");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -608,6 +609,15 @@ private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws Sched
}
}

public void updateDestPathHash(TabletSchedCtx tabletCtx) {
// find dest replica
Optional<Replica> destReplica = tabletCtx.getReplicas()
.stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny();
if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) {
destReplica.get().setPathHash(tabletCtx.getDestPathHash());
}
}

public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
PathSlot pathSlot = backendsWorkingSlots.get(beId);
if (pathSlot == null) {
Expand Down Expand Up @@ -1630,6 +1640,7 @@ public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrati
// if we have a success task, then stat must be refreshed before schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
updateDestPathHash(tabletCtx);
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
} else {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
Expand Down

0 comments on commit 76a8b59

Please sign in to comment.