diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 863ea11c8007..abc81e12ecc2 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -376,6 +376,19 @@ RAY_CONFIG(int64_t, min_spilling_size, 100 * 1024 * 1024) /// take more than this percentage of the available memory. RAY_CONFIG(float, object_spilling_threshold, 1.0) +RAY_CONFIG(float, block_tasks_threshold, 1.0) + +RAY_CONFIG(float, evict_tasks_threshold, 1.0) + +// Whether to use BlockTasks +RAY_CONFIG(bool, enable_BlockTasks, false) + +// Whether to block Spill at BlockTasks +RAY_CONFIG(bool, enable_BlockTasksSpill, false) + +// Whether to use EvictTasks when spill required +RAY_CONFIG(bool, enable_EvictTasks, false) + /// Maximum number of objects that can be fused into a single file. RAY_CONFIG(int64_t, max_fused_object_count, 2000) diff --git a/src/ray/common/task/task_priority.h b/src/ray/common/task/task_priority.h index 2109886dae5a..384929d165e1 100644 --- a/src/ray/common/task/task_priority.h +++ b/src/ray/common/task/task_priority.h @@ -48,6 +48,10 @@ struct Priority { return score[depth]; } + int GetDepth() { + return (int)score.size(); + } + void SetScore(int64_t depth, int s) { extend(depth + 1); RAY_CHECK(score[depth] >= s); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8a6155e90d93..f00ea59bc685 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -40,8 +40,7 @@ void BuildCommonTaskSpec( const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, const BundleID &bundle_id, bool placement_group_capture_child_tasks, - const std::string debugger_breakpoint, - const Priority &priority, + const std::string debugger_breakpoint, const Priority &priority, const std::string &serialized_runtime_env, const std::vector &runtime_env_uris, const std::string &concurrency_group_name = "") { @@ -50,9 +49,7 @@ void BuildCommonTaskSpec( task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, required_resources, required_placement_resources, bundle_id, placement_group_capture_child_tasks, - debugger_breakpoint, - priority, - serialized_runtime_env, runtime_env_uris, + debugger_breakpoint, priority, serialized_runtime_env, runtime_env_uris, concurrency_group_name); // Set task arguments. for (const auto &arg : args) { @@ -684,9 +681,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ rpc_address_, local_raylet_client_, core_worker_client_pool_, raylet_client_factory, std::move(lease_policy), memory_store_, task_manager_, local_raylet_id, RayConfig::instance().worker_lease_timeout_milliseconds(), actor_creator_, - /*get_task_priority=*/[](const TaskSpecification &spec) { - return spec.GetPriority(); - }, + /*get_task_priority=*/ + [](const TaskSpecification &spec) { return spec.GetPriority(); }, RayConfig::instance().max_tasks_in_flight_per_worker(), boost::asio::steady_timer(io_service_)); auto report_locality_data_callback = @@ -1216,9 +1212,7 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, if (status.ok()) { status = plasma_store_provider_->Create(metadata, data_size, *object_id, /* owner_address = */ rpc_address_, - Priority(), - data, - created_by_worker); + Priority(), data, created_by_worker); } if (!status.ok() || !data) { if (owned_by_us) { @@ -1235,8 +1229,8 @@ Status CoreWorker::CreateOwned(const std::shared_ptr &metadata, Status CoreWorker::CreateExisting(const std::shared_ptr &metadata, const size_t data_size, const ObjectID &object_id, const rpc::Address &owner_address, - const Priority &priority, - std::shared_ptr *data, bool created_by_worker) { + const Priority &priority, std::shared_ptr *data, + bool created_by_worker) { if (options_.is_local_mode) { return Status::NotImplemented( "Creating an object with a pre-existing ObjectID is not supported in local " @@ -1670,16 +1664,15 @@ std::vector CoreWorker::SubmitTask( ? function.GetFunctionDescriptor()->DefaultTaskName() : task_options.name; // TODO(ekl) offload task building onto a thread pool for performance - BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, task_name, - worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), - rpc_address_, function, args, task_options.num_returns, - constrained_resources, required_resources, placement_options, - placement_group_capture_child_tasks, debugger_breakpoint, - Priority(), - task_options.serialized_runtime_env, task_options.runtime_env_uris); + BuildCommonTaskSpec( + builder, worker_context_.GetCurrentJobID(), task_id, task_name, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, + function, args, task_options.num_returns, constrained_resources, required_resources, + placement_options, placement_group_capture_child_tasks, debugger_breakpoint, + Priority(), task_options.serialized_runtime_env, task_options.runtime_env_uris); builder.SetNormalTaskSpec(max_retries, retry_exceptions); TaskSpecification task_spec = builder.Build(); - //priority = task_manager_->GenerateTaskPriority(task_spec); + // priority = task_manager_->GenerateTaskPriority(task_spec); RAY_LOG(DEBUG) << "Submit task " << task_spec.DebugString(); std::vector returned_refs; if (options_.is_local_mode) { @@ -1689,7 +1682,6 @@ std::vector CoreWorker::SubmitTask( CurrentCallSite(), max_retries); io_service_.post( [this, task_spec]() { - //(Jae) This is the reason why tasks are not placed with priority RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); }, "CoreWorker.SubmitTask"); @@ -1734,8 +1726,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, new_placement_resources, actor_creation_options.placement_options, actor_creation_options.placement_group_capture_child_tasks, "", /* debugger_breakpoint */ - Priority(), - actor_creation_options.serialized_runtime_env, + Priority(), actor_creation_options.serialized_runtime_env, actor_creation_options.runtime_env_uris); auto actor_handle = std::make_unique( @@ -1917,11 +1908,10 @@ std::vector CoreWorker::SubmitActorTask( worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, num_returns, task_options.resources, required_resources, std::make_pair(PlacementGroupID::Nil(), -1), - true, /* placement_group_capture_child_tasks */ - "", /* debugger_breakpoint */ - Priority(), - "{}", /* serialized_runtime_env */ - {}, /* runtime_env_uris */ + true, /* placement_group_capture_child_tasks */ + "", /* debugger_breakpoint */ + Priority(), "{}", /* serialized_runtime_env */ + {}, /* runtime_env_uris */ task_options.concurrency_group_name); // NOTE: placement_group_capture_child_tasks and runtime_env will // be ignored in the actor because we should always follow the actor's option. @@ -2144,9 +2134,10 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id, const std::vector &contained_object_ids, int64_t &task_output_inlined_bytes, std::shared_ptr *return_object) { + auto spec = worker_context_.GetCurrentTask(); rpc::Address owner_address(options_.is_local_mode ? rpc::Address() - : worker_context_.GetCurrentTask()->CallerAddress()); + : spec->CallerAddress()); bool object_already_exists = false; std::shared_ptr data_buffer; @@ -2169,8 +2160,7 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id, task_output_inlined_bytes += static_cast(data_size); } else { RAY_RETURN_NOT_OK(CreateExisting(metadata, data_size, object_id, owner_address, - Priority(), - &data_buffer, + spec->GetPriority(), &data_buffer, /*created_by_worker=*/true)); object_already_exists = !data_buffer; } @@ -2180,6 +2170,8 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id, auto contained_refs = GetObjectRefs(contained_object_ids); *return_object = std::make_shared(data_buffer, metadata, std::move(contained_refs)); + } else { + RAY_LOG(DEBUG) << "Return object already exists " << object_id; } return Status::OK(); @@ -3124,16 +3116,15 @@ void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *rep // any object pinning RPCs in flight. bool is_idle = !own_objects && pins_in_flight == 0; reply->set_success(is_idle); - send_reply_callback( - Status::OK(), - [this, is_idle]() { - // If the worker is idle, we exit. - if (is_idle) { - Exit(rpc::WorkerExitType::IDLE_EXIT); - } - }, - // We need to kill it regardless if the RPC failed. - [this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); }); + send_reply_callback(Status::OK(), + [this, is_idle]() { + // If the worker is idle, we exit. + if (is_idle) { + Exit(rpc::WorkerExitType::IDLE_EXIT); + } + }, + // We need to kill it regardless if the RPC failed. + [this]() { Exit(rpc::WorkerExitType::INTENDED_EXIT); }); } void CoreWorker::HandleAssignObjectOwner(const rpc::AssignObjectOwnerRequest &request, diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index c3e78a30d8d7..716f91820949 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -506,11 +506,7 @@ void CoreWorkerDirectTaskReceiver::HandleTask( return_object->set_object_id(id.Binary()); // The object is nullptr if it already existed in the object store. - if (!return_objects[i]) { - RAY_LOG(INFO) << "Failed to create task return object " << id - << " in the object store, returning an error to the application."; - return_objects[i] = std::make_shared(rpc::ErrorType::OBJECT_LOST); - } + RAY_CHECK(return_objects[i]) << id; const auto &result = return_objects[i]; return_object->set_size(result->GetSize()); diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h index 8774ad034d0d..d8f2c07e2f17 100644 --- a/src/ray/object_manager/common.h +++ b/src/ray/object_manager/common.h @@ -29,7 +29,8 @@ using SpillObjectsCallback = std::function; /// Callback when the creation of object(s) is blocked. The priority is the /// highest priority of a blocked object. -using ObjectCreationBlockedCallback = std::function; +using ObjectCreationBlockedCallback = std::function; using SetShouldSpillCallback = std::function; @@ -53,6 +54,8 @@ struct ObjectInfo { int owner_port; /// Owner's worker ID. WorkerID owner_worker_id; + // Priority of the object. Used for blockTasks() memory backpressure + ray::Priority priority; int64_t GetObjectSize() const { return data_size + metadata_size; } @@ -62,7 +65,8 @@ struct ObjectInfo { (owner_raylet_id == other.owner_raylet_id) && (owner_ip_address == other.owner_ip_address) && (owner_port == other.owner_port) && - (owner_worker_id == other.owner_worker_id)); + (owner_worker_id == other.owner_worker_id) && + (priority == other.priority)); } }; diff --git a/src/ray/object_manager/plasma/common.h b/src/ray/object_manager/plasma/common.h index 6297a2f60eca..5306865a7830 100644 --- a/src/ray/object_manager/plasma/common.h +++ b/src/ray/object_manager/plasma/common.h @@ -109,6 +109,8 @@ class LocalObject { const plasma::flatbuf::ObjectSource &GetSource() const { return source; } + ray::Priority &GetPriority() { return object_info.priority; } + void ToPlasmaObject(PlasmaObject *object, bool check_sealed) const { RAY_DCHECK(object != nullptr); if (check_sealed) { diff --git a/src/ray/object_manager/plasma/create_request_queue.cc b/src/ray/object_manager/plasma/create_request_queue.cc index 3a5f7ddd5670..008807921aa0 100644 --- a/src/ray/object_manager/plasma/create_request_queue.cc +++ b/src/ray/object_manager/plasma/create_request_queue.cc @@ -31,10 +31,9 @@ uint64_t CreateRequestQueue::AddRequest(const ray::TaskKey &task_id, size_t object_size) { auto req_id = next_req_id_++; fulfilled_requests_[req_id] = nullptr; - //auto taskId = task.GetTaskSpecification().GetTaskKey(); - queue_.emplace( - task_id, - new CreateRequest(object_id, req_id, client, create_callback, object_size)); + // auto taskId = task.GetTaskSpecification().GetTaskKey(); + queue_.emplace(task_id, new CreateRequest(object_id, req_id, client, create_callback, + object_size)); num_bytes_pending_ += object_size; return req_id; } @@ -67,16 +66,21 @@ std::pair CreateRequestQueue::TryRequestImmediately( PlasmaObject result = {}; // Immediately fulfill it using the fallback allocator. - PlasmaError error = create_callback(/*fallback_allocator=*/true, &result, - /*spilling_required=*/nullptr); + PlasmaError error = + create_callback(/*fallback_allocator=*/true, &result, + /*spilling_required=*/nullptr, nullptr, nullptr, nullptr); return {result, error}; } Status CreateRequestQueue::ProcessRequest(bool fallback_allocator, std::unique_ptr &request, - bool *spilling_required) { + bool *spilling_required, + bool *block_tasks_required, + bool *evict_tasks_required, + ray::Priority *lowest_pri) { request->error = - request->create_callback(fallback_allocator, &request->result, spilling_required); + request->create_callback(fallback_allocator, &request->result, spilling_required, + block_tasks_required, evict_tasks_required, lowest_pri); if (request->error == PlasmaError::OutOfMemory) { return Status::ObjectStoreFull(""); } else { @@ -91,8 +95,8 @@ Status CreateRequestQueue::ProcessFirstRequest() { if (queue_it != queue_.end()) { bool spilling_required = false; std::unique_ptr &request = queue_it->second; - auto status = - ProcessRequest(/*fallback_allocator=*/false, request, &spilling_required); + auto status = ProcessRequest(/*fallback_allocator=*/false, request, + &spilling_required, nullptr, nullptr, nullptr); if (spilling_required) { spill_objects_callback_(); } @@ -124,7 +128,7 @@ Status CreateRequestQueue::ProcessFirstRequest() { } else { // Trigger the fallback allocator. status = ProcessRequest(/*fallback_allocator=*/true, request, - /*spilling_required=*/nullptr); + /*spilling_required=*/nullptr, nullptr, nullptr, nullptr); if (!status.ok()) { std::string dump = ""; if (dump_debug_info_callback_ && !logged_oom) { @@ -143,23 +147,42 @@ Status CreateRequestQueue::ProcessFirstRequest() { return Status::OK(); } -void CreateRequestQueue::SetShouldSpill(bool should_spill){ - should_spill_ = should_spill; +void CreateRequestQueue::SetShouldSpill(bool should_spill) { + should_spill_ = should_spill; } Status CreateRequestQueue::ProcessRequests() { // Suppress OOM dump to once per grace period. bool logged_oom = false; + bool enable_blocktasks = RayConfig::instance().enable_BlockTasks(); + bool enable_blocktasks_spill = RayConfig::instance().enable_BlockTasksSpill(); + bool enable_evicttasks = RayConfig::instance().enable_EvictTasks(); + while (!queue_.empty()) { auto queue_it = queue_.begin(); bool spilling_required = false; + bool block_tasks_required = false; + bool evict_tasks_required = false; std::unique_ptr &request = queue_it->second; + ray::Priority lowest_pri; + ray::TaskKey task_id = queue_it->first; auto status = - ProcessRequest(/*fallback_allocator=*/false, request, &spilling_required); + ProcessRequest(/*fallback_allocator=*/false, request, &spilling_required, + &block_tasks_required, &evict_tasks_required, &lowest_pri); if (spilling_required) { spill_objects_callback_(); } + //Turn these flags on only when matching flags are on + block_tasks_required = (block_tasks_required&enable_blocktasks); + evict_tasks_required = (evict_tasks_required&enable_evicttasks); + + if (block_tasks_required || evict_tasks_required) { + on_object_creation_blocked_callback_(lowest_pri, block_tasks_required, + evict_tasks_required); + } + auto now = get_time_(); + if (status.ok()) { FinishRequest(queue_it); // Reset the oom start time since the creation succeeds. @@ -173,12 +196,20 @@ Status CreateRequestQueue::ProcessRequests() { oom_start_time_ns_ = now; } - // Notify the scheduler that object creation is blocked at this priority. - on_object_creation_blocked_callback_(queue_it->first.first); - if (!should_spill_) { - RAY_LOG(INFO) << "Object creation of priority " << queue_it->first.first << " blocked"; - return Status::TransientObjectStoreFull("Waiting for higher priority tasks to finish"); - } + if (enable_blocktasks || enable_evicttasks) { + RAY_LOG(DEBUG) << "[JAE_DEBUG] calling object_creation_blocked_callback (" + << enable_blocktasks <<" " << enable_evicttasks << " " + << enable_blocktasks_spill << ") on priority " + << lowest_pri; + if(!block_tasks_required && !evict_tasks_required){ + on_object_creation_blocked_callback_(lowest_pri, enable_blocktasks , enable_evicttasks); + } + if (enable_blocktasks_spill || (enable_evicttasks && (!should_spill_)) + /*if evictTasks is enabled, do not trigger spill unless should_spill_ is set*/) { + return Status::TransientObjectStoreFull( + "Waiting for higher priority tasks to finish"); + } + } auto grace_period_ns = oom_grace_period_ns_; auto spill_pending = spill_objects_callback_(); @@ -194,18 +225,18 @@ Status CreateRequestQueue::ProcessRequests() { return Status::ObjectStoreFull("Waiting for grace period."); } else { // Trigger the fallback allocator. - status = ProcessRequest(/*fallback_allocator=*/true, request, - /*spilling_required=*/nullptr); + auto status = ProcessRequest(/*fallback_allocator=*/true, request, + /*spilling_required=*/nullptr, nullptr, nullptr, nullptr); if (!status.ok()) { std::string dump = ""; if (dump_debug_info_callback_ && !logged_oom) { dump = dump_debug_info_callback_(); logged_oom = true; } - RAY_LOG(INFO) << "Out-of-memory: Failed to create object " - << (request)->object_id << " of size " - << (request)->object_size / 1024 / 1024 << "MB\n" - << dump; + RAY_LOG(INFO) << "Out-of-memory: Failed to create object " + << (request)->object_id << " of size " + << (request)->object_size / 1024 / 1024 << "MB\n" + << dump; } FinishRequest(queue_it); } @@ -214,15 +245,18 @@ Status CreateRequestQueue::ProcessRequests() { // If we make it here, then there is nothing left in the queue. It's safe to // run new tasks again. - RAY_UNUSED(on_object_creation_blocked_callback_(ray::Priority())); + if (RayConfig::instance().enable_BlockTasks()) { + RAY_LOG(DEBUG) << "[JAE_DEBUG] resetting object_creation_blocked_callback priority"; + RAY_UNUSED(on_object_creation_blocked_callback_(ray::Priority(), true, false)); + } return Status::OK(); } void CreateRequestQueue::FinishRequest( - absl::btree_map>::iterator queue_it) { + absl::btree_map>::iterator queue_it) { // Fulfill the request. - //auto &request = *(queue_it->second); + // auto &request = *(queue_it->second); auto it = fulfilled_requests_.find(queue_it->second->request_id); RAY_CHECK(it != fulfilled_requests_.end()); RAY_CHECK(it->second == nullptr); diff --git a/src/ray/object_manager/plasma/create_request_queue.h b/src/ray/object_manager/plasma/create_request_queue.h index 19b2328c960b..9ec3b94b2585 100644 --- a/src/ray/object_manager/plasma/create_request_queue.h +++ b/src/ray/object_manager/plasma/create_request_queue.h @@ -34,7 +34,8 @@ namespace plasma { class CreateRequestQueue { public: using CreateObjectCallback = std::function; + bool fallback_allocator, PlasmaObject *result, bool *spilling_required, + bool *block_tasks_required, bool *evict_tasks_required, ray::Priority *lowest_pri)>; CreateRequestQueue(int64_t oom_grace_period_s, ray::SpillObjectsCallback spill_objects_callback, @@ -158,7 +159,8 @@ class CreateRequestQueue { /// returned by the request handler inside. Returns OK if the request can be /// finished. Status ProcessRequest(bool fallback_allocator, std::unique_ptr &request, - bool *spilling_required); + bool *spilling_required, bool *block_tasks_required, + bool *evict_tasks_required, ray::Priority *lowest_pri); /// Finish a queued request and remove it from the queue. void FinishRequest(absl::btree_map>::iterator queue_it); diff --git a/src/ray/object_manager/plasma/eviction_policy.cc b/src/ray/object_manager/plasma/eviction_policy.cc index 77e283e121ff..0a15f8c6fe2f 100644 --- a/src/ray/object_manager/plasma/eviction_policy.cc +++ b/src/ray/object_manager/plasma/eviction_policy.cc @@ -80,8 +80,11 @@ int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required, std::vector &objects_to_evict) { int64_t bytes_evicted = 0; auto it = item_list_.end(); + RAY_LOG(DEBUG) << "[JAE_DEBUG] [" << __func__ << "] num_bytes_required: " + <first; objects_to_evict.push_back(it->first); bytes_evicted += it->second; bytes_evicted_total_ += it->second; @@ -126,7 +129,11 @@ int64_t EvictionPolicy::RequireSpace(int64_t size, RAY_LOG(DEBUG) << "There is not enough space to create this object, so evicting " << objects_to_evict.size() << " objects to free up " << num_bytes_evicted << " bytes. The number of bytes in use (before " - << "this eviction) is " << allocator_.Allocated() << "."; + << "this eviction) is " << allocator_.Allocated() + << " size is " << size + << " FootprintLimit() is " << allocator_.GetFootprintLimit() + << " required_space is " << required_space + << " space to free is " << space_to_free << "."; return required_space - num_bytes_evicted; } diff --git a/src/ray/object_manager/plasma/object_lifecycle_manager.cc b/src/ray/object_manager/plasma/object_lifecycle_manager.cc index 594e9a645156..5a486228fab0 100644 --- a/src/ray/object_manager/plasma/object_lifecycle_manager.cc +++ b/src/ray/object_manager/plasma/object_lifecycle_manager.cc @@ -49,6 +49,10 @@ std::pair ObjectLifecycleManager::Cre return {entry, PlasmaError::OK}; } +ray::Priority ObjectLifecycleManager::GetLowestPriObject() { + return object_store_->GetLowestPriObject(); +} + const LocalObject *ObjectLifecycleManager::GetObject(const ObjectID &object_id) const { return object_store_->GetObject(object_id); } @@ -133,7 +137,8 @@ bool ObjectLifecycleManager::AddReference(const ObjectID &object_id) { entry->ref_count++; stats_collector_.OnObjectRefIncreased(*entry); RAY_LOG(DEBUG) << "Object " << object_id << " reference has incremented" - << ", num bytes in use is now " << GetNumBytesInUse(); + << ", num bytes in use is now " << GetNumBytesInUse() + << " ref_count is " << entry->ref_count; return true; } @@ -147,6 +152,8 @@ bool ObjectLifecycleManager::RemoveReference(const ObjectID &object_id) { } entry->ref_count--; + RAY_LOG(DEBUG) << "Object " << object_id << " reference has decremented ref_count is " + << entry->ref_count; stats_collector_.OnObjectRefDecreased(*entry); if (entry->ref_count > 0) { diff --git a/src/ray/object_manager/plasma/object_lifecycle_manager.h b/src/ray/object_manager/plasma/object_lifecycle_manager.h index a989fd09ba8d..fd76bdbe75e2 100644 --- a/src/ray/object_manager/plasma/object_lifecycle_manager.h +++ b/src/ray/object_manager/plasma/object_lifecycle_manager.h @@ -45,6 +45,12 @@ class IObjectLifecycleManager { const ray::ObjectInfo &object_info, plasma::flatbuf::ObjectSource source, bool fallback_allocator) = 0; + /// Get object of lowest priority. + /// \return + /// - nullptr if such object doesn't exist. + /// - otherwise, pointer to the object. + virtual ray::Priority GetLowestPriObject() = 0; + /// Get object by id. /// \return /// - nullptr if such object doesn't exist. @@ -106,6 +112,8 @@ class ObjectLifecycleManager : public IObjectLifecycleManager { const ray::ObjectInfo &object_info, plasma::flatbuf::ObjectSource source, bool fallback_allocator) override; + ray::Priority GetLowestPriObject() override; + const LocalObject *GetObject(const ObjectID &object_id) const override; const LocalObject *SealObject(const ObjectID &object_id) override; diff --git a/src/ray/object_manager/plasma/object_store.cc b/src/ray/object_manager/plasma/object_store.cc index a36ad1d54906..cbaf2818871b 100644 --- a/src/ray/object_manager/plasma/object_store.cc +++ b/src/ray/object_manager/plasma/object_store.cc @@ -51,6 +51,22 @@ const LocalObject *ObjectStore::CreateObject(const ray::ObjectInfo &object_info, return entry; } +ray::Priority ObjectStore::GetLowestPriObject() { + // Return the lowest priority object in object_table + auto it = object_table_.begin(); + if(it == object_table_.end()) + return ray::Priority(); + ray::Priority lowest_priority = it->second->GetPriority(); + for (; it != object_table_.end(); it++){ + ray::Priority p = it->second->GetPriority(); + if(lowest_priority < p){ + lowest_priority = p; + } + } + return lowest_priority; +} + + const LocalObject *ObjectStore::GetObject(const ObjectID &object_id) const { auto it = object_table_.find(object_id); if (it == object_table_.end()) { @@ -71,6 +87,7 @@ const LocalObject *ObjectStore::SealObject(const ObjectID &object_id) { bool ObjectStore::DeleteObject(const ObjectID &object_id) { auto entry = GetMutableObject(object_id); + RAY_LOG(DEBUG) << "[JAE_DEBUG] [" << __func__ << "] object is freed:" << object_id; if (entry == nullptr) { return false; } diff --git a/src/ray/object_manager/plasma/object_store.h b/src/ray/object_manager/plasma/object_store.h index 9cb0601b8c52..f8b79d41a5dc 100644 --- a/src/ray/object_manager/plasma/object_store.h +++ b/src/ray/object_manager/plasma/object_store.h @@ -44,6 +44,13 @@ class IObjectStore { plasma::flatbuf::ObjectSource source, bool fallback_allocate) = 0; + /// Get object of lowest priority. + /// + /// \return + /// - nullptr if such object doesn't exist. + /// - otherwise, pointer to the object. + virtual ray::Priority GetLowestPriObject() = 0; + /// Get object by id. /// /// \param object_id Object ID of the object to be sealed. @@ -79,6 +86,8 @@ class ObjectStore : public IObjectStore { plasma::flatbuf::ObjectSource source, bool fallback_allocate) override; + ray::Priority GetLowestPriObject() override; + const LocalObject *GetObject(const ObjectID &object_id) const override; const LocalObject *SealObject(const ObjectID &object_id) override; diff --git a/src/ray/object_manager/plasma/protocol.cc b/src/ray/object_manager/plasma/protocol.cc index 8e73daba82b0..063bb64e0d70 100644 --- a/src/ray/object_manager/plasma/protocol.cc +++ b/src/ray/object_manager/plasma/protocol.cc @@ -216,6 +216,11 @@ void ReadCreateRequest(uint8_t *data, size_t size, ray::ObjectInfo *object_info, object_info->owner_ip_address = message->owner_ip_address()->str(); object_info->owner_port = message->owner_port(); object_info->owner_worker_id = WorkerID::FromBinary(message->owner_worker_id()->str()); + auto p = message->priority(); + for(unsigned int i=0; isize(); i++){ + object_info->priority.SetScore(i, (*p)[i]); + } + //object_info->priority = message->priority(); *source = message->source(); *device_num = message->device_num(); return; diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc index 74398edc2405..4c8560e91ed0 100644 --- a/src/ray/object_manager/plasma/store.cc +++ b/src/ray/object_manager/plasma/store.cc @@ -67,14 +67,15 @@ ray::ObjectID GetCreateRequestObjectId(const std::vector &message) { } } // namespace -PlasmaStore::PlasmaStore(instrumented_io_context &main_service, IAllocator &allocator, - const std::string &socket_name, uint32_t delay_on_oom_ms, - float object_spilling_threshold, - ray::SpillObjectsCallback spill_objects_callback, - std::function object_store_full_callback, - ray::AddObjectCallback add_object_callback, - ray::DeleteObjectCallback delete_object_callback, - ray::ObjectCreationBlockedCallback on_object_creation_blocked_callback) +PlasmaStore::PlasmaStore( + instrumented_io_context &main_service, IAllocator &allocator, + const std::string &socket_name, uint32_t delay_on_oom_ms, + float object_spilling_threshold, float block_tasks_threshold, + float evict_tasks_threshold, ray::SpillObjectsCallback spill_objects_callback, + std::function object_store_full_callback, + ray::AddObjectCallback add_object_callback, + ray::DeleteObjectCallback delete_object_callback, + ray::ObjectCreationBlockedCallback on_object_creation_blocked_callback) : io_context_(main_service), socket_name_(socket_name), acceptor_(main_service, ParseUrlEndpoint(socket_name)), @@ -85,10 +86,11 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service, IAllocator &allo object_lifecycle_mgr_(allocator_, delete_object_callback_), delay_on_oom_ms_(delay_on_oom_ms), object_spilling_threshold_(object_spilling_threshold), + block_tasks_threshold_(block_tasks_threshold), + evict_tasks_threshold_(evict_tasks_threshold), create_request_queue_( /*oom_grace_period_s=*/RayConfig::instance().oom_grace_period_s(), - spill_objects_callback, - on_object_creation_blocked_callback, + spill_objects_callback, on_object_creation_blocked_callback, object_store_full_callback, /*get_time=*/ []() { return absl::GetCurrentTimeNanos(); }, @@ -137,11 +139,11 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, client->MarkObjectAsUsed(object_id); } -PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr &client, - const std::vector &message, - bool fallback_allocator, - PlasmaObject *object, - bool *spilling_required) { +PlasmaError PlasmaStore::HandleCreateObjectRequest( + const std::shared_ptr &client, const std::vector &message, + bool fallback_allocator, PlasmaObject *object, bool *spilling_required, + bool *block_tasks_required, bool *evict_tasks_required, + ray::Priority *lowest_priority) { uint8_t *input = (uint8_t *)message.data(); size_t input_size = message.size(); ray::ObjectInfo object_info; @@ -160,26 +162,38 @@ PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr << ", data_size=" << object_info.data_size << ", metadata_size=" << object_info.metadata_size; } - //TODO(Jae) Erase this later - /* const int64_t footprint_limit = allocator_.GetFootprintLimit(); - const float allocated_percentage = - static_cast(allocator_.Allocated()) / footprint_limit; - if(allocated_percentage >= block_tasks_threshold_){ - blockTasks(); + float allocated_percentage; + + if (footprint_limit != 0) { + allocated_percentage = static_cast(allocator_.Allocated()) / footprint_limit; + } else { + allocated_percentage = 0; + } + if (block_tasks_required != nullptr) { + if (allocated_percentage >= block_tasks_threshold_) { + *block_tasks_required = true; + } + } + if (evict_tasks_required != nullptr) { + if (allocated_percentage >= evict_tasks_threshold_) { + *evict_tasks_required = true; + } } - if(allocated_percentage >= evict_tasks_threshold_){ - evictTasks(); + if (lowest_priority != nullptr) { + //LocalObject *lowest_pri_obj = object_lifecycle_mgr_.GetLowestPriObject(); + //lowest_priority = lowest_pri_obj->GetPriority(); + ray::Priority p = object_lifecycle_mgr_.GetLowestPriObject(); + int size = p.GetDepth(); + int i; + for(i=0; iSetScore(i, p.GetScore(i)); + } } - */ // Trigger object spilling if current usage is above the specified threshold. if (spilling_required != nullptr) { - const int64_t footprint_limit = allocator_.GetFootprintLimit(); if (footprint_limit != 0) { - const int64_t footprint_limit = allocator_.GetFootprintLimit(); - const float allocated_percentage = - static_cast(allocator_.Allocated()) / footprint_limit; if (allocated_percentage > object_spilling_threshold_) { RAY_LOG(DEBUG) << "Triggering object spilling because current usage " << allocated_percentage << "% is above threshold " @@ -278,6 +292,7 @@ void PlasmaStore::ReleaseObject(const ObjectID &object_id, auto entry = object_lifecycle_mgr_.GetObject(object_id); RAY_CHECK(entry != nullptr); // Remove the client from the object's array of clients. + RAY_LOG(DEBUG) << "[JAE_DEBUG] [ReleaseObject] Object " << object_id; RAY_CHECK(RemoveFromClientObjectIds(object_id, client) == 1); } @@ -372,16 +387,21 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, for (size_t i = 0; i < request->priority()->size(); i++) { priority.score.push_back(request->priority()->Get(i)); } + // Check the log and remove this if it gets a correct value + RAY_LOG(DEBUG) << "[JAE_DEBUG] [" << __func__ << "] priority passed is " << priority; ray::TaskKey key(priority, ObjectID::FromRandom().TaskId()); // absl failed analyze mutex safety for lambda - auto handle_create = [this, client, message]( - bool fallback_allocator, PlasmaObject *result, - bool *spilling_required) ABSL_NO_THREAD_SAFETY_ANALYSIS { - mutex_.AssertHeld(); - return HandleCreateObjectRequest(client, message, fallback_allocator, result, - spilling_required); - }; + auto handle_create = + [this, client, message]( + bool fallback_allocator, PlasmaObject *result, bool *spilling_required, + bool *block_tasks_required, bool *evict_tasks_required, + ray::Priority *lowest_priority) ABSL_NO_THREAD_SAFETY_ANALYSIS { + mutex_.AssertHeld(); + return HandleCreateObjectRequest(client, message, fallback_allocator, result, + spilling_required, block_tasks_required, + evict_tasks_required, lowest_priority); + }; if (request->try_immediately()) { RAY_LOG(DEBUG) << "Received request to create object " << object_id @@ -397,8 +417,8 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr &client, } else { // Returns a request ID that the client can use to later get the result // (to allow async processing on the client). - auto req_id = - create_request_queue_.AddRequest(key, object_id, client, handle_create, object_size); + auto req_id = create_request_queue_.AddRequest(key, object_id, client, + handle_create, object_size); RAY_LOG(DEBUG) << "Received create request for object " << object_id << " assigned request ID " << req_id << ", " << object_size << " bytes"; diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h index 0ab45d3d0fcc..b65bf74969f6 100644 --- a/src/ray/object_manager/plasma/store.h +++ b/src/ray/object_manager/plasma/store.h @@ -56,6 +56,8 @@ class PlasmaStore { PlasmaStore(instrumented_io_context &main_service, IAllocator &allocator, const std::string &socket_name, uint32_t delay_on_oom_ms, float object_spilling_threshold, + float block_tasks_threshold, + float evict_tasks_threshold, ray::SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback, ray::AddObjectCallback add_object_callback, @@ -201,7 +203,10 @@ class PlasmaStore { PlasmaError HandleCreateObjectRequest(const std::shared_ptr &client, const std::vector &message, bool fallback_allocator, PlasmaObject *object, - bool *spilling_required) + bool *spilling_required, + bool *block_tasks_required, + bool *evict_tasks_required, + ray::Priority *lowest_priority) EXCLUSIVE_LOCKS_REQUIRED(mutex_); void ReplyToCreateClient(const std::shared_ptr &client, @@ -269,8 +274,8 @@ class PlasmaStore { /// The percentage of object store memory used above which //blocking new tasks is triggerted - const float block_request_threshold_ = 0.8; - const float evict_request_threshold_ = 0.8; + const float block_tasks_threshold_; + const float evict_tasks_threshold_; /// A timer that is set when the first request in the queue is not /// serviceable because there is not enough memory. The request will be diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index 8eb1f36f8e3d..edf073394923 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -92,6 +92,8 @@ void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback, store_.reset(new PlasmaStore(main_service_, *allocator_, socket_name_, RayConfig::instance().object_store_full_delay_ms(), RayConfig::instance().object_spilling_threshold(), + RayConfig::instance().block_tasks_threshold(), + RayConfig::instance().evict_tasks_threshold(), spill_objects_callback, object_store_full_callback, add_object_callback, delete_object_callback, on_object_creation_blocked_callback)); diff --git a/src/ray/object_manager/test/create_request_queue_test.cc b/src/ray/object_manager/test/create_request_queue_test.cc index 95666d82e003..fa276a51342c 100644 --- a/src/ray/object_manager/test/create_request_queue_test.cc +++ b/src/ray/object_manager/test/create_request_queue_test.cc @@ -19,6 +19,8 @@ #include "ray/common/status.h" #include "ray/common/task/task_priority.h" +#include + namespace plasma { class MockClient : public ClientInterface { @@ -74,6 +76,41 @@ class CreateRequestQueueTest : public ::testing::Test { int num_global_gc_ = 0; }; +TEST_F(CreateRequestQueueTest, TestBlockTasks) { + auto oom_request = [&](bool fallback, PlasmaObject *result, bool *spill_requested) { + return PlasmaError::OutOfMemory; + }; + auto blocked_request = [&](bool fallback, PlasmaObject *result, bool *spill_requested) { + result->data_size = 1234; + return PlasmaError::OK; + }; + ray::TaskKey key(ray::Priority({1,2}), ObjectID::FromRandom().TaskId()); + ray::TaskKey key1(ray::Priority({3,4}), ObjectID::FromRandom().TaskId()); + + auto client = std::make_shared(); + auto req_id1 = queue_.AddRequest(key, ObjectID::Nil(), client, oom_request, 1234); + auto req_id2 = queue_.AddRequest(key1, ObjectID::Nil(), client, blocked_request, 1234); + + // Neither request was fulfilled. + ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull()); + ASSERT_TRUE(queue_.ProcessRequests().IsObjectStoreFull()); + ASSERT_REQUEST_UNFINISHED(queue_, req_id1); + ASSERT_REQUEST_UNFINISHED(queue_, req_id2); + ASSERT_EQ(num_global_gc_, 2); + + // Grace period is done. + // Blocked request should be still blocked + current_time_ns_ += oom_grace_period_s_ * 2e9; + ASSERT_TRUE(queue_.ProcessRequests().ok()); + ASSERT_EQ(num_global_gc_, 3); + + // Both requests fulfilled. + ASSERT_REQUEST_FINISHED(queue_, req_id1, PlasmaError::OutOfMemory); + ASSERT_REQUEST_UNFINISHED(queue_, req_id2, PlasmaError::OK); + + AssertNoLeaks(); +} + TEST_F(CreateRequestQueueTest, TestBtree) { auto request = [&](bool fallback, PlasmaObject *result, bool *spill_requested) { result->data_size = 1234; diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc index b46a9be2062c..4634a4065c39 100644 --- a/src/ray/raylet/local_object_manager.cc +++ b/src/ray/raylet/local_object_manager.cc @@ -22,11 +22,16 @@ namespace ray { namespace raylet { -void LocalObjectManager::PinObjects(const std::vector &object_ids, +void LocalObjectManager::PinObjectsAndWaitForFree(const std::vector &object_ids, std::vector> &&objects, const rpc::Address &owner_address) { for (size_t i = 0; i < object_ids.size(); i++) { const auto &object_id = object_ids[i]; + if (objects_waiting_for_free_.count(object_id)) { + continue; + } + objects_waiting_for_free_.insert(object_id); + auto &object = objects[i]; if (object == nullptr) { RAY_LOG(ERROR) << "Plasma object " << object_id @@ -36,12 +41,7 @@ void LocalObjectManager::PinObjects(const std::vector &object_ids, RAY_LOG(DEBUG) << "Pinning object " << object_id; pinned_objects_size_ += object->GetSize(); pinned_objects_.emplace(object_id, std::make_pair(std::move(object), owner_address)); - } -} -void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address, - const std::vector &object_ids) { - for (const auto &object_id : object_ids) { // Create a object eviction subscription message. auto wait_request = std::make_unique(); wait_request->set_object_id(object_id.Binary()); @@ -84,6 +84,7 @@ void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) { RAY_CHECK((pinned_objects_.count(object_id) > 0) || (spilled_objects_url_.count(object_id) > 0) || (objects_pending_spill_.count(object_id) > 0)); + RAY_CHECK(objects_waiting_for_free_.erase(object_id)); spilled_object_pending_delete_.push(object_id); if (pinned_objects_.count(object_id)) { pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize(); @@ -140,11 +141,11 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) { return false; } - RAY_LOG(DEBUG) << "Choosing objects to spill of total size " << num_bytes_to_spill; int64_t bytes_to_spill = 0; auto it = pinned_objects_.begin(); std::vector objects_to_spill; int64_t counts = 0; + RAY_LOG(DEBUG) << "Choosing objects to spill of total size " << num_bytes_to_spill; while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end() && counts < max_fused_object_count_) { if (is_plasma_object_spillable_(it->first)) { diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h index 83c2b561229e..db4d64760b6d 100644 --- a/src/ray/raylet/local_object_manager.h +++ b/src/ray/raylet/local_object_manager.h @@ -71,19 +71,10 @@ class LocalObjectManager { /// \param objects Pointers to the objects to be pinned. The pointer should /// be kept in scope until the object can be released. /// \param owner_address The owner of the objects to be pinned. - void PinObjects(const std::vector &object_ids, + void PinObjectsAndWaitForFree(const std::vector &object_ids, std::vector> &&objects, const rpc::Address &owner_address); - /// Wait for the objects' owner to free the object. The objects will be - /// released when the owner at the given address fails or replies that the - /// object can be evicted. - /// - /// \param owner_address The address of the owner of the objects. - /// \param object_ids The objects to be freed. - void WaitForObjectFree(const rpc::Address &owner_address, - const std::vector &object_ids); - /// Spill objects as much as possible as fast as possible up to the max throughput. /// /// \return True if spilling is in progress. @@ -211,6 +202,8 @@ class LocalObjectManager { absl::flat_hash_map, rpc::Address>> pinned_objects_; + absl::flat_hash_set objects_waiting_for_free_; + // Total size of objects pinned on this node. size_t pinned_objects_size_ = 0; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f2069a666b55..a5ea1fc99f46 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -227,16 +227,17 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self return GetLocalObjectManager().IsSpillingInProgress(); }, /*on_object_creation_blocked_callback=*/ - [this](const Priority &base_priority) { - //cluster_task_manager_->BlockTasks(priority); - //TODO(Jae) Remove this line and set the actual value for should_spill - //from the ClusterTaskManager - cluster_task_manager_->BlockTasks(base_priority); - bool should_spill = cluster_task_manager_->EvictTasks(base_priority); - io_service_.post([this, should_spill](){ - object_manager_.SetShouldSpill(should_spill); + [this](const Priority &base_priority, bool block_tasks, bool evict_tasks) { + if(block_tasks){ + cluster_task_manager_->BlockTasks(base_priority); + } + if(evict_tasks){ + bool should_spill = cluster_task_manager_->EvictTasks(base_priority); + io_service_.post([this, should_spill](){ + object_manager_.SetShouldSpill(should_spill); },""); - }, + } + }, /*object_store_full_callback=*/ [this]() { // Post on the node manager's event loop since this @@ -332,9 +333,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self } auto destroy_worker = [this](std::shared_ptr worker, rpc::WorkerExitType disconnect_type) { - DisconnectClient(worker->Connection(), disconnect_type); - worker->MarkDead(); - KillWorker(worker); + DestroyWorker(worker, disconnect_type); }; auto is_owner_alive = [this](const WorkerID &owner_worker_id, const NodeID &owner_node_id) { @@ -536,7 +535,6 @@ void NodeManager::KillWorker(std::shared_ptr worker) { }); } -//when make a change in this function make sure to change destroy_worker lammbda function as well void NodeManager::DestroyWorker(std::shared_ptr worker, rpc::WorkerExitType disconnect_type) { // We should disconnect the client first. Otherwise, we'll remove bundle resources @@ -2109,9 +2107,7 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); return; } - local_object_manager_.PinObjects(object_ids, std::move(results), owner_address); - // Wait for the object to be freed by the owner, which keeps the ref count. - local_object_manager_.WaitForObjectFree(owner_address, object_ids); + local_object_manager_.PinObjectsAndWaitForFree(object_ids, std::move(results), owner_address); send_reply_callback(Status::OK(), nullptr, nullptr); } diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index a1f0775a8a52..db8efb017533 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -31,7 +31,8 @@ ClusterTaskManager::ClusterTaskManager( const NodeID &self_node_id, std::shared_ptr cluster_resource_scheduler, TaskDependencyManagerInterface &task_dependency_manager, - std::function, rpc::WorkerExitType)> destroy_worker, + std::function, rpc::WorkerExitType)> + destroy_worker, std::function is_owner_alive, NodeInfoGetter get_node_info, std::function announce_infeasible_task, @@ -40,8 +41,7 @@ ClusterTaskManager::ClusterTaskManager( std::function &object_ids, std::vector> *results)> get_task_arguments, - size_t max_pinned_task_arguments_bytes, - SetShouldSpillCallback set_should_spill) + size_t max_pinned_task_arguments_bytes, SetShouldSpillCallback set_should_spill) : self_node_id_(self_node_id), cluster_resource_scheduler_(cluster_resource_scheduler), task_dependency_manager_(task_dependency_manager), @@ -54,13 +54,13 @@ ClusterTaskManager::ClusterTaskManager( report_worker_backlog_(RayConfig::instance().report_worker_backlog()), worker_pool_(worker_pool), leased_workers_(leased_workers), - block_requested_priority_(Priority()), + block_requested_priority_(Priority()), get_task_arguments_(get_task_arguments), max_pinned_task_arguments_bytes_(max_pinned_task_arguments_bytes), metric_tasks_queued_(0), metric_tasks_dispatched_(0), metric_tasks_spilled_(0), - set_should_spill_(set_should_spill){} + set_should_spill_(set_should_spill) {} bool ClusterTaskManager::SchedulePendingTasks() { // Always try to schedule infeasible tasks in case they are now feasible. @@ -77,7 +77,12 @@ bool ClusterTaskManager::SchedulePendingTasks() { // there are not enough available resources blocks other // tasks from being scheduled. Priority task_priority = work_it->first.first; - if(task_priority >= block_requested_priority_){ + RAY_LOG(DEBUG) << "[JAE_DEBUG] schedulePendingTasks task " << + work_it->second->task.GetTaskSpecification().TaskId() << "priority:" << task_priority + << " block requested is " << block_requested_priority_; + if (task_priority >= block_requested_priority_) { + RAY_LOG(DEBUG) << "[JAE_DEBUG] schedulePendingTasks blocked task " + << task_priority; return did_schedule; } @@ -148,7 +153,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(std::shared_ptr work) { task_dependency_manager_.RequestTaskDependencies(task_id, task.GetDependencies()); if (args_ready) { RAY_LOG(DEBUG) << "Args already ready, task can be dispatched " << task_id; - tasks_to_dispatch_[scheduling_key].emplace(task.GetTaskSpecification().GetTaskKey(), work); + tasks_to_dispatch_[scheduling_key].emplace(task.GetTaskSpecification().GetTaskKey(), + work); } else { RAY_LOG(DEBUG) << "Waiting for args for task: " << task.GetTaskSpecification().TaskId(); @@ -159,7 +165,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(std::shared_ptr work) { } else { RAY_LOG(DEBUG) << "No args, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].emplace(task.GetTaskSpecification().GetTaskKey(), work); + tasks_to_dispatch_[scheduling_key].emplace(task.GetTaskSpecification().GetTaskKey(), + work); } return can_dispatch; } @@ -288,7 +295,6 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( auto &work = work_it->second; const auto &task = work->task; const auto spec = task.GetTaskSpecification(); - //TODO (Jae) NodeManager.cc TaskID task_id = spec.TaskId(); if (work->status == WorkStatus::WAITING_FOR_WORKER) { work_it++; @@ -297,11 +303,15 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( // Block tasks of a lower priority. Priority task_priority = work_it->first.first; - if(task_priority >= block_requested_priority_){ + RAY_LOG(DEBUG) << "[JAE_DEBUG] DispatchScheduledTasksToWorkers task " + << task_priority << " block requested is " + << block_requested_priority_; + if (task_priority >= block_requested_priority_) { + RAY_LOG(DEBUG) << "[JAE_DEBUG] DispatchScheduledTasksToWorkers blocked task " + << task_priority; break; } - bool args_missing = false; bool success = PinTaskArgsIfMemoryAvailable(spec, &args_missing); // An argument was evicted since this task was added to the dispatch @@ -440,9 +450,11 @@ void ClusterTaskManager::QueueAndScheduleTask( // If the scheduling class is infeasible, just add the work to the infeasible queue // directly. if (infeasible_tasks_.count(scheduling_class) > 0) { - infeasible_tasks_[scheduling_class].emplace(task.GetTaskSpecification().GetTaskKey(), work); + infeasible_tasks_[scheduling_class].emplace(task.GetTaskSpecification().GetTaskKey(), + work); } else { - tasks_to_schedule_[scheduling_class].emplace(task.GetTaskSpecification().GetTaskKey(), work); + tasks_to_schedule_[scheduling_class].emplace(task.GetTaskSpecification().GetTaskKey(), + work); } AddToBacklogTracker(task); ScheduleAndDispatchTasks(); @@ -461,7 +473,8 @@ void ClusterTaskManager::TasksUnblocked(const std::vector &ready_ids) { const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); RAY_LOG(DEBUG) << "Args ready, task can be dispatched " << task.GetTaskSpecification().TaskId(); - tasks_to_dispatch_[scheduling_key].emplace(task.GetTaskSpecification().GetTaskKey(), work); + tasks_to_dispatch_[scheduling_key].emplace(task.GetTaskSpecification().GetTaskKey(), + work); waiting_task_queue_.erase(it->second); waiting_tasks_index_.erase(it); } @@ -529,7 +542,8 @@ bool ClusterTaskManager::PinTaskArgsIfMemoryAvailable(const TaskSpecification &s ReleaseTaskArgs(spec.TaskId()); RAY_LOG(DEBUG) << "Cannot dispatch task " << spec.TaskId() << " with arguments of size " << task_arg_bytes - << " current pinned bytes is " << pinned_task_arguments_bytes_; + << " current pinned bytes is " << pinned_task_arguments_bytes_ + << " max_pinned_task_arguments_bytes is " << max_pinned_task_arguments_bytes_; return false; } @@ -928,10 +942,11 @@ bool ClusterTaskManager::AnyPendingTasks(RayTask *exemplar, bool *any_pending, std::string ClusterTaskManager::DebugStr() const { // TODO(Shanly): This method will be replaced with `DebugString` once we remove the // legacy scheduler. - auto accumulator = [](size_t state, - const std::pair>> &pair) { - return state + pair.second.size(); - }; + auto accumulator = + [](size_t state, + const std::pair>> &pair) { + return state + pair.second.size(); + }; size_t num_infeasible_tasks = std::accumulate( infeasible_tasks_.begin(), infeasible_tasks_.end(), (size_t)0, accumulator); size_t num_tasks_to_schedule = std::accumulate( @@ -1211,21 +1226,19 @@ bool ClusterTaskManager::EvictTasks(Priority base_priority) { for (auto &entry : leased_workers_) { std::shared_ptr worker = entry.second; Priority priority = worker->GetAssignedTask().GetTaskSpecification().GetPriority(); - //Smaller priority have higher priority - //Does not have less than check it - if (priority >= base_priority){ + if (priority > base_priority) { workers_to_kill.push_back(worker); should_spill = false; } } for (auto &worker : workers_to_kill) { - //Consider Using CancelTask instead of DestroyWorker + // Consider Using CancelTask instead of DestroyWorker destroy_worker_(worker, rpc::WorkerExitType::INTENDED_EXIT); } - //Check Deadlock corner cases - //Finer granularity preemption is not considered, kill all the lower priorities + // Check Deadlock corner cases + // Finer granularity preemption is not considered, kill all the lower priorities return should_spill; } diff --git a/src/ray/raylet/test/local_object_manager_test.cc b/src/ray/raylet/test/local_object_manager_test.cc index 06db5b3bc20a..cc95d545c2d4 100644 --- a/src/ray/raylet/test/local_object_manager_test.cc +++ b/src/ray/raylet/test/local_object_manager_test.cc @@ -366,8 +366,7 @@ TEST_F(LocalObjectManagerTest, TestPin) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); for (size_t i = 0; i < free_objects_batch_size; i++) { ASSERT_TRUE(freed.empty()); @@ -393,7 +392,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); manager.SpillObjects(object_ids, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); @@ -449,7 +448,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects(object_ids, [&](const Status &status) mutable { @@ -494,8 +493,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects(object_ids, [&](const Status &status) mutable { @@ -548,7 +546,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -615,7 +613,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); ASSERT_TRUE(manager.SpillObjectsOfSize(total_size)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -661,7 +659,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) { std::vector()); objects.push_back(std::move(object)); - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); ASSERT_FALSE(manager.SpillObjectsOfSize(1000)); for (const auto &id : object_ids) { ASSERT_EQ((*unpins)[id], 0); @@ -690,7 +688,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // This will spill until 2 workers are occupied. manager.SpillObjectUptoMaxThroughput(); @@ -757,7 +755,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) { std::vector> objects; objects.push_back(std::move(object)); - manager.PinObjects({object_id}, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree({object_id}, std::move(objects), owner_address); int num_times_fired = 0; manager.SpillObjects({object_id}, [&](const Status &status) mutable { @@ -802,7 +800,7 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); manager.SpillObjects(object_ids, [&](const Status &status) mutable { ASSERT_TRUE(status.ok()); }); @@ -838,8 +836,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); for (size_t i = 0; i < free_objects_batch_size; i++) { ASSERT_TRUE(freed.empty()); @@ -867,8 +864,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // 2 Objects are spilled out of 3. std::vector object_ids_to_spill; @@ -916,8 +912,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // Every object is spilled. std::vector object_ids_to_spill; @@ -977,8 +972,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // Objects are spilled. std::vector spill_set_1; @@ -1051,8 +1045,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); std::vector object_ids_to_spill; int spilled_urls_size = free_objects_batch_size; @@ -1103,8 +1096,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) { std::vector()); objects.push_back(std::move(object)); } - manager.PinObjects(object_ids, std::move(objects), owner_address); - manager.WaitForObjectFree(owner_address, object_ids); + manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address); // Every object is spilled. std::vector object_ids_to_spill;