
Executor decision caching and single-host optimisations #222


Merged (18 commits) on Feb 11, 2022. Changes from all commits are shown below.
19 changes: 13 additions & 6 deletions include/faabric/scheduler/Scheduler.h
@@ -82,6 +82,8 @@ class Executor
faabric::Message& msg,
bool createIfNotExists = false);

void updateMainThreadSnapshot();

virtual std::span<uint8_t> getMemoryView();

protected:
@@ -106,11 +108,7 @@

// ---- Application threads ----
std::shared_mutex threadExecutionMutex;
std::unordered_map<std::string, int> cachedGroupIds;
std::unordered_map<std::string, std::vector<std::string>>
cachedDecisionHosts;
std::vector<char> dirtyRegions;

void deleteMainThreadSnapshot(const faabric::Message& msg);

// ---- Function execution thread pool ----
@@ -133,6 +131,11 @@ class Scheduler
public:
Scheduler();

faabric::util::SchedulingDecision makeSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint =
faabric::util::SchedulingTopologyHint::NONE);

void callFunction(faabric::Message& msg, bool forceLocal = false);

faabric::util::SchedulingDecision callFunctions(
@@ -176,6 +179,9 @@

void setThreadResultLocally(uint32_t msgId, int32_t returnValue);

std::vector<std::pair<uint32_t, int32_t>> awaitThreadResults(
std::shared_ptr<faabric::BatchExecuteRequest> req);

int32_t awaitThreadResult(uint32_t messageId);

void registerThread(uint32_t msgId);
@@ -281,14 +287,15 @@ class Scheduler

std::unordered_map<std::string, std::set<std::string>> registeredHosts;

faabric::util::SchedulingDecision makeSchedulingDecision(
faabric::util::SchedulingDecision doSchedulingDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingTopologyHint topologyHint);

faabric::util::SchedulingDecision doCallFunctions(
std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision,
faabric::util::FullLock& lock);
faabric::util::FullLock& lock,
faabric::util::SchedulingTopologyHint topologyHint);

std::shared_ptr<Executor> claimExecutor(
faabric::Message& msg,
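For context, here is a minimal sketch (not part of this diff) of how calling code might drive the newly public makeSchedulingDecision together with the new batched awaitThreadResults. The batchExecFactory helper, the getScheduler accessor, and the single-argument callFunctions overload are assumed from the existing codebase rather than shown in these hunks.

#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/func.h>
#include <faabric/util/scheduling.h>

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

void exampleBatchCall()
{
    // Build a batch request for four invocations (helper assumed to exist
    // in faabric::util, as used throughout the existing tests)
    std::shared_ptr<faabric::BatchExecuteRequest> req =
      faabric::util::batchExecFactory("demo", "echo", 4);

    faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler();

    // The placement logic is now callable on its own, e.g. to inspect the
    // decision before dispatching anything
    faabric::util::SchedulingDecision decision = sch.makeSchedulingDecision(
      req, faabric::util::SchedulingTopologyHint::NONE);
    (void)decision;

    // Dispatch the batch, then wait for every thread result in one call
    // rather than looping over awaitThreadResult per message
    sch.callFunctions(req);
    std::vector<std::pair<uint32_t, int32_t>> results =
      sch.awaitThreadResults(req);

    for (const auto& [msgId, returnValue] : results) {
        // Per-thread return values, keyed by message ID
        (void)msgId;
        (void)returnValue;
    }
}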
60 changes: 55 additions & 5 deletions include/faabric/util/scheduling.h
@@ -6,6 +6,7 @@
#include <vector>

#include <faabric/proto/faabric.pb.h>
#include <faabric/util/locks.h>

namespace faabric::util {

@@ -33,6 +34,8 @@ class SchedulingDecision

std::string returnHost;

bool isSingleHost();

void addMessage(const std::string& host, const faabric::Message& msg);

void addMessage(const std::string& host, int32_t messageId, int32_t appIdx);
@@ -45,8 +48,8 @@

// Scheduling topology hints help the scheduler decide which host to assign new
// requests in a batch to.
// - NORMAL: bin-packs requests to slots in hosts starting from the master
// host, and overloadds the master if it runs out of resources.
// - NONE: bin-packs requests to slots in hosts starting from the master
// host, and overloads the master if it runs out of resources.
// - FORCE_LOCAL: force local execution irrespective of the available
// resources.
// - NEVER_ALONE: never allocates a single (non-master) request to a host
@@ -55,7 +58,8 @@
// migration opportunities to appear.
enum SchedulingTopologyHint
{
NORMAL,
NONE,
CACHED,
FORCE_LOCAL,
NEVER_ALONE,
UNDERFULL,
@@ -65,20 +69,66 @@ enum SchedulingTopologyHint
// around
const std::unordered_map<std::string, SchedulingTopologyHint>
strToTopologyHint = {
{ "NORMAL", SchedulingTopologyHint::NORMAL },
{ "NONE", SchedulingTopologyHint::NONE },
{ "CACHED", SchedulingTopologyHint::CACHED },
{ "FORCE_LOCAL", SchedulingTopologyHint::FORCE_LOCAL },
{ "NEVER_ALONE", SchedulingTopologyHint::NEVER_ALONE },
{ "UNDERFULL", SchedulingTopologyHint::UNDERFULL },
};

const std::unordered_map<SchedulingTopologyHint, std::string>
topologyHintToStr = {
{ SchedulingTopologyHint::NORMAL, "NORMAL" },
{ SchedulingTopologyHint::NONE, "NONE" },
{ SchedulingTopologyHint::CACHED, "CACHED" },
{ SchedulingTopologyHint::FORCE_LOCAL, "FORCE_LOCAL" },
{ SchedulingTopologyHint::NEVER_ALONE, "NEVER_ALONE" },
{ SchedulingTopologyHint::UNDERFULL, "UNDERFULL" },
};
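A short illustration (not from this diff) of how these two lookup maps can be used to round-trip between configuration strings and hints; falling back to NONE for unrecognised strings is a choice made for this sketch, not behaviour specified here.

#include <faabric/util/scheduling.h>

#include <string>

faabric::util::SchedulingTopologyHint hintFromString(const std::string& value)
{
    // Unrecognised strings fall back to NONE rather than throwing
    auto it = faabric::util::strToTopologyHint.find(value);
    if (it == faabric::util::strToTopologyHint.end()) {
        return faabric::util::SchedulingTopologyHint::NONE;
    }
    return it->second;
}

std::string stringFromHint(faabric::util::SchedulingTopologyHint hint)
{
    return faabric::util::topologyHintToStr.at(hint);
}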

/**
* A record of a decision already taken for the given size of batch request
* for the given function. This doesn't contain the messages themselves,
* just the hosts and group ID that were used.
*/
class CachedDecision
{
public:
CachedDecision(const std::vector<std::string>& hostsIn, int groupIdIn);

std::vector<std::string> getHosts() { return hosts; }

int getGroupId() const { return groupId; }

private:
std::vector<std::string> hosts;
int groupId = 0;
};

/**
* Repository for cached scheduling decisions. The object is not thread-safe, as we
* assume only a single executor will be caching decisions for a given function
* and size of batch request on one host at a time.
*/
class DecisionCache
{
public:
std::shared_ptr<CachedDecision> getCachedDecision(
std::shared_ptr<faabric::BatchExecuteRequest> req);

void addCachedDecision(std::shared_ptr<faabric::BatchExecuteRequest> req,
faabric::util::SchedulingDecision& decision);

void clear();

private:
std::string getCacheKey(std::shared_ptr<faabric::BatchExecuteRequest> req);

std::unordered_map<std::string, std::shared_ptr<CachedDecision>>
cachedDecisions;
};

DecisionCache& getSchedulingDecisionCache();
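To make the intended flow concrete, here is a hedged sketch of the lookup/insert cycle on the singleton cache. In the PR itself this lookup presumably happens inside the scheduler when the CACHED hint is passed, so the sketch only exercises the public surface, and the logging call is purely illustrative.

#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/logging.h>
#include <faabric/util/scheduling.h>

#include <memory>

void exampleDecisionCaching(std::shared_ptr<faabric::BatchExecuteRequest> req)
{
    faabric::util::DecisionCache& cache =
      faabric::util::getSchedulingDecisionCache();

    std::shared_ptr<faabric::util::CachedDecision> cached =
      cache.getCachedDecision(req);

    if (cached == nullptr) {
        // Miss: take a fresh decision and record its hosts and group ID so
        // later batches of the same function and size can skip this step
        faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler();
        faabric::util::SchedulingDecision decision = sch.makeSchedulingDecision(
          req, faabric::util::SchedulingTopologyHint::NONE);
        cache.addCachedDecision(req, decision);
    } else {
        // Hit: the original placement is available without rescheduling
        SPDLOG_DEBUG("Reusing cached group {} across {} hosts",
                     cached->getGroupId(),
                     cached->getHosts().size());
    }
}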

// Migration strategies help the scheduler decide whether the scheduling decision
// for a batch request could be changed with the new set of available resources.
// - BIN_PACK: sort hosts by the number of functions from the batch they are
4 changes: 4 additions & 0 deletions src/proto/faabric.proto
@@ -38,6 +38,10 @@ message BatchExecuteRequest {
// Arbitrary context for this batch
int32 subType = 6;
bytes contextData = 7;

// Flag set by the scheduler when this batch is all executing on a single
// host
bool singleHost = 8;
}

message HostResources {
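Finally, a small sketch (not in the diff) of how the new flag surfaces through the generated protobuf accessors. The idea that it mirrors SchedulingDecision::isSingleHost() and gates cross-host snapshot work is an assumption about the optimisation rather than something shown in these hunks.

#include <faabric/proto/faabric.pb.h>

#include <memory>

void tagSingleHostBatch(std::shared_ptr<faabric::BatchExecuteRequest> req,
                        bool decisionIsSingleHost)
{
    // Scheduler side: record whether every message in the batch was placed
    // on this host (assumed to come from the scheduling decision)
    req->set_singlehost(decisionIsSingleHost);

    // Executor side: cross-host work can be skipped when the flag is set
    if (req->singlehost()) {
        // e.g. no need to push snapshot diffs to remote hosts, since all
        // threads in the batch share this host's memory
    }
}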