Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[xray] Basic task reconstruction mechanism #2526

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions java/test.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
#!/usr/bin/env bash

# Cause the script to exit if a single command fails.
set -e

# Show explicitly which commands are currently running.
set -x

ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
$ROOT_DIR/../build.sh -l java

Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ table ObjectTableData {
}

table TaskReconstructionData {
num_executions: int;
// The number of times this task has been reconstructed so far.
num_reconstructions: int;
// The node manager that is trying to reconstruct the task.
node_manager_id: string;
}

Expand Down
18 changes: 18 additions & 0 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,24 @@ Status RedisContext::RunAsync(const std::string &command, const UniqueID &id,
return Status::OK();
}

Status RedisContext::RunArgvAsync(const std::vector<std::string> &args) {
// Build the arguments.
std::vector<const char *> argv;
std::vector<size_t> argc;
for (size_t i = 0; i < args.size(); ++i) {
argv.push_back(args[i].data());
argc.push_back(args[i].size());
}
// Run the Redis command.
int status;
status = redisAsyncCommandArgv(async_context_, nullptr, nullptr, args.size(),
argv.data(), argc.data());
if (status == REDIS_ERR) {
return Status::RedisError(std::string(async_context_->errstr));
}
return Status::OK();
}

Status RedisContext::SubscribeAsync(const ClientID &client_id,
const TablePubsub pubsub_channel,
const RedisCallback &redisCallback,
Expand Down
6 changes: 6 additions & 0 deletions src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ class RedisContext {
const TablePubsub pubsub_channel, RedisCallback redisCallback,
int log_length = -1);

/// Run an arbitrary Redis command without a callback.
///
/// \param args The vector of command args to pass to Redis.
/// \return Status.
Status RunArgvAsync(const std::vector<std::string> &args);

/// Subscribe to a specific Pub-Sub channel.
///
/// \param client_id The client ID that subscribe this message.
Expand Down
34 changes: 31 additions & 3 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ class PubsubInterface {
virtual ~PubsubInterface(){};
};

template <typename ID, typename Data>
class LogInterface {
public:
using DataT = typename Data::NativeTableType;
using WriteCallback =
std::function<void(AsyncGcsClient *client, const ID &id, const DataT &data)>;
virtual Status Append(const JobID &job_id, const ID &id, std::shared_ptr<DataT> &data,
const WriteCallback &done) = 0;
virtual Status AppendAt(const JobID &job_id, const ID &task_id,
std::shared_ptr<DataT> &data, const WriteCallback &done,
const WriteCallback &failure, int log_length) = 0;
virtual ~LogInterface(){};
};

/// \class Log
///
/// A GCS table where every entry is an append-only log. This class is not
Expand All @@ -63,14 +77,13 @@ class PubsubInterface {
/// ClientTable: Stores a log of which GCS clients have been added or deleted
/// from the system.
template <typename ID, typename Data>
class Log : virtual public PubsubInterface<ID> {
class Log : public LogInterface<ID, Data>, virtual public PubsubInterface<ID> {
public:
using DataT = typename Data::NativeTableType;
using Callback = std::function<void(AsyncGcsClient *client, const ID &id,
const std::vector<DataT> &data)>;
/// The callback to call when a write to a key succeeds.
using WriteCallback =
std::function<void(AsyncGcsClient *client, const ID &id, const DataT &data)>;
using WriteCallback = typename LogInterface<ID, Data>::WriteCallback;
/// The callback to call when a SUBSCRIBE call completes and we are ready to
/// request and receive notifications.
using SubscriptionCallback = std::function<void(AsyncGcsClient *client)>;
Expand Down Expand Up @@ -354,6 +367,21 @@ class TaskLeaseTable : public Table<TaskID, TaskLeaseData> {
pubsub_channel_ = TablePubsub::TASK_LEASE;
prefix_ = TablePrefix::TASK_LEASE;
}

Status Add(const JobID &job_id, const TaskID &id, std::shared_ptr<TaskLeaseDataT> &data,
const WriteCallback &done) override {
RAY_RETURN_NOT_OK((Table<TaskID, TaskLeaseData>::Add(job_id, id, data, done)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there's a pair of unnecessary parentheses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I need the parentheses for it to build for some reason...I don't know exactly why, but the compiler seems to think that there are 2 arguments to the macro, maybe because of the templated argument?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the reason that you have to add Table<TaskID, TaskLeaseData>:: because the two methods have the same name? Would that go away if we just change this method name from Add to something else like UpdateTaskLease?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just googled this, the reason is because the macro processor doesn't know about template, thus recognizes Table<TaskID as a parameter.
Someone suggests this way:

#define COMMA ,
RAY_RETURN_NOT_OK(Table<TaskID COMMA TaskLeaseData>::(...))

It works. But I think adding an extra pair of parentheses is easier. Or as @robertnishihara suggested, use another function name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided against changing the function name since I believe it's nicer to have the TaskLeaseTable still implement the Table<TaskID, TaskLeaseData> interface.

// Mark the entry for expiration in Redis. It's okay if this command fails
// since the lease entry itself contains the expiration period. In the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, is it actually ok if this fails? E.g., if it fails, then it will fail to extend the previous expiration, and so the key could actually expire too early, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to Redis documentation:

The timeout will only be cleared by commands that delete or overwrite the contents of the key, including DEL, SET, GETSET and all the *STORE commands.

I believe that the Add command in the previous line will overwrite the contents.

// worst case, if the command fails, then a client that looks up the lease
// entry will overestimate the expiration time.
// TODO(swang): Use a common helper function to format the key instead of
// hardcoding it to match the Redis module.
std::vector<std::string> args = {"PEXPIRE",
EnumNameTablePrefix(prefix_) + id.binary(),
std::to_string(data->timeout)};
return context_->RunArgvAsync(args);
}
};

namespace raylet {
Expand Down
13 changes: 13 additions & 0 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,19 @@ void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
}
}

const Task &LineageCache::GetTask(const TaskID &task_id) const {
const auto &entries = lineage_.GetEntries();
auto it = entries.find(task_id);
RAY_CHECK(it != entries.end());
return it->second.TaskData();
}

bool LineageCache::ContainsTask(const TaskID &task_id) const {
const auto &entries = lineage_.GetEntries();
auto it = entries.find(task_id);
return it != entries.end();
}

} // namespace raylet

} // namespace ray
12 changes: 12 additions & 0 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,18 @@ class LineageCache {
/// \param task_id The ID of the task entry that was committed.
void HandleEntryCommitted(const TaskID &task_id);

/// Get a task. The task must be in the lineage cache.
///
/// \param task_id The ID of the task to get.
/// \return A const reference to the task data.
const Task &GetTask(const TaskID &task_id) const;

/// Get whether the lineage cache contains the task.
///
/// \param task_id The ID of the task to get.
/// \return Whether the task is in the lineage cache.
bool ContainsTask(const TaskID &task_id) const;

private:
/// Try to flush a task that is in UNCOMMITTED_READY state. If the task has
/// parents that are not committed yet, then the child will be flushed once
Expand Down
70 changes: 66 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,12 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
local_queues_(SchedulingQueue()),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_, [this](const TaskID &task_id) { ResubmitTask(task_id); },
io_service_,
[this](const TaskID &task_id) { HandleTaskReconstruction(task_id); },
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(),
std::make_shared<ObjectDirectory>(gcs_client)),
std::make_shared<ObjectDirectory>(gcs_client),
gcs_client_->task_reconstruction_log()),
task_dependency_manager_(
object_manager, reconstruction_policy_, io_service,
gcs_client_->client_table().GetLocalClientId(),
Expand Down Expand Up @@ -1116,8 +1118,68 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
worker.AssignTaskId(TaskID::nil());
}

void NodeManager::ResubmitTask(const TaskID &task_id) {
RAY_LOG(WARNING) << "Task re-execution is not currently implemented";
void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
// Retrieve the task spec in order to re-execute the task.
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
JobID::nil(), task_id,
/*success_callback=*/
[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id,
const ray::protocol::TaskT &task_data) {
// The task was in the GCS task table. Use the stored task spec to
// re-execute the task.
const Task task(task_data);
ResubmitTask(task);
},
/*failure_callback=*/
[this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) {
// The task was not in the GCS task table. It must therefore be in the
// lineage cache.
if (!lineage_cache_.ContainsTask(task_id)) {
// The task was not in the lineage cache.
// TODO(swang): This should not ever happen, but Java TaskIDs are
// currently computed differently from Python TaskIDs, so
// reconstruction is currently broken for Java. Once the TaskID
// generation code matches for both frontends, we should be able to
// remove this warning and make it a fatal check.
RAY_LOG(WARNING) << "Task " << task_id << " to reconstruct was not found in "
"the GCS or the lineage cache. This "
"job may hang.";
} else {
// Use a copy of the cached task spec to re-execute the task.
const Task task = lineage_cache_.GetTask(task_id);
ResubmitTask(task);
}
}));
}

