diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index 9584a6f1fa154fd..24bd2355c56b436 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -33,11 +33,14 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * ExecutionProfile is used to collect profile of a complete query plan(including query or load). @@ -72,6 +75,12 @@ public class ExecutionProfile { // fragmentId -> dummy value private MarkedCountDownLatch profileFragmentDoneSignal; + // fragmentId -> The number of BE without 'done. + private Map befragmentDone; + + // lock befragmentDone + private ReadWriteLock lock; + // use to merge profile from multi be private List>> multiBeProfile = null; @@ -231,8 +240,20 @@ private boolean enablePipelineX() { public void markFragments(int fragments) { profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments); + lock = new ReentrantReadWriteLock(); + befragmentDone = new HashMap<>(); for (int fragmentId = 0; fragmentId < fragments; fragmentId++) { profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */); + befragmentDone.put(fragmentId, 0); + } + } + + public void addFragments(int fragmentId) { + lock.writeLock().lock(); + try { + befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) + 1); + } finally { + lock.writeLock().unlock(); } } @@ -282,8 +303,16 @@ public void markOneInstanceDone(TUniqueId fragmentInstanceId) { public void markOneFragmentDone(int fragmentId) { if (profileFragmentDoneSignal != null) { - if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) { - LOG.warn("Mark fragment {} done failed", fragmentId); + lock.writeLock().lock(); + try { + befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) - 1); + if (befragmentDone.get(fragmentId) == 0) { + if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) { + LOG.warn("Mark fragment {} done failed", fragmentId); + } + } + } finally { + lock.writeLock().unlock(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 5e2d0ef85c8674f..165583b9e686bc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3196,6 +3196,9 @@ public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId, this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); this.enablePipelineX = enablePipelineX; this.executionProfile = executionProfile; + if (enablePipelineX) { + executionProfile.addFragments(profileFragmentId); + } } public Stream profileStream() {