From a3d3a63f377a35d78aa0ca72199009f621d6bf89 Mon Sep 17 00:00:00 2001
From: Carlos Segarra
Date: Sun, 12 May 2024 22:19:39 +0000
Subject: [PATCH] omp: fix race conditions with elastic scaling

---
 src/planner/Planner.cpp                | 116 ++++++++++++++++++++++---
 src/planner/PlannerEndpointHandler.cpp |  21 ++++-
 2 files changed, 124 insertions(+), 13 deletions(-)

diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp
index 878bf23df..8ac4149c5 100644
--- a/src/planner/Planner.cpp
+++ b/src/planner/Planner.cpp
@@ -27,9 +27,50 @@ namespace faabric::planner {
 // Utility Functions
 // ----------------------
 
-static int availableSlots(std::shared_ptr<Host> host)
+// Helper method to calculate how many slots in the current host we can
+// scale up to
+int availableOpenMpSlots(
+  int appId,
+  const std::string& mainHost,
+  const std::map<std::string, std::shared_ptr<Host>>& hostMap,
+  const faabric::batch_scheduler::InFlightReqs& inFlightReqs)
 {
-    int availableSlots = host->slots() - host->usedslots();
+    // At most, we can scale up to the host size minus one (the caller thread)
+    int availableSlots =
+      hostMap.at(mainHost)->slots() - hostMap.at(mainHost)->usedslots();
+    assert(availableSlots <= hostMap.at(mainHost)->slots() - 1);
+
+    // However, we need to discard any in-flight apps that are also running
+    // on this host. This is to prevent a situation where a co-located app
+    // elastically scales beyond another app's minimum level of parallelism
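+    // (For example, if a co-located app requested ompnumthreads = 8 but
+    // currently only has 2 messages in flight, the remaining 6 slots must
+    // stay reserved so it can fork back up to its requested parallelism)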
+    for (const auto& [thisAppId, inFlightPair] : inFlightReqs) {
+        if (appId == thisAppId) {
+            continue;
+        }
+
+        // Check if the first message in the decision is scheduled on the
+        // same host we are on
+        if (inFlightPair.second->hosts.at(0) == mainHost) {
+            // If so, check if the total OMP num threads is more than the
+            // current number of messages in flight, and if so subtract the
+            // difference from the available slots
+            int requestedButNotOccupiedSlots =
+              inFlightPair.first->messages(0).ompnumthreads() -
+              inFlightPair.first->messages_size();
+
+            // This value could be negative if the app elastically scaled up
+            if (requestedButNotOccupiedSlots > 0) {
+                availableSlots -= requestedButNotOccupiedSlots;
+
+                SPDLOG_DEBUG("Subtracting {} possible slots for app {}'s "
+                             "elastic scale from app {}!",
+                             requestedButNotOccupiedSlots,
+                             appId,
+                             thisAppId);
+            }
+        }
+    }
+
     assert(availableSlots >= 0);
 
     return availableSlots;
@@ -442,9 +483,10 @@ void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
     if (it == req->messages().end()) {
         // Ditto as before. We want to allow setting the message result
         // more than once without breaking
-        SPDLOG_DEBUG(
-          "Setting result for non-existant (or finished) message: {}",
-          appId);
+        SPDLOG_DEBUG("Setting result for non-existent (or finished) "
+                     "message: {} (app: {})",
+                     msg->id(),
+                     appId);
     } else {
         SPDLOG_DEBUG("Removing message {} from app {}", msg->id(), appId);
@@ -777,6 +819,7 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     // does not modify it
     auto hostMapCopy =
       convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIps);
+
     bool isScaleChange =
       decisionType == faabric::batch_scheduler::DecisionType::SCALE_CHANGE;
     bool isDistChange =
@@ -792,7 +835,10 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
         auto oldDec = state.inFlightReqs.at(appId).second;
         auto mainHost = oldDec->hosts.at(0);
 
-        int numAvail = availableSlots(state.hostMap.at(mainHost));
+        // If there are co-located OpenMP apps, we should never use
+        // their `ompNumThreads` slots
+        int numAvail = availableOpenMpSlots(
+          appId, mainHost, state.hostMap, state.inFlightReqs);
         int numRequested = req->messages_size();
         int lastMsgIdx =
           numRequested == 0 ? 0 : req->messages(numRequested - 1).groupidx();
@@ -864,6 +910,38 @@ Planner::callBatch(std::shared_ptr<BatchExecuteRequest> req)
     bool isOmp = req->messages_size() > 0 && req->messages(0).isomp();
     std::shared_ptr<BatchExecuteRequest> knownSizeReq = nullptr;
 
+    // For an OpenMP decision, we want to make sure that no in-flight
+    // tasks are currently in a join phase (from a repeated fork-join)
+    if (isOmp) {
+        for (const auto& [thisAppId, inFlightPair] : state.inFlightReqs) {
+            if (thisAppId == appId) {
+                continue;
+            }
+
+            int requestedButNotOccupiedSlots =
+              inFlightPair.first->messages(0).ompnumthreads() -
+              inFlightPair.first->messages_size();
+
+            // TODO: this only works for single-host OpenMP requests
+            if (requestedButNotOccupiedSlots > 0) {
+                auto mainHost = inFlightPair.second->hosts.at(0);
+
+                // TODO: move to DEBUG
+                SPDLOG_WARN("WARNING: tried to schedule OpenMP app (appid: {})"
+                            " in host {} while another in-flight OpenMP app"
+                            " (appid: {}) had too few messages in flight"
+                            " ({} < {})",
+                            appId,
+                            mainHost,
+                            thisAppId,
+                            inFlightPair.first->messages_size(),
+                            inFlightPair.first->messages(0).ompnumthreads());
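+                // Reserve the missing slots in the scheduler's host-map copy
+                // so that this request cannot race the in-flight app for
+                // them while it sits in a join phase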
+                hostMapCopy.at(mainHost)->usedSlots +=
+                  requestedButNotOccupiedSlots;
+            }
+        }
+    }
+
     // Check if there exists a pre-loaded scheduling decision for this app
     // (e.g. if we want to force a migration). Note that we don't want to check
     // pre-loaded decisions for dist-change requests
@@ -934,6 +1012,26 @@
         return decision;
     }
 
+    bool isSingleHost = decision->isSingleHost();
+    if (!isSingleHost && req->singlehosthint()) {
+        // In an elastic OpenMP execution, it may happen that we try to
+        // schedule an app but another one has elastically scaled up meanwhile
+        if (isNew && isOmp && req->elasticscalehint()) {
+            // Let the caller handle that there are not enough slots
+            return std::make_shared<
+              faabric::batch_scheduler::SchedulingDecision>(
+              NOT_ENOUGH_SLOTS_DECISION);
+        }
+
+        // This is likely a fatal error and a sign that something has gone
+        // very wrong, but we still do not crash the planner
+        SPDLOG_ERROR(
+          "User provided single-host hint in BER, but decision is not!");
+
+        return std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
+          NOT_ENOUGH_SLOTS_DECISION);
+    }
+
     // If we have managed to schedule a frozen request, un-freeze it by
     // removing it from the evicted request map
     if (state.evictedRequests.contains(appId)) {
@@ -1016,7 +1114,7 @@
         decision->print();
 #endif
 
-    // For a NEW MPI decision that was not preloaded we have
+    // For a NEW MPI/OpenMP decision that was not preloaded we have
     // preemptively scheduled all MPI messages but now we just need to
     // return the first one, and preload the rest
     if ((isMpi || isOmp) && knownSizeReq != nullptr) {
@@ -1231,10 +1329,6 @@ void Planner::dispatchSchedulingDecision(
     }
 
     bool isThreads = req->type() == faabric::BatchExecuteRequest::THREADS;
-    if (!isSingleHost && req->singlehosthint()) {
-        SPDLOG_ERROR(
-          "User provided single-host hint in BER, but decision is not!");
-    }
 
     for (const auto& [hostIp, hostReq] : hostRequests) {
         SPDLOG_DEBUG("Dispatching {} messages to host {} for execution",
diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp
index b76385d63..ee612bde0 100644
--- a/src/planner/PlannerEndpointHandler.cpp
+++ b/src/planner/PlannerEndpointHandler.cpp
@@ -188,8 +188,25 @@ void PlannerEndpointHandler::onRequest(
         }
 
         if (inFlightPair.first->messages(0).isomp()) {
-            inFlightAppResp->set_size(
-              inFlightPair.first->messages(0).ompnumthreads());
+            // If the app has elastically scaled up, report the scaled-up size
+            int numOmpThreads =
+              inFlightPair.first->messages(0).ompnumthreads();
+
+            if (inFlightPair.first->elasticscalehint() &&
+                numOmpThreads < inFlightPair.first->messages_size()) {
+
+                SPDLOG_WARN("App {} has elastically scaled up, so "
+                            "setting size to {} (instead of {})",
+                            inFlightPair.first->appid(),
+                            inFlightPair.first->messages_size(),
+                            numOmpThreads);
+
+                inFlightAppResp->set_size(
+                  inFlightPair.first->messages_size());
+            } else {
+                inFlightAppResp->set_size(
+                  inFlightPair.first->messages(0).ompnumthreads());
+            }
         }
 
         for (const auto& hostIp : decision->hosts) {
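
The slot-accounting rule that both Planner.cpp hunks implement, reduced to a
standalone sketch. The types below are simplified stand-ins assumed for
illustration, not faabric's actual classes:

    #include <algorithm>
    #include <iostream>
    #include <map>
    #include <string>

    // Hypothetical, simplified stand-ins for the planner's host and app state
    struct ToyHost
    {
        int slots = 0;
        int usedSlots = 0;
    };

    struct ToyApp
    {
        std::string mainHost;
        int ompNumThreads = 0;    // requested parallelism
        int messagesInFlight = 0; // current parallelism (lower mid-join)
    };

    // Roughly mirrors availableOpenMpSlots: free slots on the main host,
    // minus any slots that co-located apps have requested but do not
    // currently occupy (e.g. because they are in a join phase). The real
    // code asserts the result is non-negative; here we clamp instead.
    int availableOpenMpSlots(int appId,
                             const std::string& mainHost,
                             const std::map<std::string, ToyHost>& hosts,
                             const std::map<int, ToyApp>& inFlight)
    {
        int available = hosts.at(mainHost).slots - hosts.at(mainHost).usedSlots;

        for (const auto& [otherAppId, app] : inFlight) {
            if (otherAppId == appId || app.mainHost != mainHost) {
                continue;
            }

            int reserved = app.ompNumThreads - app.messagesInFlight;
            if (reserved > 0) {
                available -= reserved;
            }
        }

        return std::max(available, 0);
    }

    int main()
    {
        std::map<std::string, ToyHost> hosts = { { "hostA", { 16, 10 } } };
        // App 1 requested 8 OpenMP threads but only has 2 in flight (joining)
        std::map<int, ToyApp> inFlight = { { 1, { "hostA", 8, 2 } } };

        // 16 - 10 = 6 free slots, minus the 6 reserved for app 1 -> prints 0
        std::cout << availableOpenMpSlots(2, "hostA", hosts, inFlight)
                  << std::endl;
    }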