Skip to content

Commit 754ee92

Browse files
committed
scheduler: add logic for check for migration opportunities method
1 parent 4eef2d3 commit 754ee92

File tree

6 files changed

+486
-5
lines changed

6 files changed

+486
-5
lines changed

include/faabric/scheduler/Scheduler.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
namespace faabric::scheduler {
2424

25+
typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
26+
std::shared_ptr<faabric::util::SchedulingDecision>>
27+
InFlightPair;
28+
2529
class Scheduler;
2630

2731
Scheduler& getScheduler();
@@ -223,7 +227,12 @@ class Scheduler
223227
// ----------------------------------
224228
// Function Migration
225229
// ----------------------------------
226-
void checkForMigrationOpportunities();
230+
void checkForMigrationOpportunities(
231+
faabric::util::MigrationStrategy =
232+
faabric::util::MigrationStrategy::BIN_PACK);
233+
234+
std::shared_ptr<faabric::PendingMigrations> canAppBeMigrated(
235+
uint32_t appId);
227236

228237
private:
229238
std::string thisHost;
@@ -295,6 +304,11 @@ class Scheduler
295304

296305
// ---- Point-to-point ----
297306
faabric::transport::PointToPointBroker& broker;
307+
308+
// ---- Function migration ----
309+
std::unordered_map<uint32_t, InFlightPair> inFlightRequests;
310+
std::unordered_map<uint32_t, std::shared_ptr<faabric::PendingMigrations>>
311+
pendingMigrations;
298312
};
299313

300314
}

include/faabric/util/scheduling.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,16 @@ enum SchedulingTopologyHint
5454
FORCE_LOCAL,
5555
NEVER_ALONE
5656
};
57+
58+
// Migration strategies help the scheduler decide wether the scheduling decision
59+
// for a batch request could be changed with the new set of available resources.
60+
// - BIN_PACK: sort hosts by the number of functions from the batch they are
61+
// running. Bin-pack batches in increasing order to hosts in
62+
// decreasing order.
63+
// - EMPTY_HOSTS: pack batches in increasing order to empty hosts.
64+
enum MigrationStrategy
65+
{
66+
BIN_PACK,
67+
EMPTY_HOSTS
68+
};
5769
}

src/proto/faabric.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,20 @@ message PointToPointMappings {
245245

246246
repeated PointToPointMapping mappings = 3;
247247
}
248+
249+
// ---------------------------------------------
250+
// FUNCTION MIGRATIONS
251+
// ---------------------------------------------
252+
253+
message PendingMigrations {
254+
int32 appId = 1;
255+
int32 groupId = 2;
256+
257+
message PendingMigration {
258+
int32 messageId = 1;
259+
string srcHost = 2;
260+
string dstHost = 3;
261+
}
262+
263+
repeated PendingMigration migrations = 3;
264+
}

src/scheduler/Scheduler.cpp

Lines changed: 167 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ void Scheduler::reset()
137137
threadResults.clear();
138138
pushedSnapshotsMap.clear();
139139

140+
// Reset function migration tracking
141+
inFlightRequests.clear();
142+
pendingMigrations.clear();
143+
140144
// Records
141145
recordedMessagesAll.clear();
142146
recordedMessagesLocal.clear();
@@ -452,6 +456,25 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
452456
throw std::runtime_error("Invalid scheduler hint for messages");
453457
}
454458

