Skip to content

Commit

Permalink
Merge pull request #13 from shivramsrivastava/with_events_change
Browse files Browse the repository at this point in the history
Adding events feature in Firmament
  • Loading branch information
shivramsrivastava authored Oct 10, 2018
2 parents 0fb48c9 + c04a64b commit ce592fd
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 182 deletions.
3 changes: 3 additions & 0 deletions src/scheduling/firmament_scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ message ScheduleRequest {}

message SchedulingDeltas {
repeated SchedulingDelta deltas = 1;
// Added support for events. Added field to collect
// unscheduled tasks in a scheduling round.
repeated uint64 unscheduled_tasks = 2;
}

message TaskCompletedResponse {
Expand Down
96 changes: 78 additions & 18 deletions src/scheduling/firmament_scheduler_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ using firmament::scheduler::SimpleScheduler;
using firmament::scheduler::TopologyManager;
using firmament::platform::sim::SimulatedMessagingAdapter;

DECLARE_bool(gather_unscheduled_tasks);
DEFINE_string(firmament_scheduler_service_address, "127.0.0.1",
"The address of the scheduler service");
DEFINE_string(firmament_scheduler_service_port, "9090",
"The port of the scheduler service");
DECLARE_bool(resource_stats_update_based_on_resource_reservation);
DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple");
DEFINE_uint64(queue_based_scheduling_time, 100, "Queue Based Schedule run time");
DEFINE_uint64(queue_based_scheduling_time, 100,
"Queue Based Schedule run time");

namespace firmament {

Expand All @@ -83,6 +85,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
knowledge_base_, topology_manager_, sim_messaging_adapter_, NULL,
top_level_res_id_, "", &wall_time_, trace_generator_, &labels_map_,
&affinity_antiaffinity_tasks_);
// Get cost model pointer to clear unscheduled tasks of previous
// scheduling round and get unscheduled tasks of current scheduling round.
cost_model_ = dynamic_cast<FlowScheduler*>(scheduler_)->cost_model();
} else if (FLAGS_service_scheduler == "simple") {
scheduler_ = new SimpleScheduler(
job_map_, resource_map_,
Expand Down Expand Up @@ -175,21 +180,75 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
SchedulingDeltas* reply) override {
boost::lock_guard<boost::recursive_mutex> lock(
scheduler_->scheduling_lock_);
// Clear unscheduled tasks related maps and sets of previous scheduling
// round.
if (FLAGS_gather_unscheduled_tasks) {
cost_model_->ClearUnscheduledTasksData();
}
SchedulerStats sstat;
vector<SchedulingDelta> deltas;
// Schedule tasks which does not have pod affinity/anti-affinity
// requirements.
scheduler_->ScheduleAllJobs(&sstat, &deltas);
uint64_t total_unsched_tasks_size = 0;
if (FLAGS_gather_unscheduled_tasks) {
// Get unscheduled tasks of above scheduling round.
vector<uint64_t> unscheduled_normal_tasks;
cost_model_->GetUnscheduledTasks(&unscheduled_normal_tasks);
auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_normal_tasks) {
uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add();
*unsched_task_ret = unsched_task;
total_unsched_tasks_size++;
}
// Clear unscheduled tasks related maps and sets.
cost_model_->ClearUnscheduledTasksData();
}

// Schedule tasks having pod affinity/anti-affinity.
clock_t start = clock();
uint64_t elapsed = 0;
// Schedule tasks having pod affinity/anti-affinity
unordered_set<uint64_t> unscheduled_affinity_tasks_set;
vector<uint64_t> unscheduled_affinity_tasks;
while (affinity_antiaffinity_tasks_.size() &&
(elapsed < FLAGS_queue_based_scheduling_time)) {
scheduler_->ScheduleAllQueueJobs(&sstat, &deltas);
uint64_t task_scheduled =
scheduler_->ScheduleAllQueueJobs(&sstat, &deltas);
TaskID_t task_id = dynamic_cast<FlowScheduler*>(scheduler_)
->GetSingleTaskTobeScheduled();
if (FLAGS_gather_unscheduled_tasks) {
if (!task_scheduled) {
if (unscheduled_affinity_tasks_set.find(task_id) ==
unscheduled_affinity_tasks_set.end()) {
unscheduled_affinity_tasks_set.insert(task_id);
unscheduled_affinity_tasks.push_back(task_id);
}
} else {
unscheduled_affinity_tasks_set.erase(task_id);
}
}
clock_t stop = clock();
elapsed = (double)(stop - start) * 1000.0 / CLOCKS_PER_SEC;
}
// Extract results
if (deltas.size()) {
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
// Get unscheduled tasks of above scheduling round which tried scheduling
// tasks having pod affinity/anti-affinity. And populate the same into
// reply.
if (FLAGS_gather_unscheduled_tasks) {
auto unscheduled_affinity_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_affinity_tasks) {
if (unscheduled_affinity_tasks_set.find(unsched_task) !=
unscheduled_affinity_tasks_set.end()) {
uint64_t* unsched_task_ret = unscheduled_affinity_tasks_ret->Add();
*unsched_task_ret = unsched_task;
total_unsched_tasks_size++;
}
}
}

// Extract scheduling results.
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
if (FLAGS_gather_unscheduled_tasks) {
LOG(INFO) << "Got " << total_unsched_tasks_size << " unscheduled tasks";
}
for (auto& d : deltas) {
// LOG(INFO) << "Delta: " << d.DebugString();
Expand Down Expand Up @@ -575,9 +634,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
rs_ptr->mutable_topology_node(), *updated_rtnd_ptr,
boost::bind(&FirmamentSchedulerServiceImpl::UpdateNodeLabels, this, _1,
_2));
DFSTraverseResourceProtobufTreesReturnRTNDs(
rs_ptr->mutable_topology_node(), *updated_rtnd_ptr,
boost::bind(&FirmamentSchedulerServiceImpl::UpdateNodeTaints, this, _1,
DFSTraverseResourceProtobufTreesReturnRTNDs(
rs_ptr->mutable_topology_node(), *updated_rtnd_ptr,
boost::bind(&FirmamentSchedulerServiceImpl::UpdateNodeTaints, this, _1,
_2));
// TODO(ionel): Support other types of node updates.
reply->set_type(NodeReplyType::NODE_UPDATED_OK);
Expand All @@ -594,17 +653,17 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
label_ptr->CopyFrom(label);
}
}

void UpdateNodeTaints(ResourceTopologyNodeDescriptor* old_rtnd_ptr,
const ResourceTopologyNodeDescriptor& new_rtnd_ptr) {
ResourceDescriptor* old_rd_ptr = old_rtnd_ptr->mutable_resource_desc();
const ResourceDescriptor& new_rd = new_rtnd_ptr.resource_desc();
old_rd_ptr->clear_taints();
for (const auto& taint : new_rd.taints()) {
Taint* taint_ptr = old_rd_ptr->add_taints();
taint_ptr->CopyFrom(taint);
}
}
ResourceDescriptor* old_rd_ptr = old_rtnd_ptr->mutable_resource_desc();
const ResourceDescriptor& new_rd = new_rtnd_ptr.resource_desc();
old_rd_ptr->clear_taints();
for (const auto& taint : new_rd.taints()) {
Taint* taint_ptr = old_rd_ptr->add_taints();
taint_ptr->CopyFrom(taint);
}
}

Status AddTaskStats(ServerContext* context, const TaskStats* task_stats,
TaskStatsResponse* reply) override {
Expand Down Expand Up @@ -682,6 +741,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
}
boost::recursive_mutex task_submission_lock_;
boost::recursive_mutex node_addition_lock_;
CostModelInterface* cost_model_;
};

} // namespace firmament
Expand Down
14 changes: 14 additions & 0 deletions src/scheduling/flow/cost_model_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,20 @@ class CostModelInterface {
return delta;
}

/**
* Clear unscheduled tasks related maps and sets.
*/
virtual void ClearUnscheduledTasksData() {
}

/**
* Get all task ids which are connected to task EC, in turn task EC
* is not connected to any of the machine ecs. i.e get unscheduled
* tasks.
*/
virtual void GetUnscheduledTasks(vector<uint64_t>* unscheduled_tasks_ptr) {
}

protected:
shared_ptr<FlowGraphManager> flow_graph_manager_;
};
Expand Down
Loading

0 comments on commit ce592fd

Please sign in to comment.