void NodeManager::ResubmitTask(const Task &task) {
// Actor reconstruction is turned off by default right now. If this is an
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: is this a stage approach - aka. actor reconstruction will be enabled later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we're still thinking about how exactly actor reconstruction should be exposed to the user. I believe the current plan is to have actor reconstruction enabled by a flag from the frontend.

// actor task, treat the task as failed and do not resubmit it.
if (task.GetTaskSpecification().IsActorTask()) {
TreatTaskAsFailed(task.GetTaskSpecification());
return;
}

// Driver tasks cannot be reconstructed. If this is a driver task, push an
// error to the driver and do not resubmit it.
if (task.GetTaskSpecification().IsDriverTask()) {
// TODO(rkn): Define this constant somewhere else.
std::string type = "put_reconstruction";
std::ostringstream error_message;
error_message << "The task with ID " << task.GetTaskSpecification().TaskId()
<< " is a driver task and so the object created by ray.put "
<< "could not be reconstructed.";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
task.GetTaskSpecification().DriverId(), type, error_message.str(),
current_time_ms()));
return;
}

// The task may be reconstructed. Submit it with an empty lineage, since any
// uncommitted lineage must already be in the lineage cache. At this point,
// the task should not yet exist in the local scheduling queue. If it does,
// then this is a spurious reconstruction.
SubmitTask(task, Lineage());
}