459+
// Record in-flight request if function desires to be migrated
460+
if (firstMsg.migrationcheckperiod() > 0) {
461+
auto decisionPtr =
462+
std::make_shared<faabric::util::SchedulingDecision>(decision);
463+
inFlightRequests[decision.appId] = std::make_pair(req, decisionPtr);
464+
/*
465+
if (inFlightRequests.size() == 1) {
466+
functionMigrationThread.start(firstMsg.migrationcheckperiod());
467+
} else if (firstMsg.migrationcheckperiod() !=
468+
functionMigrationThread.wakeUpPeriodSeconds) {
469+
SPDLOG_WARN("Ignoring migration check period as the migration"
470+
"thread was initialised with a different one."
471+
"(provided: {}, current: {})",
472+
firstMsg.migrationcheckperiod(),
473+
functionMigrationThread.wakeUpPeriodSeconds);
474+
}
475+
*/
476+
}
477+
455478
// NOTE: we want to schedule things on this host _last_, otherwise functions
456479
// may start executing before all messages have been dispatched, thus
457480
// slowing the remaining scheduling.
@@ -889,6 +912,20 @@ void Scheduler::setFunctionResult(faabric::Message& msg)
889912
// Write the successful result to the result queue
890913
std::vector<uint8_t> inputData = faabric::util::messageToBytes(msg);
891914
redis.publishSchedulerResult(key, msg.statuskey(), inputData);
915+
916+
// Remove the app from in-flight map if still there, and this host is the
917+
// master host for the message
918+
if (msg.masterhost() == thisHost) {
919+
faabric::util::FullLock lock(mx);
920+
921+
inFlightRequests.erase(msg.appid());
922+
pendingMigrations.erase(msg.appid());
923+
// If there are no more apps to track, stop the thread checking for
924+
// migration opportunities
925+
if (inFlightRequests.size() == 0) {
926+
// functionMigrationThread.stop();
927+
}
928+
}
892929
}
893930

894931
void Scheduler::registerThread(uint32_t msgId)
@@ -1130,8 +1167,136 @@ ExecGraphNode Scheduler::getFunctionExecGraphNode(unsigned int messageId)
11301167
return node;
11311168
}
11321169

