Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove task context from python worker #5987

Merged
merged 14 commits into from
Oct 25, 2019
Prev Previous commit
Next Next commit
Remove setters from cython
  • Loading branch information
edoakes committed Oct 25, 2019
commit ed3d914be04b10848a17749f808ba0740df8d5f4
21 changes: 1 addition & 20 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,6 @@ cdef _store_task_outputs(worker, return_ids, outputs):
cdef execute_task(
CTaskType task_type,
const CRayFunction &ray_function,
const CJobID &c_job_id,
const CActorID &c_actor_id,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &c_arg_reference_ids,
Expand Down Expand Up @@ -603,8 +601,6 @@ cdef execute_task(
cdef CRayStatus task_execution_handler(
CTaskType task_type,
const CRayFunction &ray_function,
const CJobID &c_job_id,
const CActorID &c_actor_id,
const unordered_map[c_string, double] &c_resources,
const c_vector[shared_ptr[CRayObject]] &c_args,
const c_vector[CObjectID] &c_arg_reference_ids,
Expand All @@ -615,8 +611,7 @@ cdef CRayStatus task_execution_handler(
try:
# The call to execute_task should never raise an exception. If it
# does, that indicates that there was an unexpected internal error.
execute_task(task_type, ray_function, c_job_id,
c_actor_id, c_resources, c_args,
execute_task(task_type, ray_function, c_resources, c_args,
c_arg_reference_ids, c_return_ids, returns)
except Exception:
traceback_str = traceback.format_exc() + (
Expand Down Expand Up @@ -672,23 +667,9 @@ cdef class CoreWorker:
def get_current_task_id(self):
return TaskID(self.core_worker.get().GetCurrentTaskId().Binary())

def set_current_task_id(self, TaskID task_id):
cdef:
CTaskID c_task_id = task_id.native()

with nogil:
self.core_worker.get().SetCurrentTaskId(c_task_id)

def get_current_job_id(self):
return JobID(self.core_worker.get().GetCurrentJobId().Binary())

def set_current_job_id(self, JobID job_id):
cdef:
CJobID c_job_id = job_id.native()

with nogil:
self.core_worker.get().SetCurrentJobId(c_job_id)

def get_actor_id(self):
return ActorID(self.core_worker.get().GetActorId().Binary())

Expand Down
6 changes: 0 additions & 6 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus (
CTaskType task_type,
const CRayFunction &ray_function,
const CJobID &job_id,
const CActorID &actor_id,
const unordered_map[c_string, double] &resources,
const c_vector[shared_ptr[CRayObject]] &args,
const c_vector[CObjectID] &arg_reference_ids,
Expand Down Expand Up @@ -116,12 +114,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
# TODO(edoakes): remove this once the raylet client is no longer used
# directly.
CRayletClient &GetRayletClient()
# TODO(edoakes): remove these once the Python core worker uses the task
# interfaces
CJobID GetCurrentJobId()
void SetCurrentJobId(const CJobID &job_id)
CTaskID GetCurrentTaskId()
void SetCurrentTaskId(const CTaskID &task_id)
const CActorID &GetActorId()
CTaskID GetCallerId()
const ResourceMappingType &GetResourceIDs() const
Expand Down
6 changes: 0 additions & 6 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,12 @@ class CoreWorker {
// Get the resource IDs available to this worker (as assigned by the raylet).
const ResourceMappingType GetResourceIDs() const;

// TODO(edoakes): remove this once Python core worker uses the task interfaces.
const TaskID &GetCurrentTaskId() const { return worker_context_.GetCurrentTaskID(); }

// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentTaskId(const TaskID &task_id);

// TODO(edoakes): remove this once Python core worker uses the task interfaces.
const JobID &GetCurrentJobId() const { return worker_context_.GetCurrentJobID(); }

// TODO(edoakes): remove this once Python core worker uses the task interfaces.
void SetCurrentJobId(const JobID &job_id) { worker_context_.SetCurrentJobId(job_id); }

void SetActorId(const ActorID &actor_id) {
RAY_CHECK(actor_id_.IsNil());
actor_id_ = actor_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeInitCoreWork

auto task_execution_callback =
[](ray::TaskType task_type, const ray::RayFunction &ray_function,
const JobID &job_id, const ActorID &actor_id,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/task_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Status CoreWorkerTaskExecutionInterface::ExecuteTask(
return_ids.pop_back();
task_type = TaskType::ACTOR_TASK;
}
status = task_execution_callback_(task_type, func, task_spec.JobId(), ActorID::Nil(),
status = task_execution_callback_(task_type, func,
task_spec.GetRequiredResources().GetResourceMap(),
args, arg_reference_ids, return_ids, results);

Expand Down
3 changes: 1 addition & 2 deletions src/ray/core_worker/task_execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class CoreWorkerTaskExecutionInterface {
// Callback that must be implemented and provided by the language-specific worker
// frontend to execute tasks and return their results.
using TaskExecutionCallback = std::function<Status(
TaskType task_type, const RayFunction &ray_function, const JobID &job_id,
const ActorID &actor_id,
TaskType task_type, const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &arg_reference_ids,
Expand Down