-
Notifications
You must be signed in to change notification settings - Fork 6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[xray] Basic task reconstruction mechanism #2526
Changes from all commits
5f55166
6995e47
d2f5876
3db0b93
45012bc
01289a5
31c8867
709eecc
65331cf
ea028d5
81653ae
d205925
e8be7a3
33b17df
fef5c3e
11793ce
efe98a0
16872d4
bea11c4
7095656
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,20 @@ class PubsubInterface { | |
virtual ~PubsubInterface(){}; | ||
}; | ||
|
||
template <typename ID, typename Data> | ||
class LogInterface { | ||
public: | ||
using DataT = typename Data::NativeTableType; | ||
using WriteCallback = | ||
std::function<void(AsyncGcsClient *client, const ID &id, const DataT &data)>; | ||
virtual Status Append(const JobID &job_id, const ID &id, std::shared_ptr<DataT> &data, | ||
const WriteCallback &done) = 0; | ||
virtual Status AppendAt(const JobID &job_id, const ID &task_id, | ||
std::shared_ptr<DataT> &data, const WriteCallback &done, | ||
const WriteCallback &failure, int log_length) = 0; | ||
virtual ~LogInterface(){}; | ||
}; | ||
|
||
/// \class Log | ||
/// | ||
/// A GCS table where every entry is an append-only log. This class is not | ||
|
@@ -63,14 +77,13 @@ class PubsubInterface { | |
/// ClientTable: Stores a log of which GCS clients have been added or deleted | ||
/// from the system. | ||
template <typename ID, typename Data> | ||
class Log : virtual public PubsubInterface<ID> { | ||
class Log : public LogInterface<ID, Data>, virtual public PubsubInterface<ID> { | ||
public: | ||
using DataT = typename Data::NativeTableType; | ||
using Callback = std::function<void(AsyncGcsClient *client, const ID &id, | ||
const std::vector<DataT> &data)>; | ||
/// The callback to call when a write to a key succeeds. | ||
using WriteCallback = | ||
std::function<void(AsyncGcsClient *client, const ID &id, const DataT &data)>; | ||
using WriteCallback = typename LogInterface<ID, Data>::WriteCallback; | ||
/// The callback to call when a SUBSCRIBE call completes and we are ready to | ||
/// request and receive notifications. | ||
using SubscriptionCallback = std::function<void(AsyncGcsClient *client)>; | ||
|
@@ -354,6 +367,21 @@ class TaskLeaseTable : public Table<TaskID, TaskLeaseData> { | |
pubsub_channel_ = TablePubsub::TASK_LEASE; | ||
prefix_ = TablePrefix::TASK_LEASE; | ||
} | ||
|
||
Status Add(const JobID &job_id, const TaskID &id, std::shared_ptr<TaskLeaseDataT> &data, | ||
const WriteCallback &done) override { | ||
RAY_RETURN_NOT_OK((Table<TaskID, TaskLeaseData>::Add(job_id, id, data, done))); | ||
// Mark the entry for expiration in Redis. It's okay if this command fails | ||
// since the lease entry itself contains the expiration period. In the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, is it actually ok if this fails? E.g., if it fails, then it will fail to extend the previous expiration, and so the key could actually expire too early, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to Redis documentation:
I believe that the |
||
// worst case, if the command fails, then a client that looks up the lease | ||
// entry will overestimate the expiration time. | ||
// TODO(swang): Use a common helper function to format the key instead of | ||
// hardcoding it to match the Redis module. | ||
std::vector<std::string> args = {"PEXPIRE", | ||
EnumNameTablePrefix(prefix_) + id.binary(), | ||
std::to_string(data->timeout)}; | ||
return context_->RunArgvAsync(args); | ||
} | ||
}; | ||
|
||
namespace raylet { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,10 +91,12 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, | |
local_queues_(SchedulingQueue()), | ||
scheduling_policy_(local_queues_), | ||
reconstruction_policy_( | ||
io_service_, [this](const TaskID &task_id) { ResubmitTask(task_id); }, | ||
io_service_, | ||
[this](const TaskID &task_id) { HandleTaskReconstruction(task_id); }, | ||
RayConfig::instance().initial_reconstruction_timeout_milliseconds(), | ||
gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(), | ||
std::make_shared<ObjectDirectory>(gcs_client)), | ||
std::make_shared<ObjectDirectory>(gcs_client), | ||
gcs_client_->task_reconstruction_log()), | ||
task_dependency_manager_( | ||
object_manager, reconstruction_policy_, io_service, | ||
gcs_client_->client_table().GetLocalClientId(), | ||
|
@@ -1116,8 +1118,68 @@ void NodeManager::FinishAssignedTask(Worker &worker) { | |
worker.AssignTaskId(TaskID::nil()); | ||
} | ||
|
||
void NodeManager::ResubmitTask(const TaskID &task_id) { | ||
RAY_LOG(WARNING) << "Task re-execution is not currently implemented"; | ||
void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { | ||
// Retrieve the task spec in order to re-execute the task. | ||
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup( | ||
JobID::nil(), task_id, | ||
/*success_callback=*/ | ||
[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id, | ||
const ray::protocol::TaskT &task_data) { | ||
// The task was in the GCS task table. Use the stored task spec to | ||
// re-execute the task. | ||
const Task task(task_data); | ||
ResubmitTask(task); | ||
}, | ||
/*failure_callback=*/ | ||
[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { | ||
// The task was not in the GCS task table. It must therefore be in the | ||
// lineage cache. | ||
if (!lineage_cache_.ContainsTask(task_id)) { | ||
// The task was not in the lineage cache. | ||
// TODO(swang): This should not ever happen, but Java TaskIDs are | ||
// currently computed differently from Python TaskIDs, so | ||
// reconstruction is currently broken for Java. Once the TaskID | ||
// generation code matches for both frontends, we should be able to | ||
// remove this warning and make it a fatal check. | ||
RAY_LOG(WARNING) << "Task " << task_id << " to reconstruct was not found in " | ||
"the GCS or the lineage cache. This " | ||
"job may hang."; | ||
} else { | ||
// Use a copy of the cached task spec to re-execute the task. | ||
const Task task = lineage_cache_.GetTask(task_id); | ||
ResubmitTask(task); | ||
} | ||
})); | ||
} | ||
|
||
void NodeManager::ResubmitTask(const Task &task) { | ||
// Actor reconstruction is turned off by default right now. If this is an | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just curious: is this a stage approach - aka. actor reconstruction will be enabled later? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we're still thinking about how exactly actor reconstruction should be exposed to the user. I believe the current plan is to have actor reconstruction enabled by a flag from the frontend. |
||
// actor task, treat the task as failed and do not resubmit it. | ||
if (task.GetTaskSpecification().IsActorTask()) { | ||
TreatTaskAsFailed(task.GetTaskSpecification()); | ||
return; | ||
} | ||
|
||
// Driver tasks cannot be reconstructed. If this is a driver task, push an | ||
// error to the driver and do not resubmit it. | ||
if (task.GetTaskSpecification().IsDriverTask()) { | ||
// TODO(rkn): Define this constant somewhere else. | ||
std::string type = "put_reconstruction"; | ||
std::ostringstream error_message; | ||
error_message << "The task with ID " << task.GetTaskSpecification().TaskId() | ||
<< " is a driver task and so the object created by ray.put " | ||
<< "could not be reconstructed."; | ||
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( | ||
task.GetTaskSpecification().DriverId(), type, error_message.str(), | ||
current_time_ms())); | ||
return; | ||
} | ||
|
||
// The task may be reconstructed. Submit it with an empty lineage, since any | ||
// uncommitted lineage must already be in the lineage cache. At this point, | ||
// the task should not yet exist in the local scheduling queue. If it does, | ||
// then this is a spurious reconstruction. | ||
SubmitTask(task, Lineage()); | ||
} | ||
|
||
void NodeManager::HandleObjectLocal(const ObjectID &object_id) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: there's a pair of unnecessary parentheses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I need the parentheses for it to build for some reason...I don't know exactly why, but the compiler seems to think that there are 2 arguments to the macro, maybe because of the templated argument?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the reason that you have to add
Table<TaskID, TaskLeaseData>::
because the two methods have the same name? Would that go away if we just change this method name fromAdd
to something else likeUpdateTaskLease
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just googled this, the reason is because the macro processor doesn't know about template, thus recognizes
Table<TaskID
as a parameter.Someone suggests this way:
It works. But I think adding an extra pair of parentheses is easier. Or as @robertnishihara suggested, use another function name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I decided against changing the function name since I believe it's nicer to have the
TaskLeaseTable
still implement theTable<TaskID, TaskLeaseData>
interface.