1133-
void Scheduler::checkForMigrationOpportunities()
1170+
void Scheduler::checkForMigrationOpportunities(
1171+
faabric::util::MigrationStrategy migrationStrategy)
11341172
{
1135-
SPDLOG_INFO("Not implemented");
1173+
// Vector to cache all migrations we have to do, and update the shared map
1174+
// at the very end just once. This is because we need a unique lock to write
1175+
// to the shared map, but the rest of this method can do with a shared lock.
1176+
std::vector<std::shared_ptr<faabric::PendingMigrations>>
1177+
tmpPendingMigrations;
1178+
1179+
{
1180+
faabric::util::SharedLock lock(mx);
1181+
1182+
// For each in-flight request, check if there is an opportunity to
1183+
// migrate
1184+
for (const auto& app : inFlightRequests) {
1185+
auto req = app.second.first;
1186+
auto originalDecision = *app.second.second;
1187+
1188+
// If we have already recorded a pending migration for this req,
1189+
// skip
1190+
if (canAppBeMigrated(originalDecision.appId) != nullptr) {
1191+
continue;
1192+
}
1193+
1194+
faabric::PendingMigrations msg;
1195+
msg.set_appid(originalDecision.appId);
1196+
// TODO - generate a new groupId here for processes to wait on
1197+
// during the migration? msg.set_groupid();
1198+
1199+
if (migrationStrategy ==
1200+
faabric::util::MigrationStrategy::BIN_PACK) {
1201+
// We assume the batch was originally scheduled using
1202+
// bin-packing, thus the scheduling decision has at the begining
1203+
// (left) the hosts with the most allocated requests, and at the
1204+
// end (right) the hosts with the fewest. To check for migration
1205+
// oportunities, we compare a pointer to the possible
1206+
// destination of the migration (left), with one to the possible
1207+
// source of the migration (right). NOTE - this is a slight
1208+
// simplification, but makes the code simpler.
1209+
auto left = originalDecision.hosts.begin();
1210+
auto right = originalDecision.hosts.end() - 1;
1211+
faabric::HostResources r = (*left == thisHost)
1212+
? getThisHostResources()
1213+
: getHostResources(*left);
1214+
auto nAvailable = [&r]() -> int {
1215+
return r.slots() - r.usedslots();
1216+
};
1217+
auto claimSlot = [&r]() {
1218+
int currentUsedSlots = r.usedslots();
1219+
SPDLOG_INFO("Old slots: {} - New slots: {}",
1220+
currentUsedSlots,
1221+
currentUsedSlots + 1);
1222+
r.set_usedslots(currentUsedSlots + 1);
1223+
};
1224+
while (left < right) {
1225+
// If both pointers point to the same host, no migration
1226+
// opportunity, and must check another possible source of
1227+
// the migration
1228+
if (*left == *right) {
1229+
--right;
1230+
continue;
1231+
}
1232+
1233+
// If the left pointer (possible destination of the
1234+
// migration) is out of available resources, no migration
1235+
// opportunity, and must check another possible destination
1236+
// of migration
1237+
if (nAvailable() == 0) {
1238+
auto oldHost = *left;
1239+
++left;
1240+
if (*left != oldHost) {
1241+
r = (*left == thisHost) ? getThisHostResources()
1242+
: getHostResources(*left);
1243+
}
1244+
continue;
1245+
}
1246+
1247+
// If each pointer points to a request scheduled in a
1248+
// different host, and the possible destination has slots,
1249+
// there is a migration opportunity
1250+
auto* migration = msg.add_migrations();
1251+
auto msgIdPtr =
1252+
originalDecision.messageIds.begin() +
1253+
std::distance(originalDecision.hosts.begin(), right);
1254+
migration->set_messageid(*msgIdPtr);
1255+
migration->set_srchost(*right);
1256+
migration->set_dsthost(*left);
1257+
// Decrement by one the availability, and check for more
1258+
// possible sources of migration
1259+
claimSlot();
1260+
--right;
1261+
}
1262+
} else {
1263+
SPDLOG_ERROR("Unrecognised migration strategy: {}",
1264+
migrationStrategy);
1265+
throw std::runtime_error("Unrecognised migration strategy.");
1266+
}
1267+
1268+
if (msg.migrations_size() > 0) {
1269+
tmpPendingMigrations.emplace_back(
1270+
std::make_shared<faabric::PendingMigrations>(msg));
1271+
SPDLOG_DEBUG("Detected migration opportunity for app: {}",
1272+
msg.appid());
1273+
} else {
1274+
SPDLOG_DEBUG("No migration opportunity detected for app: {}",
1275+
msg.appid());
1276+
}
1277+
}
1278+
}
1279+
1280+
// Finally, store all the pending migrations in the shared map acquiring
1281+
// a unique lock.
1282+
if (tmpPendingMigrations.size() > 0) {
1283+
faabric::util::FullLock lock(mx);
1284+
for (auto msgPtr : tmpPendingMigrations) {
1285+
SPDLOG_INFO("Adding app: {}", msgPtr->appid());
1286+
pendingMigrations[msgPtr->appid()] = std::move(msgPtr);
1287+
}
1288+
}
1289+
}
1290+
1291+
std::shared_ptr<faabric::PendingMigrations> Scheduler::canAppBeMigrated(
1292+
uint32_t appId)
1293+
{
1294+
faabric::util::SharedLock lock(mx);
1295+
1296+
if (pendingMigrations.find(appId) == pendingMigrations.end()) {
1297+
return nullptr;
1298+
}
1299+
1300+
return pendingMigrations[appId];
11361301
}
11371302
}

tests/test/scheduler/test_executor.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ int32_t TestExecutor::executeTask(
201201
throw std::runtime_error("This is a test error");
202202
}
203203

204+
if (msg.function() == "migration") {
205+
// Sleep for sufficiently more than the check period
206+
SPDLOG_DEBUG("Migration test function going to sleep");
207+
SLEEP_MS(SHORT_TEST_TIMEOUT_MS);
208+
SPDLOG_DEBUG("Migration test function waking up");
209+
210+
msg.set_outputdata(
211+
fmt::format("Migration test function {} executed", msg.id()));
212+
213+
return 0;
214+
}
215+
204216
if (reqOrig->type() == faabric::BatchExecuteRequest::THREADS) {
205217
SPDLOG_DEBUG("TestExecutor executing simple thread {}", msg.id());
206218
return msg.id() / 100;

0 commit comments

Comments
 (0)