void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
Expand Down
7 changes: 5 additions & 2 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ class NodeManager {
void FinishAssignedTask(Worker &worker);
/// Perform a placement decision on placeable tasks.
void ScheduleTasks();
/// Resubmit a task whose return value needs to be reconstructed.
void ResubmitTask(const TaskID &task_id);
/// Handle a task whose return value(s) must be reconstructed.
void HandleTaskReconstruction(const TaskID &task_id);
/// Resubmit a task for execution. This is a task that was previously already
/// submitted to a raylet but which must now be re-executed.
void ResubmitTask(const Task &task);
/// Attempt to forward a task to a remote different node manager. If this
/// fails, the task will be resubmit locally.
///
Expand Down
56 changes: 45 additions & 11 deletions src/ray/raylet/reconstruction_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ ReconstructionPolicy::ReconstructionPolicy(
std::function<void(const TaskID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory)
std::shared_ptr<ObjectDirectoryInterface> object_directory,
gcs::LogInterface<TaskID, TaskReconstructionData> &task_reconstruction_log)
: io_service_(io_service),
reconstruction_handler_(reconstruction_handler),
initial_reconstruction_timeout_ms_(initial_reconstruction_timeout_ms),
client_id_(client_id),
task_lease_pubsub_(task_lease_pubsub),
object_directory_(std::move(object_directory)) {}
object_directory_(std::move(object_directory)),
task_reconstruction_log_(task_reconstruction_log) {}

void ReconstructionPolicy::SetTaskTimeout(
std::unordered_map<TaskID, ReconstructionTask>::iterator task_it,
Expand Down Expand Up @@ -59,6 +61,23 @@ void ReconstructionPolicy::SetTaskTimeout(
});
}

