Skip to content

Commit

Permalink
[fix](pipelineX) fix multi be may be missing profiles apache#29914
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored and seawinde committed Jan 15, 2024
1 parent 1313852 commit cc6e419
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* ExecutionProfile is used to collect the profile of a complete query plan (including query or load).
Expand Down Expand Up @@ -72,6 +75,12 @@ public class ExecutionProfile {
// fragmentId -> dummy value
private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;

// fragmentId -> the number of BEs that have not yet reported 'done' for this fragment.
private Map<Integer, Integer> befragmentDone;

// guards reads and writes of befragmentDone
private ReadWriteLock lock;

// use to merge profile from multi be
private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null;

Expand Down Expand Up @@ -231,8 +240,20 @@ private boolean enablePipelineX() {

/**
 * Sets up completion tracking for the given number of fragments.
 *
 * <p>Creates a new done-latch sized to {@code fragments}, a new lock, and a new
 * counter map, then registers every fragment id in {@code [0, fragments)} with
 * the latch and gives it an initial counter of zero.
 *
 * @param fragments number of fragments to track; ids are assumed to be 0-based and dense
 */
public void markFragments(int fragments) {
    profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
    lock = new ReentrantReadWriteLock();
    befragmentDone = new HashMap<>();

    // The latch only needs the key; the mark value itself carries no meaning.
    final long unusedMark = -1L;
    int id = 0;
    while (id < fragments) {
        profileFragmentDoneSignal.addMark(id, unusedMark);
        befragmentDone.put(id, 0);
        id++;
    }
}

/**
 * Increments the pending counter of the given fragment by one, under the write lock.
 *
 * <p>The counter is later decremented by {@code markOneFragmentDone}. A fragment id
 * that has no counter yet starts at one instead of failing with a
 * {@link NullPointerException} while unboxing a missing map value.
 *
 * <p>NOTE(review): relies on {@code lock} and {@code befragmentDone} having been
 * created beforehand (they are assigned in {@code markFragments}).
 *
 * @param fragmentId id of the fragment whose pending counter is incremented
 */
public void addFragments(int fragmentId) {
    lock.writeLock().lock();
    try {
        // merge() handles both the "absent" and the "present" case in one lookup.
        befragmentDone.merge(fragmentId, 1, Integer::sum);
    } finally {
        lock.writeLock().unlock();
    }
}

Expand Down Expand Up @@ -282,8 +303,16 @@ public void markOneInstanceDone(TUniqueId fragmentInstanceId) {

public void markOneFragmentDone(int fragmentId) {
if (profileFragmentDoneSignal != null) {
if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
LOG.warn("Mark fragment {} done failed", fragmentId);
lock.writeLock().lock();
try {
befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) - 1);
if (befragmentDone.get(fragmentId) == 0) {
if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
LOG.warn("Mark fragment {} done failed", fragmentId);
}
}
} finally {
lock.writeLock().unlock();
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,9 @@ public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
this.enablePipelineX = enablePipelineX;
this.executionProfile = executionProfile;
if (enablePipelineX) {
executionProfile.addFragments(profileFragmentId);
}
}

public Stream<RuntimeProfile> profileStream() {
Expand Down

0 comments on commit cc6e419

Please sign in to comment.