
[webui] Scalability fixes for the task timeline and visualizations #935

Merged
merged 7 commits on Sep 10, 2017
Changes from 1 commit
fixes
ericl committed Sep 6, 2017
commit fc53f9781290d19588b82a287e01413d5b6e9d67
73 changes: 44 additions & 29 deletions python/ray/experimental/state.py
@@ -48,6 +48,9 @@
TASK_STATUS_RECONSTRUCTING: "RECONSTRUCTING",
}

# Hard limit on the number of tasks to return to the UI client at once
MAX_TASKS_TO_VISUALIZE = 10000


class GlobalState(object):
"""A class used to interface with the Ray control state.
@@ -419,17 +422,25 @@ def task_profiles(self, start=None, end=None, num_tasks=None, fwd=True):
task_info = dict()
event_log_sets = self.redis_client.keys("event_log*")

if num_tasks is None:
Contributor
This code should not be part of the GlobalState API; rather, num_tasks should be passed in from the webui.

Contributor Author
Done
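For context, a minimal sketch (not part of this diff; the helper name and variables are illustrative) of how a caller such as the webui might pass an explicit num_tasks along the lines the reviewer suggests:

MAX_TASKS_TO_VISUALIZE = 10000  # mirrors the constant introduced above

def fetch_tasks_for_ui(global_state, requested=None):
    # The caller decides how many tasks it wants and stays under the cap;
    # task_profiles then only has to enforce the hard limit as a backstop.
    num_tasks = min(requested or MAX_TASKS_TO_VISUALIZE, MAX_TASKS_TO_VISUALIZE)
    return global_state.task_profiles(num_tasks=num_tasks, fwd=True)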

num_tasks = MAX_TASKS_TO_VISUALIZE
print(
"Warning: at most {} tasks will be fetched within this "
"time range.".format(MAX_TASKS_TO_VISUALIZE))
elif num_tasks > MAX_TASKS_TO_VISUALIZE:
print(
"Warning: too many tasks to visualize, "
"fetching only the first {} of {}.".format(
MAX_TASKS_TO_VISUALIZE, num_tasks))
num_tasks = MAX_TASKS_TO_VISUALIZE
# The heap is used to maintain the set of x tasks that occurred the
# most recently across all of the workers, where x is defined by the
# function parameter num_tasks. The key is the start time of the "get_task"
# component of each task. Calling heappop will result in the task with
# the earliest "get_task_start" being removed from the heap.

# Don't maintain the heap if we're not slicing some number
if num_tasks is not None:
heap = []
heapq.heapify(heap)
heap_size = 0
heap = []
heapq.heapify(heap)
heap_size = 0

# Set up a param dict to pass the redis command
params = {"withscores": True}
@@ -443,12 +454,11 @@ def task_profiles(self, start=None, end=None, num_tasks=None, fwd=True):
elif start is not None:
params["max"] = time.time()

if num_tasks is not None:
if start is None and end is None:
params["end"] = num_tasks - 1
else:
params["num"] = num_tasks
params["start"] = 0
if start is None and end is None:
params["end"] = num_tasks - 1
else:
params["num"] = num_tasks
params["start"] = 0

# Parse through event logs to determine task start and end points.
for event_log_set in event_log_sets:
@@ -481,9 +491,8 @@ def task_profiles(self, start=None, end=None, num_tasks=None, fwd=True):
task_info[task_id]["score"] = score
# Add task to (min/max) heap by its start point.
# if fwd, we want to delete the largest elements, so -score
if num_tasks is not None:
heapq.heappush(heap, (-score if fwd else score, task_id))
heap_size += 1
heapq.heappush(heap, (-score if fwd else score, task_id))
heap_size += 1

for event in event_dict:
if event[1] == "ray:get_task" and event[2] == 1:
@@ -518,7 +527,7 @@ def task_profiles(self, start=None, end=None, num_tasks=None, fwd=True):
task_info[task_id]["function_name"] = (
event[3]["function_name"])

if num_tasks is not None and heap_size > num_tasks:
if heap_size > num_tasks:
min_task, task_id_hex = heapq.heappop(heap)
del task_info[task_id_hex]
heap_size -= 1
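To make the bookkeeping above concrete, here is a small self-contained sketch of the bounded-heap pattern (the cap, task IDs, and scores are made up; only the push/pop logic mirrors the diff). With fwd=True, scores are negated so that heappop evicts the task with the largest start time, leaving the earliest num_tasks tasks in the window:

import heapq

MAX_TASKS = 4  # illustrative stand-in for num_tasks / MAX_TASKS_TO_VISUALIZE

heap = []
task_info = {}

def record_task(task_id, score, fwd=True):
    # Same trick as the diff: negate the score when iterating forward so
    # that heappop always removes the task we want to drop first.
    task_info[task_id] = {"score": score}
    heapq.heappush(heap, (-score if fwd else score, task_id))
    if len(heap) > MAX_TASKS:
        _, evicted = heapq.heappop(heap)
        del task_info[evicted]

for i, start in enumerate([1.0, 5.0, 2.0, 9.0, 3.0, 7.0]):
    record_task("task-{}".format(i), start)

# The four tasks with the earliest start times remain.
print(sorted(t["score"] for t in task_info.values()))  # [1.0, 2.0, 3.0, 5.0]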
@@ -562,8 +571,10 @@ def micros(ts):
def micros_rel(ts):
return micros(ts - start_time)

task_profiles = self.task_profiles(start=0, end=time.time())
Contributor Author
@ericl Sep 6, 2017
This was performing an unnecessary full table scan.

task_table = self.task_table()
Contributor Author
@ericl Sep 6, 2017
This was also a full table scan. The replacement is slower when the number of tasks is small, but it has a bounded worst-case latency.

Contributor
There is actually a way to get the best of both worlds with the SCAN [1] class of functions from redis (I guess that's what you mean by the TODO above; do you want to make it a little more precise?)

[1] https://redis.io/commands/scan

Contributor Author
@ericl Sep 6, 2017
SCAN is still O(n), but MGET is bounded by the limit. I think the refactoring is a bit nontrivial but I updated the TODO.

Contributor
You are right, sounds good!
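As a reference for this thread, a rough sketch of the two access patterns being compared (assumes a redis-py client on localhost; the key pattern and limit are illustrative). SCAN walks every matching key incrementally, so it is still O(n) in the total number of keys but avoids blocking the server the way KEYS does; MGET over an already-truncated key set costs one round trip bounded by the limit:

import redis

r = redis.StrictRedis(host="localhost", port=6379)

# SCAN: cursor-based iteration over all matching keys (O(n) overall).
cursor, event_log_keys = 0, []
while True:
    cursor, keys = r.scan(cursor, match="event_log*", count=1000)
    event_log_keys.extend(keys)
    if cursor == 0:
        break

# MGET: one bounded fetch for keys we have already limited, e.g. the task
# IDs kept by the heap above, so the worst-case cost is capped.
wanted = event_log_keys[:100]
values = r.mget(wanted) if wanted else []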

task_table = {}
# TODO(ekl) reduce the number of RPCs here
for task_id, _ in task_info.items():
task_table[task_id] = self.task_table(task_id)
seen_obj = {}

full_trace = []
@@ -649,14 +660,16 @@ def micros_rel(ts):
if parent_info:
parent_worker = workers[parent_info["worker_id"]]
parent_times = self._get_times(parent_info)
parent_profile = task_info.get(
task_table[task_id]["TaskSpec"]["ParentTaskID"])
parent = {
"cat": "submit_task",
"pid": "Node " + parent_worker["node_ip_address"],
"tid": parent_info["worker_id"],
"ts": micros_rel(task_profiles[task_table[task_id]
["TaskSpec"]
["ParentTaskID"]]
["get_arguments_start"]),
"ts": micros_rel(
parent_profile and
Contributor Author
With truncation it's possible we don't have some parent profiles. We are just ignoring them for now.

parent_profile["get_arguments_start"] or
start_time),
"ph": "s",
"name": "SubmitTask",
"args": {},
@@ -699,14 +712,16 @@ def micros_rel(ts):
if parent_info:
parent_worker = workers[parent_info["worker_id"]]
parent_times = self._get_times(parent_info)
parent_profile = task_info.get(
task_table[task_id]["TaskSpec"]["ParentTaskID"])
parent = {
"cat": "submit_task",
"pid": "Node " + parent_worker["node_ip_address"],
"tid": parent_info["worker_id"],
"ts": micros_rel(task_profiles[task_table[task_id]
["TaskSpec"]
["ParentTaskID"]]
["get_arguments_start"]),
"ts": micros_rel(
parent_profile and
parent_profile["get_arguments_start"] or
start_time),
"ph": "s",
"name": "SubmitTask",
"args": {},
@@ -743,7 +758,7 @@ def micros_rel(ts):
seen_obj[arg] += 1
owner_task = self._object_table(arg)["TaskID"]
owner_worker = (workers[
task_profiles[owner_task]["worker_id"]])
task_info[owner_task]["worker_id"]])
# Adding/subtracting 2 to the time associated with
# the beginning/ending of the flow event is
# necessary to make the flow events show up
@@ -760,8 +775,8 @@
"cat": "obj_dependency",
"pid": ("Node " +
owner_worker["node_ip_address"]),
"tid": task_profiles[owner_task]["worker_id"],
"ts": micros_rel(task_profiles[
"tid": task_info[owner_task]["worker_id"],
"ts": micros_rel(task_info[
owner_task]["store_outputs_end"]) - 2,
"ph": "s",
"name": "ObjectDependency",
7 changes: 4 additions & 3 deletions python/ray/experimental/ui.py
@@ -413,14 +413,15 @@ def handle_submit(sender):
raise ValueError("Unexpected time value '{}'".format(
time_opt.value))
# Write trace to a JSON file
print("{} tasks to trace".format(len(tasks)))
print("Dumping task profiling data to " + json_tmp)
print("Collected profiles for {} tasks.".format(len(tasks)))
print(
"Dumping task profile data to {}, "
"this might take a while...".format(json_tmp))
ray.global_state.dump_catapult_trace(json_tmp,
tasks,
breakdowns=breakdown,
obj_dep=obj_dep.value,
task_dep=task_dep.value)

print("Opening html file in browser...")

# Check that the catapult repo is cloned to the correct location