void ReconstructionPolicy::HandleReconstructionLogAppend(const TaskID &task_id,
bool success) {
auto it = listening_tasks_.find(task_id);
if (it == listening_tasks_.end()) {
return;
}

// Reset the timer to wait for task lease notifications again. NOTE(swang):
// The timer should already be set here, but we extend it to give some time
// for the reconstructed task to propagate notifications.
SetTaskTimeout(it, initial_reconstruction_timeout_ms_);

if (success) {
reconstruction_handler_(task_id);
}
}

void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
const ObjectID &required_object_id,
int reconstruction_attempt) {
Expand All @@ -81,17 +100,32 @@ void ReconstructionPolicy::AttemptReconstruction(const TaskID &task_id,
// reconstruction_attempt many times.
return;
}

// Attempt to reconstruct the task by inserting an entry into the task
// reconstruction log. This will fail if another node has already inserted
// an entry for this reconstruction.
auto reconstruction_entry = std::make_shared<TaskReconstructionDataT>();
reconstruction_entry->num_reconstructions = reconstruction_attempt;
reconstruction_entry->node_manager_id = client_id_.binary();
RAY_CHECK_OK(task_reconstruction_log_.AppendAt(
JobID::nil(), task_id, reconstruction_entry,
/*success_callback=*/
[this](gcs::AsyncGcsClient *client, const TaskID &task_id,
const TaskReconstructionDataT &data) {
HandleReconstructionLogAppend(task_id, /*success=*/true);
},
/*failure_callback=*/
[this](gcs::AsyncGcsClient *client, const TaskID &task_id,
const TaskReconstructionDataT &data) {
HandleReconstructionLogAppend(task_id, /*success=*/false);
},
reconstruction_attempt));

// Increment the number of times reconstruction has been attempted. This is
// used to suppress duplicate reconstructions of the same task.
// used to suppress duplicate reconstructions of the same task. If
// reconstruction is attempted again, the next attempt will try to insert a
// task reconstruction entry at the next index in the log.
it->second.reconstruction_attempt++;

// Reset the timer to wait for task lease notifications again. NOTE(swang):
// The timer should already be set here, but we extend it to give some time
// for the reconstructed task to propagate notifications.
SetTaskTimeout(it, initial_reconstruction_timeout_ms_);
// TODO(swang): Suppress simultaneous attempts to reconstruct the task using
// the task reconstruction log.
reconstruction_handler_(task_id);
}

void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
Expand Down
18 changes: 12 additions & 6 deletions src/ray/raylet/reconstruction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
/// the GCS.
/// \param task_lease_pubsub The GCS pub-sub storage system to request task
/// lease notifications from.
ReconstructionPolicy(boost::asio::io_service &io_service,
std::function<void(const TaskID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms,
const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory);
ReconstructionPolicy(
boost::asio::io_service &io_service,
std::function<void(const TaskID &)> reconstruction_handler,
int64_t initial_reconstruction_timeout_ms, const ClientID &client_id,
gcs::PubsubInterface<TaskID> &task_lease_pubsub,
std::shared_ptr<ObjectDirectoryInterface> object_directory,
gcs::LogInterface<TaskID, TaskReconstructionData> &task_reconstruction_log);

/// Listen for task lease notifications about an object that may require
/// reconstruction. If no notifications are received within the initial
Expand Down Expand Up @@ -114,6 +115,10 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
/// Handle expiration of a task lease.
void HandleTaskLeaseExpired(const TaskID &task_id);

/// Handle the response for an attempt at adding an entry to the task
/// reconstruction log.
void HandleReconstructionLogAppend(const TaskID &task_id, bool success);

/// The event loop.
boost::asio::io_service &io_service_;
/// The handler to call for tasks that require reconstruction.
Expand All @@ -127,6 +132,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface {
gcs::PubsubInterface<TaskID> &task_lease_pubsub_;
/// The object directory used to lookup object locations.
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
gcs::LogInterface<TaskID, TaskReconstructionData> &task_reconstruction_log_;
/// The tasks that we are currently subscribed to in the GCS.
std::unordered_map<TaskID, ReconstructionTask> listening_tasks_;
};
Expand Down
Loading