Skip to content

Commit

Permalink
omp: fix race conditions with elastic scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
csegarragonz committed May 12, 2024
1 parent df2c945 commit a3d3a63
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 13 deletions.
116 changes: 105 additions & 11 deletions src/planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,50 @@ namespace faabric::planner {
// Utility Functions
// ----------------------

static int availableSlots(std::shared_ptr<Host> host)
// Helper method to calculate how many available slots in the current host we
// can scale-up to
int availableOpenMpSlots(
int appId,
const std::string& mainHost,
const std::map<std::string, std::shared_ptr<Host>>& hostMap,
const faabric::batch_scheduler::InFlightReqs& inFlightReqs)
{
int availableSlots = host->slots() - host->usedslots();
// At most, we can scale-up to the host size minus one (the caller thread)
int availableSlots =
hostMap.at(mainHost)->slots() - hostMap.at(mainHost)->usedslots();
assert(availableSlots <= hostMap.at(mainHost)->slots() - 1);

Check warning on line 41 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L39-L41

Added lines #L39 - L41 were not covered by tests

// However, we need to discard any in-flight apps that are also running
// in this host. This is to prevent a situation where a co-located app
// elastically scales beyond another's app minimum level of parallelism
for (const auto& [thisAppId, inFlightPair] : inFlightReqs) {
if (appId == thisAppId) {
continue;

Check warning on line 48 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L46-L48

Added lines #L46 - L48 were not covered by tests
}

// Check if the first message in the decision is scheduled to the
// same host we are
if (inFlightPair.second->hosts.at(0) == mainHost) {

Check warning on line 53 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L53

Added line #L53 was not covered by tests
// If so, check if the total OMP num threads is more than the
// current number of messages in flight, and if so subtract the
// difference from the available slots list
int requestedButNotOccupiedSlots =
inFlightPair.first->messages(0).ompnumthreads() -
inFlightPair.first->messages_size();

Check warning on line 59 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L57-L59

Added lines #L57 - L59 were not covered by tests

// This value could be smaller than zero if elastically scaled-up
if (requestedButNotOccupiedSlots > 0) {
availableSlots -= requestedButNotOccupiedSlots;

Check warning on line 63 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L62-L63

Added lines #L62 - L63 were not covered by tests

SPDLOG_DEBUG("Subtracting {} possible slots for app {}'s "
"elastic scale from app {}!",
requestedButNotOccupiedSlots,
appId,
thisAppId);

Check warning on line 69 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L65-L69

Added lines #L65 - L69 were not covered by tests
}
}
}

assert(availableSlots >= 0);

Check warning on line 74 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L74

Added line #L74 was not covered by tests

return availableSlots;

Check warning on line 76 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L76

Added line #L76 was not covered by tests
Expand Down Expand Up @@ -442,9 +483,10 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
if (it == req->messages().end()) {
// Ditto as before. We want to allow setting the message result
// more than once without breaking
SPDLOG_DEBUG(
"Setting result for non-existant (or finished) message: {}",
appId);
SPDLOG_DEBUG("Setting result for non-existant (or finished) "
"message: {} (app: {})",
msg->id(),
appId);
} else {
SPDLOG_DEBUG("Removing message {} from app {}", msg->id(), appId);

Expand Down Expand Up @@ -777,6 +819,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
// does not modify it
auto hostMapCopy =
convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIps);

bool isScaleChange =
decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE;
bool isDistChange =
Expand All @@ -792,7 +835,10 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
auto oldDec = state.inFlightReqs.at(appId).second;
auto mainHost = oldDec->hosts.at(0);

Check warning on line 836 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L834-L836

Added lines #L834 - L836 were not covered by tests

int numAvail = availableSlots(state.hostMap.at(mainHost));
// If there are co-located OpenMP apps, we should never use
// their `ompNumThreads`' slots
int numAvail = availableOpenMpSlots(
appId, mainHost, state.hostMap, state.inFlightReqs);
int numRequested = req->messages_size();
int lastMsgIdx =
numRequested == 0 ? 0 : req->messages(numRequested - 1).groupidx();
Expand Down Expand Up @@ -864,6 +910,38 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
std::shared_ptr<BatchExecuteRequest> knownSizeReq = nullptr;

// For an OpenMP decision, we want to make sure that no in-flight
// tasks are currently in a join phase (from a repeated fork-join)
if (isOmp) {
for (const auto& [thisAppId, inFlightPair] : state.inFlightReqs) {
if (thisAppId == appId) {
continue;

Check warning on line 918 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L916-L918

Added lines #L916 - L918 were not covered by tests
}

int requestedButNotOccupiedSlots =
inFlightPair.first->messages(0).ompnumthreads() -
inFlightPair.first->messages_size();

Check warning on line 923 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L921-L923

Added lines #L921 - L923 were not covered by tests

// TODO: this only works for single host OpenMP requests
if (requestedButNotOccupiedSlots > 0) {
auto mainHost = inFlightPair.second->hosts.at(0);

Check warning on line 927 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L926-L927

Added lines #L926 - L927 were not covered by tests

// TODO: move to DEBUG
SPDLOG_WARN("WARNING: tried to schedule OpenMP app (appid: {})"
" in host {} while another in-flight OpenMP app"
" (appid: {}) had too few messages in flight "
" ({} < {})",
appId,
mainHost,
thisAppId,
inFlightPair.first->messages_size(),
inFlightPair.first->messages(0).ompnumthreads());
hostMapCopy.at(mainHost)->usedSlots +=
requestedButNotOccupiedSlots;

Check warning on line 940 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L930-L940

Added lines #L930 - L940 were not covered by tests
}
}
}

// Check if there exists a pre-loaded scheduling decision for this app
// (e.g. if we want to force a migration). Note that we don't want to check
// pre-loaded decisions for dist-change requests
Expand Down Expand Up @@ -934,6 +1012,26 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
return decision;
}

bool isSingleHost = decision->isSingleHost();
if (!isSingleHost && req->singlehosthint()) {
// In an elastic OpenMP execution, it may happen that we try to
// schedule an app, but another one has been elastically scaled
if (isNew && isOmp && req->elasticscalehint()) {

Check warning on line 1019 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L1019

Added line #L1019 was not covered by tests
// Let the caller handle that there are not enough slots
return std::make_shared<
faabric::batch_scheduler::SchedulingDecision>(
NOT_ENOUGH_SLOTS_DECISION);

Check warning on line 1023 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L1021-L1023

Added lines #L1021 - L1023 were not covered by tests
}

// This is likely a fatal error and a sign that something has gone
// very wrong. We still do not crash the planner
SPDLOG_ERROR(
"User provided single-host hint in BER, but decision is not!");

Check warning on line 1029 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L1028-L1029

Added lines #L1028 - L1029 were not covered by tests

return std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
NOT_ENOUGH_SLOTS_DECISION);

Check warning on line 1032 in src/planner/Planner.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/Planner.cpp#L1031-L1032

Added lines #L1031 - L1032 were not covered by tests
}

