Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1506][FOLLOWUP] InFlightRequestTracker should not reset totalInflightReqs for cleaning up to avoid negative totalInflightReqs for limitZeroInFlight #2725

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class InFlightRequestTracker {
private final int maxInFlightReqsTotal;
private final LongAdder totalInflightReqs = new LongAdder();

private volatile boolean cleaned = false;

public InFlightRequestTracker(CelebornConf conf, PushState pushState) {
this.waitInflightTimeoutMs = conf.clientPushLimitInFlightTimeoutMs();
this.delta = conf.clientPushLimitInFlightSleepDeltaMs();
Expand All @@ -67,11 +69,10 @@ public void addBatch(int batchId, String hostAndPushPort) {

/**
 * Removes {@code batchId} from the batch-id set registered for {@code hostAndPushPort} and
 * decrements the total in-flight request counter.
 *
 * <p>The counter is decremented unconditionally: it drops by one even when no batch-id set is
 * registered for the address, and even when the set did not contain {@code batchId}.
 *
 * @param batchId id of the batch to remove
 * @param hostAndPushPort key used to look up the batch-id set in {@code inflightBatchesPerAddress}
 */
public void removeBatch(int batchId, String hostAndPushPort) {
Set<Integer> batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort);
// TODO: Need to debug why batchIdSet will be null.
if (batchIdSet != null) {
batchIdSet.remove(batchId);
} else {
// NOTE(review): the two log calls below look like the removed (warn) and added (info) lines
// of the same diff hunk shown without +/- markers; confirm which one the real file keeps.
logger.warn("BatchIdSet of {} is null.", hostAndPushPort);
logger.info("Batches of {} in flight is null.", hostAndPushPort);
}
// Reached on both branches, so the counter moves whether or not a set was found above.
totalInflightReqs.decrement();
}
Expand All @@ -97,19 +98,24 @@ public boolean limitMaxInFlight(String hostAndPushPort) throws IOException {
pushStrategy.limitPushSpeed(pushState, hostAndPushPort);
int currentMaxReqsInFlight = pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort);

Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
Set<Integer> batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort);
long times = waitInflightTimeoutMs / delta;
try {
while (times > 0) {
if (totalInflightReqs.sum() <= maxInFlightReqsTotal
&& batchIdSet.size() <= currentMaxReqsInFlight) {
break;
}
if (pushState.exception.get() != null) {
throw pushState.exception.get();
if (cleaned) {
// MapEnd cleans up push state, which does not exceed the max requests in flight limit.
return false;
} else {
if (totalInflightReqs.sum() <= maxInFlightReqsTotal
&& batchIdSet.size() <= currentMaxReqsInFlight) {
break;
}
if (pushState.exception.get() != null) {
throw pushState.exception.get();
}
Thread.sleep(delta);
times--;
}
Thread.sleep(delta);
times--;
}
} catch (InterruptedException e) {
pushState.exception.set(new CelebornIOException(e));
Expand All @@ -118,10 +124,12 @@ public boolean limitMaxInFlight(String hostAndPushPort) throws IOException {
if (times <= 0) {
logger.warn(
"After waiting for {} ms, "
+ "there are still {} batches in flight "
+ "for hostAndPushPort {}, "
+ "there are still {} requests in flight (limit: {}): "
+ "{} batches for hostAndPushPort {}, "
+ "which exceeds the current limit {}.",
waitInflightTimeoutMs,
totalInflightReqs.sum(),
maxInFlightReqsTotal,
batchIdSet.size(),
hostAndPushPort,
currentMaxReqsInFlight);
Expand All @@ -142,14 +150,19 @@ public boolean limitZeroInFlight() throws IOException {

try {
while (times > 0) {
if (totalInflightReqs.sum() == 0) {
break;
if (cleaned) {
// MapEnd cleans up push state, which does not exceed the zero requests in flight limit.
return false;
} else {
if (totalInflightReqs.sum() == 0) {
break;
}
if (pushState.exception.get() != null) {
throw pushState.exception.get();
}
Thread.sleep(delta);
times--;
}
if (pushState.exception.get() != null) {
throw pushState.exception.get();
}
Thread.sleep(delta);
times--;
}
} catch (InterruptedException e) {
pushState.exception.set(new CelebornIOException(e));
Expand All @@ -158,9 +171,10 @@ public boolean limitZeroInFlight() throws IOException {
if (times <= 0) {
logger.error(
"After waiting for {} ms, "
+ "there are still {} in flight, "
+ "there are still {} requests in flight: {}, "
+ "which exceeds the current limit 0.",
waitInflightTimeoutMs,
totalInflightReqs.sum(),
inflightBatchesPerAddress.entrySet().stream()
.filter(c -> !c.getValue().isEmpty())
.map(c -> c.getValue().size() + " batches for hostAndPushPort " + c.getKey())
Expand All @@ -184,11 +198,12 @@ protected int nextBatchId() {
}

/**
 * Drops all tracked in-flight batch sets, marks this tracker as cleaned, and clears the push
 * strategy.
 *
 * <p>NOTE(review): this span appears to contain both the pre-change lines (the {@code if} block
 * that also resets {@code totalInflightReqs}) and the post-change lines (log, set {@code cleaned},
 * clear) of a diff shown without +/- markers. Confirm which lines the real file keeps before
 * relying on the counter being reset here.
 */
public void cleanup() {
// Only when at least one address is still tracked: warn, drop every batch set, zero the counter.
if (!inflightBatchesPerAddress.isEmpty()) {
logger.warn("Clear {}", this.getClass().getSimpleName());
inflightBatchesPerAddress.clear();
totalInflightReqs.reset();
}
// The second argument is the number of entries (addresses) in the map, not a count of the
// individual batch ids held in the per-address sets.
logger.info(
"Cleanup {} requests and {} batches in flight.",
totalInflightReqs.sum(),
inflightBatchesPerAddress.size());
// Volatile flag; the wait loops shown in limitMaxInFlight/limitZeroInFlight check it and
// return false once it is set.
cleaned = true;
inflightBatchesPerAddress.clear();
pushStrategy.clear();
}
}
Loading