diff --git a/src/scheduling/firmament_scheduler.proto b/src/scheduling/firmament_scheduler.proto index a1cc3a778..61d7d8b04 100644 --- a/src/scheduling/firmament_scheduler.proto +++ b/src/scheduling/firmament_scheduler.proto @@ -36,6 +36,9 @@ message ScheduleRequest {} message SchedulingDeltas { repeated SchedulingDelta deltas = 1; + // Added support for events. Added field to collect + // unscheduled tasks in a scheduling round. + repeated uint64 unscheduled_tasks = 2; } message TaskCompletedResponse { diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc index 212e87b91..6e56e36e9 100644 --- a/src/scheduling/firmament_scheduler_service.cc +++ b/src/scheduling/firmament_scheduler_service.cc @@ -53,13 +53,15 @@ using firmament::scheduler::SimpleScheduler; using firmament::scheduler::TopologyManager; using firmament::platform::sim::SimulatedMessagingAdapter; +DECLARE_bool(gather_unscheduled_tasks); DEFINE_string(firmament_scheduler_service_address, "127.0.0.1", "The address of the scheduler service"); DEFINE_string(firmament_scheduler_service_port, "9090", "The port of the scheduler service"); DECLARE_bool(resource_stats_update_based_on_resource_reservation); DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple"); -DEFINE_uint64(queue_based_scheduling_time, 100, "Queue Based Schedule run time"); +DEFINE_uint64(queue_based_scheduling_time, 100, + "Queue Based Schedule run time"); namespace firmament { @@ -83,6 +85,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { knowledge_base_, topology_manager_, sim_messaging_adapter_, NULL, top_level_res_id_, "", &wall_time_, trace_generator_, &labels_map_, &affinity_antiaffinity_tasks_); + // Get cost model pointer to clear unscheduled tasks of previous + // scheduling round and get unscheduled tasks of current scheduling round. + cost_model_ = dynamic_cast(scheduler_)->cost_model(); } else if (FLAGS_service_scheduler == "simple") { scheduler_ = new SimpleScheduler( job_map_, resource_map_, @@ -175,21 +180,75 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { SchedulingDeltas* reply) override { boost::lock_guard lock( scheduler_->scheduling_lock_); + // Clear unscheduled tasks related maps and sets of previous scheduling + // round. + if (FLAGS_gather_unscheduled_tasks) { + cost_model_->ClearUnscheduledTasksData(); + } SchedulerStats sstat; vector deltas; + // Schedule tasks which does not have pod affinity/anti-affinity + // requirements. scheduler_->ScheduleAllJobs(&sstat, &deltas); + uint64_t total_unsched_tasks_size = 0; + if (FLAGS_gather_unscheduled_tasks) { + // Get unscheduled tasks of above scheduling round. + vector unscheduled_normal_tasks; + cost_model_->GetUnscheduledTasks(&unscheduled_normal_tasks); + auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks(); + for (auto& unsched_task : unscheduled_normal_tasks) { + uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add(); + *unsched_task_ret = unsched_task; + total_unsched_tasks_size++; + } + // Clear unscheduled tasks related maps and sets. + cost_model_->ClearUnscheduledTasksData(); + } + + // Schedule tasks having pod affinity/anti-affinity. clock_t start = clock(); uint64_t elapsed = 0; - // Schedule tasks having pod affinity/anti-affinity + unordered_set unscheduled_affinity_tasks_set; + vector unscheduled_affinity_tasks; while (affinity_antiaffinity_tasks_.size() && (elapsed < FLAGS_queue_based_scheduling_time)) { - scheduler_->ScheduleAllQueueJobs(&sstat, &deltas); + uint64_t task_scheduled = + scheduler_->ScheduleAllQueueJobs(&sstat, &deltas); + TaskID_t task_id = dynamic_cast(scheduler_) + ->GetSingleTaskTobeScheduled(); + if (FLAGS_gather_unscheduled_tasks) { + if (!task_scheduled) { + if (unscheduled_affinity_tasks_set.find(task_id) == + unscheduled_affinity_tasks_set.end()) { + unscheduled_affinity_tasks_set.insert(task_id); + unscheduled_affinity_tasks.push_back(task_id); + } + } else { + unscheduled_affinity_tasks_set.erase(task_id); + } + } clock_t stop = clock(); elapsed = (double)(stop - start) * 1000.0 / CLOCKS_PER_SEC; } - // Extract results - if (deltas.size()) { - LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; + // Get unscheduled tasks of above scheduling round which tried scheduling + // tasks having pod affinity/anti-affinity. And populate the same into + // reply. + if (FLAGS_gather_unscheduled_tasks) { + auto unscheduled_affinity_tasks_ret = reply->mutable_unscheduled_tasks(); + for (auto& unsched_task : unscheduled_affinity_tasks) { + if (unscheduled_affinity_tasks_set.find(unsched_task) != + unscheduled_affinity_tasks_set.end()) { + uint64_t* unsched_task_ret = unscheduled_affinity_tasks_ret->Add(); + *unsched_task_ret = unsched_task; + total_unsched_tasks_size++; + } + } + } + + // Extract scheduling results. + LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; + if (FLAGS_gather_unscheduled_tasks) { + LOG(INFO) << "Got " << total_unsched_tasks_size << " unscheduled tasks"; } for (auto& d : deltas) { // LOG(INFO) << "Delta: " << d.DebugString(); @@ -575,9 +634,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { rs_ptr->mutable_topology_node(), *updated_rtnd_ptr, boost::bind(&FirmamentSchedulerServiceImpl::UpdateNodeLabels, this, _1, _2)); - DFSTraverseResourceProtobufTreesReturnRTNDs( - rs_ptr->mutable_topology_node(), *updated_rtnd_ptr, - boost::bind(&FirmamentSchedulerServiceImpl::UpdateNodeTaints, this, _1, + DFSTraverseResourceProtobufTreesReturnRTNDs( + rs_ptr->mutable_topology_node(), *updated_rtnd_ptr, + boost::bind(&FirmamentSchedulerServiceImpl::UpdateNodeTaints, this, _1, _2)); // TODO(ionel): Support other types of node updates. reply->set_type(NodeReplyType::NODE_UPDATED_OK); @@ -594,17 +653,17 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { label_ptr->CopyFrom(label); } } - + void UpdateNodeTaints(ResourceTopologyNodeDescriptor* old_rtnd_ptr, const ResourceTopologyNodeDescriptor& new_rtnd_ptr) { - ResourceDescriptor* old_rd_ptr = old_rtnd_ptr->mutable_resource_desc(); - const ResourceDescriptor& new_rd = new_rtnd_ptr.resource_desc(); - old_rd_ptr->clear_taints(); - for (const auto& taint : new_rd.taints()) { - Taint* taint_ptr = old_rd_ptr->add_taints(); - taint_ptr->CopyFrom(taint); - } - } + ResourceDescriptor* old_rd_ptr = old_rtnd_ptr->mutable_resource_desc(); + const ResourceDescriptor& new_rd = new_rtnd_ptr.resource_desc(); + old_rd_ptr->clear_taints(); + for (const auto& taint : new_rd.taints()) { + Taint* taint_ptr = old_rd_ptr->add_taints(); + taint_ptr->CopyFrom(taint); + } + } Status AddTaskStats(ServerContext* context, const TaskStats* task_stats, TaskStatsResponse* reply) override { @@ -682,6 +741,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { } boost::recursive_mutex task_submission_lock_; boost::recursive_mutex node_addition_lock_; + CostModelInterface* cost_model_; }; } // namespace firmament diff --git a/src/scheduling/flow/cost_model_interface.h b/src/scheduling/flow/cost_model_interface.h index 617df5ab2..5b34f18fb 100644 --- a/src/scheduling/flow/cost_model_interface.h +++ b/src/scheduling/flow/cost_model_interface.h @@ -248,6 +248,20 @@ class CostModelInterface { return delta; } + /** + * Clear unscheduled tasks related maps and sets. + */ + virtual void ClearUnscheduledTasksData() { + } + + /** + * Get all task ids which are connected to task EC, in turn task EC + * is not connected to any of the machine ecs. i.e get unscheduled + * tasks. + */ + virtual void GetUnscheduledTasks(vector* unscheduled_tasks_ptr) { + } + protected: shared_ptr flow_graph_manager_; }; diff --git a/src/scheduling/flow/cpu_cost_model.cc b/src/scheduling/flow/cpu_cost_model.cc index cd7e5f966..c3fef3215 100644 --- a/src/scheduling/flow/cpu_cost_model.cc +++ b/src/scheduling/flow/cpu_cost_model.cc @@ -33,6 +33,7 @@ DEFINE_uint64(max_multi_arcs_for_cpu, 50, "Maximum number of multi-arcs."); DECLARE_uint64(max_tasks_per_pu); +DECLARE_bool(gather_unscheduled_tasks); DECLARE_bool(pod_affinity_antiaffinity_symmetry); namespace firmament { @@ -64,7 +65,22 @@ void CpuCostModel::AccumulateResourceStats(ResourceDescriptor* accumulator, other->num_slots_below()); } -pair CpuCostModel::GetTaskMappingForSingleTask(TaskID_t task_id) { +// Events support for firmament. +void CpuCostModel::ClearUnscheduledTasksData() { + // Clear all map and sets related to unscheduled tasks. + task_ec_to_connected_tasks_.clear(); + task_ec_to_connected_tasks_set_.clear(); + task_ec_with_no_pref_arcs_.clear(); + task_ec_with_no_pref_arcs_set_.clear(); +} + +// Events support for firmament. +vector* CpuCostModel::GetTasksConnectedToTaskEC(EquivClass_t ec) { + return &task_ec_to_connected_tasks_[ec]; +} + +pair CpuCostModel::GetTaskMappingForSingleTask( + TaskID_t task_id) { // This function returns best matching resource for given taskid. vector* ecs = GetTaskEquivClasses(task_id); pair delta; @@ -82,6 +98,19 @@ pair CpuCostModel::GetTaskMappingForSingleTask(TaskID_t return delta; } +// Events support for firmament. +void CpuCostModel::GetUnscheduledTasks( + vector* unscheduled_tasks_ptr) { + // Return all tasks that are connected to task EC, where task EC has zero + // outgoing preferred arcs to machine ecs. + for (auto& ec : task_ec_with_no_pref_arcs_) { + vector* tasks_connected_to_ec = GetTasksConnectedToTaskEC(ec); + unscheduled_tasks_ptr->insert(std::end(*unscheduled_tasks_ptr), + std::begin(*tasks_connected_to_ec), + std::end(*tasks_connected_to_ec)); + } +} + ArcDescriptor CpuCostModel::TaskToUnscheduledAgg(TaskID_t task_id) { return ArcDescriptor(2560000, 1ULL, 0ULL); } @@ -267,46 +296,44 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, pod_affinity_normalized_score = pod_affinity_score.final_score; } } - //Expressing taints/tolerations priority scores - unordered_map>* - taints_priority_scores_ptr = - FindOrNull(ec_to_node_priority_scores, ec1); - CHECK_NOTNULL(taints_priority_scores_ptr); - PriorityScoresList_t* priority_scores_struct_ptr = - FindOrNull(*taints_priority_scores_ptr, *machine_res_id); - CHECK_NOTNULL(priority_scores_struct_ptr); - PriorityScore_t& taints_score = - priority_scores_struct_ptr->intolerable_taints_priority; - if (taints_score.satisfy) { - MinMaxScores_t* max_min_priority_scores = - FindOrNull(ec_to_max_min_priority_scores, ec1); - CHECK_NOTNULL(max_min_priority_scores); - if (taints_score.final_score == -1) { - // Normalised taints score is not calculated for this - // machine, so calculate and store it once. - int64_t max_score = - max_min_priority_scores->intolerable_taints_priority.max_score; - if (max_score) { - taints_score.final_score = (taints_score.score / (float)(max_score)) * omega_; - } - } - }else{ + // Expressing taints/tolerations priority scores + unordered_map>* taints_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec1); + CHECK_NOTNULL(taints_priority_scores_ptr); + PriorityScoresList_t* priority_scores_struct_ptr = + FindOrNull(*taints_priority_scores_ptr, *machine_res_id); + CHECK_NOTNULL(priority_scores_struct_ptr); + PriorityScore_t& taints_score = + priority_scores_struct_ptr->intolerable_taints_priority; + if (taints_score.satisfy) { + MinMaxScores_t* max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec1); + CHECK_NOTNULL(max_min_priority_scores); + if (taints_score.final_score == -1) { + // Normalised taints score is not calculated for this + // machine, so calculate and store it once. + int64_t max_score = + max_min_priority_scores->intolerable_taints_priority.max_score; + if (max_score) { + taints_score.final_score = + (taints_score.score / (float)(max_score)) * omega_; + } + } + } else { + taints_score.final_score = 0; + } - taints_score.final_score = 0; - } - cost_vector.node_affinity_soft_cost_ = omega_ - node_affinity_normalized_score; cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score; cost_vector.intolerable_taints_cost_ = taints_score.final_score; - - + Cost_t final_cost = FlattenCostVector(cost_vector); - //Added for solver + // Added for solver if (pod_affinity_or_anti_affinity_task) { - ResourceID_t current_resource_id = - ResourceIDFromString(rs->topology_node().children(0).resource_desc().uuid()); + ResourceID_t current_resource_id = ResourceIDFromString( + rs->topology_node().children(0).resource_desc().uuid()); ResourceID_t* res_id = FindOrNull(ec_to_best_fit_resource_, ec1); if (!res_id) { ec_to_min_cost_[ec1] = final_cost; @@ -415,7 +442,8 @@ vector* CpuCostModel::GetTaskEquivClasses(TaskID_t task_id) { CHECK_NOTNULL(task_resource_request); size_t task_agg = 0; bool pod_antiaffinity_symmetry = false; - if (td_ptr->has_affinity() || (td_ptr->tolerations_size() > DEFAULT_TOLERATIONS)) { + if (td_ptr->has_affinity() || + (td_ptr->tolerations_size() > DEFAULT_TOLERATIONS)) { // For tasks which has affinity requirements, we hash the job id. // TODO(jagadish): This hash has to be handled in an efficient way in // future. @@ -453,6 +481,31 @@ vector* CpuCostModel::GetTaskEquivClasses(TaskID_t task_id) { if (pod_antiaffinity_symmetry) { ecs_with_pod_antiaffinity_symmetry_.insert(resource_request_ec); } + if (FLAGS_gather_unscheduled_tasks) { + // When preemption is enabled, there is a chance that task can be in + // 'RUNNING' state. Skip such tasks. You may need to revisit this code once + // preemption code is completed. + if (td_ptr->state() != TaskDescriptor::RUNNING) { + auto it = task_ec_to_connected_tasks_.find(resource_request_ec); + // Make sure we store unique unscheduled task entries. + if (it != task_ec_to_connected_tasks_.end()) { + unordered_set& connected_tasks_set = + task_ec_to_connected_tasks_set_[resource_request_ec]; + if (connected_tasks_set.find(task_id) == connected_tasks_set.end()) { + it->second.push_back(task_id); + connected_tasks_set.insert(task_id); + } + } else { + vector connected_tasks; + unordered_set connected_tasks_set; + connected_tasks.push_back(task_id); + task_ec_to_connected_tasks_[resource_request_ec] = connected_tasks; + connected_tasks_set.insert(task_id); + task_ec_to_connected_tasks_set_[resource_request_ec] = + connected_tasks_set; + } + } + } return ecs; } @@ -563,122 +616,114 @@ void CpuCostModel::CalculatePrioritiesCost(const EquivClass_t ec, } } - -//Taints and Tolerations +// Taints and Tolerations void CpuCostModel::CalculateIntolerableTaintsCost(const ResourceDescriptor& rd, - const TaskDescriptor* td_ptr, - const EquivClass_t ec ){ - bool IsTolerable = false; - int64_t intolerable_taint_cost = 0; - for (const auto& tolerations: td_ptr->tolerations()) { - if (tolerations.effect() == "PreferNoSchedule" || tolerations.effect() == ""){ - if(tolerations.operator_() == "Exists"){ - if (tolerations.key() != ""){ - InsertIfNotPresent(&tolerationSoftExistsMap,tolerations.key(),tolerations.value()); - }else{ - IsTolerable = true; - } - - } - else if ((tolerations.operator_() == "Equal") || (tolerations.operator_() == "")){ - - InsertIfNotPresent(&tolerationSoftEqualMap,tolerations.key(),tolerations.value()); - - } - else{ - LOG(FATAL) << "Unsupported operator :" << tolerations.operator_(); - break; - } - - } - - } + const TaskDescriptor* td_ptr, + const EquivClass_t ec) { + bool IsTolerable = false; + int64_t intolerable_taint_cost = 0; + for (const auto& tolerations : td_ptr->tolerations()) { + if (tolerations.effect() == "PreferNoSchedule" || + tolerations.effect() == "") { + if (tolerations.operator_() == "Exists") { + if (tolerations.key() != "") { + InsertIfNotPresent(&tolerationSoftExistsMap, tolerations.key(), + tolerations.value()); + } else { + IsTolerable = true; + } - if (!IsTolerable){ - for (const auto& taint : rd.taints()){ - if (taint.effect() == "PreferNoSchedule"){ - //If the key does not exist in Exists Map, look for Equal Map for any matching key and value + } else if ((tolerations.operator_() == "Equal") || + (tolerations.operator_() == "")) { + InsertIfNotPresent(&tolerationSoftEqualMap, tolerations.key(), + tolerations.value()); - if (!ContainsKey(tolerationSoftExistsMap,taint.key())){ - const string* value = FindOrNull(tolerationSoftEqualMap, taint.key()); + } else { + LOG(FATAL) << "Unsupported operator :" << tolerations.operator_(); + break; + } + } + } - //If key is found, then value is not NULL - if (value != NULL) { - //Check if the value matches for the found key - if ((*value) != taint.value()) { - intolerable_taint_cost = intolerable_taint_cost + 1; - } - }else{ //If the key is not found, then taint is not tolerable - intolerable_taint_cost = intolerable_taint_cost + 1; - } + if (!IsTolerable) { + for (const auto& taint : rd.taints()) { + if (taint.effect() == "PreferNoSchedule") { + // If the key does not exist in Exists Map, look for Equal Map for any + // matching key and value - } + if (!ContainsKey(tolerationSoftExistsMap, taint.key())) { + const string* value = FindOrNull(tolerationSoftEqualMap, taint.key()); - } - } - } - // Fill the intolerable taints priority min, max and actual scores which will - // be used in cost calculation. - unordered_map>* - taints_priority_scores_ptr = - FindOrNull(ec_to_node_priority_scores, ec); - if (!taints_priority_scores_ptr) { - // For this EC, no node to priority scores map exists, so initialize - // it. - unordered_map> - node_to_priority_scores_map; - InsertIfNotPresent(&ec_to_node_priority_scores, ec, - node_to_priority_scores_map); - taints_priority_scores_ptr = - FindOrNull(ec_to_node_priority_scores, ec); - } - CHECK_NOTNULL(taints_priority_scores_ptr); - ResourceID_t res_id = ResourceIDFromString(rd.uuid()); - PriorityScoresList_t* priority_scores_struct_ptr = - FindOrNull(*taints_priority_scores_ptr, res_id); - if (!priority_scores_struct_ptr) { - // Priority scores is empty for this node, so initialize it zero. - PriorityScoresList_t priority_scores_list; - InsertIfNotPresent(taints_priority_scores_ptr, res_id, - priority_scores_list); - priority_scores_struct_ptr = - FindOrNull(*taints_priority_scores_ptr, res_id); - } - CHECK_NOTNULL(priority_scores_struct_ptr); - // Store the intolerable taints min, max and actual priority scores that will - // be utilized in calculating normalized cost. - PriorityScore_t& taints_score = - priority_scores_struct_ptr->intolerable_taints_priority; - if (!intolerable_taint_cost) { - // If machine does not satisfies soft constraint then we flag machine - // such that cost of omega_ is used in cost calculation. - taints_score.satisfy = false; - } - if (taints_score.satisfy) { - // Machine satisfies soft constraints. - // Store the intolerable taints min, max and actual priority scores. - taints_score.score = intolerable_taint_cost; - MinMaxScores_t* max_min_priority_scores = - FindOrNull(ec_to_max_min_priority_scores, ec); - if (!max_min_priority_scores) { - MinMaxScores_t priority_scores_list; - InsertIfNotPresent(&ec_to_max_min_priority_scores, ec, - priority_scores_list); - max_min_priority_scores = - FindOrNull(ec_to_max_min_priority_scores, ec); - } - MinMaxScore_t& min_max_taints_score = - max_min_priority_scores->intolerable_taints_priority; - if (min_max_taints_score.max_score < intolerable_taint_cost || - min_max_taints_score.max_score == -1) { - min_max_taints_score.max_score = intolerable_taint_cost; + // If key is found, then value is not NULL + if (value != NULL) { + // Check if the value matches for the found key + if ((*value) != taint.value()) { + intolerable_taint_cost = intolerable_taint_cost + 1; + } + } else { // If the key is not found, then taint is not tolerable + intolerable_taint_cost = intolerable_taint_cost + 1; } - } + } } - - + } + } + // Fill the intolerable taints priority min, max and actual scores which will + // be used in cost calculation. + unordered_map>* taints_priority_scores_ptr = + FindOrNull(ec_to_node_priority_scores, ec); + if (!taints_priority_scores_ptr) { + // For this EC, no node to priority scores map exists, so initialize + // it. + unordered_map> + node_to_priority_scores_map; + InsertIfNotPresent(&ec_to_node_priority_scores, ec, + node_to_priority_scores_map); + taints_priority_scores_ptr = FindOrNull(ec_to_node_priority_scores, ec); + } + CHECK_NOTNULL(taints_priority_scores_ptr); + ResourceID_t res_id = ResourceIDFromString(rd.uuid()); + PriorityScoresList_t* priority_scores_struct_ptr = + FindOrNull(*taints_priority_scores_ptr, res_id); + if (!priority_scores_struct_ptr) { + // Priority scores is empty for this node, so initialize it zero. + PriorityScoresList_t priority_scores_list; + InsertIfNotPresent(taints_priority_scores_ptr, res_id, + priority_scores_list); + priority_scores_struct_ptr = + FindOrNull(*taints_priority_scores_ptr, res_id); + } + CHECK_NOTNULL(priority_scores_struct_ptr); + // Store the intolerable taints min, max and actual priority scores that will + // be utilized in calculating normalized cost. + PriorityScore_t& taints_score = + priority_scores_struct_ptr->intolerable_taints_priority; + if (!intolerable_taint_cost) { + // If machine does not satisfies soft constraint then we flag machine + // such that cost of omega_ is used in cost calculation. + taints_score.satisfy = false; + } + if (taints_score.satisfy) { + // Machine satisfies soft constraints. + // Store the intolerable taints min, max and actual priority scores. + taints_score.score = intolerable_taint_cost; + MinMaxScores_t* max_min_priority_scores = + FindOrNull(ec_to_max_min_priority_scores, ec); + if (!max_min_priority_scores) { + MinMaxScores_t priority_scores_list; + InsertIfNotPresent(&ec_to_max_min_priority_scores, ec, + priority_scores_list); + max_min_priority_scores = FindOrNull(ec_to_max_min_priority_scores, ec); + } + MinMaxScore_t& min_max_taints_score = + max_min_priority_scores->intolerable_taints_priority; + if (min_max_taints_score.max_score < intolerable_taint_cost || + min_max_taints_score.max_score == -1) { + min_max_taints_score.max_score = intolerable_taint_cost; + } + } +} // Pod affinity/anti-affinity bool CpuCostModel::MatchExpressionWithPodLabels( @@ -880,7 +925,7 @@ bool CpuCostModel::SatisfiesPodAntiAffinityTerm( return false; } } - namespaces.clear(); + namespaces.clear(); return true; } @@ -1030,7 +1075,6 @@ int64_t CpuCostModel::CalculatePodAntiAffinitySymmetryPreference( return sum_of_weights; } - // Soft constraint check for pod affinity/anti-affinity. void CpuCostModel::CalculatePodAffinityAntiAffinityPreference( const ResourceDescriptor& rd, const TaskDescriptor& td, @@ -1116,10 +1160,8 @@ void CpuCostModel::CalculatePodAffinityAntiAffinityPreference( FindOrNull(*nodes_priority_scores_ptr, res_id); if (!priority_scores_struct_ptr) { PriorityScoresList_t priority_scores_list; - InsertIfNotPresent(nodes_priority_scores_ptr, res_id, - priority_scores_list); - priority_scores_struct_ptr = - FindOrNull(*nodes_priority_scores_ptr, res_id); + InsertIfNotPresent(nodes_priority_scores_ptr, res_id, priority_scores_list); + priority_scores_struct_ptr = FindOrNull(*nodes_priority_scores_ptr, res_id); } CHECK_NOTNULL(priority_scores_struct_ptr); PriorityScore_t& pod_affinity_score = @@ -1360,9 +1402,9 @@ vector* CpuCostModel::GetEquivClassToEquivClassesArcs( // scores. But we are not clearing it just after scheduling round completed, // we are clearing in the subsequent scheduling round, need to improve this. ec_to_node_priority_scores.clear(); - //Added to clear the map before filling the values for each node - tolerationSoftEqualMap.clear(); - tolerationSoftExistsMap.clear(); + // Added to clear the map before filling the values for each node + tolerationSoftEqualMap.clear(); + tolerationSoftExistsMap.clear(); for (auto& ec_machines : ecs_for_machines_) { ResourceStatus* rs = FindPtrOrNull(*resource_map_, ec_machines.first); CHECK_NOTNULL(rs); @@ -1381,19 +1423,22 @@ vector* CpuCostModel::GetEquivClassToEquivClassesArcs( } else { continue; } - //Check whether taints in the machine has matching tolerations - if (scheduler::HasMatchingTolerationforNodeTaints(rd, *td_ptr)) { - CalculateIntolerableTaintsCost(rd, td_ptr, ec); - }else { - continue; + // Check whether taints in the machine has matching tolerations + if (scheduler::HasMatchingTolerationforNodeTaints(rd, *td_ptr)) { + CalculateIntolerableTaintsCost(rd, td_ptr, ec); + } else { + continue; } - // Checking costs for intolerable taints - + // Checking costs for intolerable taints + // Checking pod anti-affinity symmetry - if (FLAGS_pod_affinity_antiaffinity_symmetry && (ecs_with_pod_antiaffinity_symmetry_.find(ec) != ecs_with_pod_antiaffinity_symmetry_.end())) { + if (FLAGS_pod_affinity_antiaffinity_symmetry && + (ecs_with_pod_antiaffinity_symmetry_.find(ec) != + ecs_with_pod_antiaffinity_symmetry_.end())) { if (td_ptr->labels_size()) { - if (SatisfiesPodAntiAffinitySymmetry(ResourceIDFromString(rd.uuid()), *td_ptr)) { - //Calculate soft constriants score if needed. + if (SatisfiesPodAntiAffinitySymmetry( + ResourceIDFromString(rd.uuid()), *td_ptr)) { + // Calculate soft constriants score if needed. } else { continue; } @@ -1414,8 +1459,8 @@ vector* CpuCostModel::GetEquivClassToEquivClassesArcs( uint64_t index = 0; CpuMemResVector_t cur_resource; uint64_t task_count = rd.num_running_tasks_below(); - //TODO(dujun) : FLAGS_max_tasks_per_pu is treated as equivalent to max-pods, - // as max-pods functionality is not yet merged at this point. + // TODO(pratik) : FLAGS_max_tasks_per_pu is treated as equivalent to + // max-pods, as max-pods functionality is not yet merged at this point. for (cur_resource = *task_resource_request; cur_resource.cpu_cores_ <= available_resources.cpu_cores_ && cur_resource.ram_cap_ <= available_resources.ram_cap_ && @@ -1428,6 +1473,19 @@ vector* CpuCostModel::GetEquivClassToEquivClassesArcs( pref_ecs->push_back(ec_machines.second[index]); } } + if (FLAGS_gather_unscheduled_tasks) { + if (pref_ecs->size() == 0) { + // So tasks connected to this task EC will never be scheduled, so populate + // this 'ec' to 'task_ec_with_no_pref_arcs_'. Reason why tasks not + // scheduled could be 1) Not enough cpu/mem 2) Any nodes not satisfying + // scheduling constraints like affinity etc. + if (task_ec_with_no_pref_arcs_set_.find(ec) == + task_ec_with_no_pref_arcs_set_.end()) { + task_ec_with_no_pref_arcs_.push_back(ec); + task_ec_with_no_pref_arcs_set_.insert(ec); + } + } + } } return pref_ecs; } diff --git a/src/scheduling/flow/cpu_cost_model.h b/src/scheduling/flow/cpu_cost_model.h index 5797bc6a4..d0b7c4bbb 100644 --- a/src/scheduling/flow/cpu_cost_model.h +++ b/src/scheduling/flow/cpu_cost_model.h @@ -203,6 +203,12 @@ class CpuCostModel : public CostModelInterface { void PrepareStats(FlowGraphNode* accumulator); FlowGraphNode* UpdateStats(FlowGraphNode* accumulator, FlowGraphNode* other); pair GetTaskMappingForSingleTask(TaskID_t task_id); + // Get all the tasks that are connected to task EC. + vector* GetTasksConnectedToTaskEC(TaskID_t task_id); + // Clear unscheduled tasks related maps and sets. + void ClearUnscheduledTasksData(); + // Get unscheduled tasks in a scheduling round. + void GetUnscheduledTasks(vector* unscheduled_tasks_ptr); private: // Fixed value for OMEGA, the normalization ceiling for each dimension's cost @@ -269,6 +275,11 @@ class CpuCostModel : public CostModelInterface { unordered_map ec_to_min_cost_; unordered_map tolerationSoftEqualMap; unordered_map tolerationSoftExistsMap; + unordered_set task_ec_with_no_pref_arcs_set_; + vector task_ec_with_no_pref_arcs_; + unordered_map> task_ec_to_connected_tasks_; + unordered_map> + task_ec_to_connected_tasks_set_; }; } // namespace firmament diff --git a/src/scheduling/flow/flow_scheduler.cc b/src/scheduling/flow/flow_scheduler.cc index e5a775d3a..6816af53b 100644 --- a/src/scheduling/flow/flow_scheduler.cc +++ b/src/scheduling/flow/flow_scheduler.cc @@ -53,6 +53,7 @@ DEFINE_uint64(max_solver_runtime, 100000000, "Maximum runtime of the solver in u-sec"); DEFINE_int64(time_dependent_cost_update_frequency, 10000000ULL, "Update frequency for time-dependent costs, in microseconds."); +DEFINE_bool(gather_unscheduled_tasks, true, "Gather unscheduled tasks"); DEFINE_bool(debug_cost_model, false, "Store cost model debug info in CSV files."); DEFINE_uint64(purge_unconnected_ec_frequency, 10, "Frequency in solver runs " @@ -568,6 +569,10 @@ uint64_t FlowScheduler::ScheduleJobs(const vector& jd_ptr_vect, // known before AddOrUpdateJobNodes is invoked below, as it may add arcs // depending on these metrics. UpdateCostModelResourceStats(); + if (FLAGS_gather_unscheduled_tasks) { + // Clear unscheduled tasks related maps and sets. + cost_model_->ClearUnscheduledTasksData(); + } flow_graph_manager_->AddOrUpdateJobNodes(jds_with_runnables); num_scheduled_tasks += RunSchedulingIteration(scheduler_stats, deltas, &jds_with_runnables); VLOG(1) << "STOP SCHEDULING, placed " << num_scheduled_tasks << " tasks"; @@ -611,6 +616,7 @@ uint64_t FlowScheduler::RunSchedulingIteration( // TODO(malte): this can be removed when we've factored archived tasks // and jobs out of the job_map_ into separate data structures. // (cf. issue #24). + /* vector job_vec; for (auto it = job_map_->begin(); it != job_map_->end(); @@ -622,8 +628,30 @@ uint64_t FlowScheduler::RunSchedulingIteration( job_vec.push_back(&it->second); } } + */ // This will re-visit all jobs and update their time-dependent costs + // Changed above code to revisit only jobs from job_vector not from + // job_map_ i.e, jobs with pod affinty and pod anti-affinity are handled + // in sepearte scheduling round even for time dependent costs update. + // Jobs with pod affinty/anti-affinty are scheduled one task at a time + // in a single scheduling round, whereas for other jobs tasks are + // scheduled in a batch. VLOG(1) << "Flow scheduler updating time-dependent costs."; + vector job_vec; + for (auto it = (*job_vector).begin(); + it != (*job_vector).end(); + ++it) { + // We only need to reconsider this job if it is still active + if ((*it)->state() != JobDescriptor::COMPLETED && + (*it)->state() != JobDescriptor::FAILED && + (*it)->state() != JobDescriptor::ABORTED) { + job_vec.push_back(*it); + } + } + if (FLAGS_gather_unscheduled_tasks) { + // Clear unscheduled tasks related maps and sets. + cost_model_->ClearUnscheduledTasksData(); + } flow_graph_manager_->UpdateTimeDependentCosts(job_vec); last_updated_time_dependent_costs_ = cur_time; } @@ -641,6 +669,8 @@ uint64_t FlowScheduler::RunSchedulingIteration( } else { string id = ((*job_vector)[0])->uuid(); TaskID_t single_task_id = *(runnable_tasks_[JobIDFromString(id)].begin()); + // Single task that needs to scheduled. + task_to_be_scheduled_ = single_task_id; pair single_delta = solver_dispatcher_->RunSimpleSolverForSingleTask(scheduler_stats, single_task_id); diff --git a/src/scheduling/flow/flow_scheduler.h b/src/scheduling/flow/flow_scheduler.h index aa9682a77..7da026039 100644 --- a/src/scheduling/flow/flow_scheduler.h +++ b/src/scheduling/flow/flow_scheduler.h @@ -108,10 +108,16 @@ class FlowScheduler : public EventDrivenScheduler { const CostModelInterface& cost_model() const { return *cost_model_; } + CostModelInterface* cost_model() { + return cost_model_; + } const SolverDispatcher& dispatcher() const { return *solver_dispatcher_; } + TaskID_t GetSingleTaskTobeScheduled() { + return task_to_be_scheduled_; + } protected: virtual void HandleTaskMigration(TaskDescriptor* td_ptr, ResourceDescriptor* rd_ptr); @@ -154,6 +160,8 @@ class FlowScheduler : public EventDrivenScheduler { DIMACSChangeStats* dimacs_stats_; uint64_t solver_run_cnt_; unordered_set resource_roots_; + // Single task that needs to scheduled in queue based scheduling round. + TaskID_t task_to_be_scheduled_; }; } // namespace scheduler