// If we have managed to schedule a frozen request, un-freeze it by
// removing it from the evicted request map
if (state.evictedRequests.contains(appId)) {
Expand Down Expand Up @@ -1016,7 +1114,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
decision->print();
#endif

// For a NEW MPI decision that was not preloaded we have
// For a NEW MPI/OpenMP decision that was not preloaded we have
// preemptively scheduled all MPI messages but now we just need to
// return the first one, and preload the rest
if ((isMpi || isOmp) && knownSizeReq != nullptr) {
Expand Down Expand Up @@ -1231,10 +1329,6 @@ void Planner::dispatchSchedulingDecision(
}

bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
if (!isSingleHost && req->singlehosthint()) {
SPDLOG_ERROR(
"User provided single-host hint in BER, but decision is not!");
}

for (const auto& [hostIp, hostReq] : hostRequests) {
SPDLOG_DEBUG("Dispatching {} messages to host {} for execution",
Expand Down
21 changes: 19 additions & 2 deletions src/planner/PlannerEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,25 @@ void PlannerEndpointHandler::onRequest(
}

if (inFlightPair.first->messages(0).isomp()) {
inFlightAppResp->set_size(
inFlightPair.first->messages(0).ompnumthreads());
// What if we told here the scaled-up size (?)
int numOmpThreads =
inFlightPair.first->messages(0).ompnumthreads();

Check warning on line 193 in src/planner/PlannerEndpointHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/PlannerEndpointHandler.cpp#L192-L193

Added lines #L192 - L193 were not covered by tests

if (inFlightPair.first->elasticscalehint() &&
numOmpThreads < inFlightPair.first->messages_size()) {

Check warning on line 196 in src/planner/PlannerEndpointHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/PlannerEndpointHandler.cpp#L195-L196

Added lines #L195 - L196 were not covered by tests

SPDLOG_WARN("App {} has elastically scaled-up, so "
"setting size to {} (instead of {})",
inFlightPair.first->appid(),
inFlightPair.first->messages_size(),
numOmpThreads);

Check warning on line 202 in src/planner/PlannerEndpointHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/PlannerEndpointHandler.cpp#L198-L202

Added lines #L198 - L202 were not covered by tests

inFlightAppResp->set_size(
inFlightPair.first->messages_size());
} else {
inFlightAppResp->set_size(
inFlightPair.first->messages(0).ompnumthreads());

Check warning on line 208 in src/planner/PlannerEndpointHandler.cpp

View check run for this annotation

Codecov / codecov/patch

src/planner/PlannerEndpointHandler.cpp#L204-L208

Added lines #L204 - L208 were not covered by tests
}
}

for (const auto& hostIp : decision->hosts) {
Expand Down

0 comments on commit a3d3a63

Please sign in to comment.