Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visualize recent tasks in timeline. #240

Merged
merged 1 commit into from
Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions webui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
This is Ray's Web UI. It consists of two components:

* The **frontend** is a [Polymer](https://www.polymer-project.org/1.0/) app that
uses [google-charts](https://elements.polymer-project.org/elements/google-chart)
for visualization.
uses [D3](https://d3js.org/) for visualization.
* The **backend** is a Python 3 websocket server (see `backend/ray_ui.py`) that
connects to Redis and potentially Ray.

Expand Down
83 changes: 68 additions & 15 deletions webui/backend/ray_ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import binascii
from collections import defaultdict
import json
import numpy as np
import redis
import websockets

parser = argparse.ArgumentParser(description="parse information for the web ui")
Expand Down Expand Up @@ -31,36 +33,87 @@ def key_to_hex_identifiers(key):
task_id = hex_identifier(key[offset:(offset + IDENTIFIER_LENGTH)])
return worker_id, task_id

worker_ids = []

async def hello(websocket, path):
conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop)
async def handle_get_recent_tasks(websocket, redis_conn, num_tasks):
keys = await redis_conn.execute("keys", "event_log:*")
if len(keys) == 0:
# There are no tasks, so send a message to the client saying so.
await websocket.send(json.dumps({"num_tasks": 0}))
else:
timestamps = []
contents = []
for key in keys:
content = await redis_conn.execute("lrange", key, "0", "-1")
contents.append(json.loads(content[0].decode()))
timestamps += [timestamp for (timestamp, task, kind, info) in contents[-1] if task == "ray:task"]

timestamps.sort()
time_cutoff = timestamps[(-2 * num_tasks):][0]

max_time = timestamps[-1]
min_time = time_cutoff - (max_time - time_cutoff) * 0.1
max_time = max_time + (max_time - time_cutoff) * 0.1

task_data = []
for i in range(len(keys)):
worker_id, task_id = key_to_hex_identifiers(keys[i])
data = contents[i]
if worker_id not in worker_ids:
worker_ids.append(worker_id)
worker_index = worker_ids.index(worker_id)

task_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task"]
if task_times[1] <= time_cutoff:
continue

task_get_arguments_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:get_arguments"]
task_execute_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:execute"]
task_store_outputs_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:store_outputs"]
task_data.append({"task": task_times,
"get_arguments": task_get_arguments_times,
"execute": task_execute_times,
"store_outputs": task_store_outputs_times,
"worker_index": worker_index})
reply = {"min_time": min_time,
"max_time": max_time,
"num_tasks": len(task_data),
"task_data": task_data}
await websocket.send(json.dumps(reply))

async def serve_requests(websocket, path):
redis_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop)

# We loop infinitely because otherwise the websocket will be closed.
while True:
command = json.loads(await websocket.recv())
print("received command {}".format(command))

if command["command"] == "get-recent-tasks":
await handle_get_recent_tasks(websocket, redis_conn, command["num"])

if command["command"] == "get-workers":
result = []
workers = await conn.execute("keys", "WorkerInfo:*")
workers = await redis_conn.execute("keys", "WorkerInfo:*")
for key in workers:
content = await conn.execute("hgetall", key)
content = await redis_conn.execute("hgetall", key)
worker_id = key_to_hex_identifier(key)
result.append({"worker": worker_id, "export_counter": int(content[1])})
await websocket.send(json.dumps(result))
elif command["command"] == "get-clients":
result = []
clients = await conn.execute("keys", "CL:*")
clients = await redis_conn.execute("keys", "CL:*")
for key in clients:
content = await conn.execute("hgetall", key)
content = await redis_conn.execute("hgetall", key)
result.append({"client": hex_identifier(content[1]),
"node_ip_address": content[3].decode(),
"client_type": content[5].decode()})
await websocket.send(json.dumps(result))
elif command["command"] == "get-objects":
result = []
objects = await conn.execute("keys", "OI:*")
objects = await redis_conn.execute("keys", "OI:*")
for key in objects:
content = await conn.execute("hgetall", key)
content = await redis_conn.execute("hgetall", key)
result.append({"object_id": hex_identifier(content[1]),
"hash": hex_identifier(content[3]),
"data_size": content[5].decode()})
Expand All @@ -73,18 +126,18 @@ async def hello(websocket, path):
await websocket.send(json.dumps({"object_id": "none"}))
elif command["command"] == "get-tasks":
result = []
tasks = await conn.execute("keys", "TT:*")
tasks = await redis_conn.execute("keys", "TT:*")
for key in tasks:
content = await conn.execute("hgetall", key)
content = await redis_conn.execute("hgetall", key)
result.append({"task_id": key_to_hex_identifier(key),
"state": int(content[1]),
"node_id": hex_identifier(content[3])})
await websocket.send(json.dumps(result))
elif command["command"] == "get-timeline":
tasks = defaultdict(list)
for key in await conn.execute("keys", "event_log:*"):
for key in await redis_conn.execute("keys", "event_log:*"):
worker_id, task_id = key_to_hex_identifiers(key)
content = await conn.execute("lrange", key, "0", "-1")
content = await redis_conn.execute("lrange", key, "0", "-1")
data = json.loads(content[0].decode())
begin_and_end_time = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task"]
tasks[worker_id].append({"task_id": task_id,
Expand All @@ -93,9 +146,9 @@ async def hello(websocket, path):
await websocket.send(json.dumps(tasks))
elif command["command"] == "get-events":
result = []
for key in await conn.execute("keys", "event_log:*"):
for key in await redis_conn.execute("keys", "event_log:*"):
worker_id, task_id = key_to_hex_identifiers(key)
answer = await conn.execute("lrange", key, "0", "-1")
answer = await redis_conn.execute("lrange", key, "0", "-1")
assert len(answer) == 1
events = json.loads(answer[0].decode())
result.extend([{"worker_id": worker_id,
Expand All @@ -109,7 +162,7 @@ async def hello(websocket, path):
redis_address = args.redis_address.split(":")
redis_ip_address, redis_port = redis_address[0], int(redis_address[1])

start_server = websockets.serve(hello, "localhost", args.port)
start_server = websockets.serve(serve_requests, "localhost", args.port)

loop.run_until_complete(start_server)
loop.run_forever()
2 changes: 2 additions & 0 deletions webui/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
<meta name="generator" content="Polymer Starter Kit">
<meta name="viewport" content="width=device-width, minimum-scale=1, initial-scale=1, user-scalable=yes">

<script src="https://d3js.org/d3.v4.min.js"></script>

<title>Ray UI</title>
<meta name="description" content="A web UI for Ray">

Expand Down
2 changes: 2 additions & 0 deletions webui/src/ray-app.html
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
<a name="tasks" href="/tasks">Tasks</a>
<a name="events" href="/events">Events</a>
<a name="timeline" href="/timeline">Timeline</a>
<a name="recent-tasks" href="/recent-tasks">Recent Tasks</a>
</iron-selector>
</app-drawer>

Expand All @@ -98,6 +99,7 @@
<ray-tasks name="tasks"></ray-tasks>
<ray-events name="events"></ray-events>
<ray-timeline name="timeline"></ray-timeline>
<ray-recent-tasks name="recent-tasks"></ray-recent-tasks>
<ray-view404 name="view404"></ray-view404>
</iron-pages>
</app-header-layout>
Expand Down
76 changes: 76 additions & 0 deletions webui/src/ray-recent-tasks.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<link rel="import" href="../bower_components/polymer/polymer.html">
<link rel="import" href="shared-styles.html">

<script src="recent-tasks.js"></script>

<dom-module id="ray-recent-tasks">
<template>
<style include="shared-styles">
:host {
display: block;

padding: 10px;
}
rect:hover
{
opacity: 0.5;
}

</style>

<div class="card">
<h1>Ray Recent Tasks</h1>
<input value=100 id="input_num_tasks"></input>
<button id="refresh_button">Refresh</button>
<br></br>
<svg id="recent_tasks"></svg>
</div>
</template>

<script>
var redis_address = "ws://127.0.0.1:8888";

var width = 1000,
height = 500,
row_height = 50,
num_workers = 8;

Polymer({
is: 'ray-recent-tasks',
ready: function() {

var self = this;
var socket = new WebSocket(redis_address);

recent_tasks = new RecentTasks(self.$.recent_tasks, {
"width": width, "height": height,
});

socket.onopen = function() {
socket.send(JSON.stringify({"command": "get-recent-tasks", "num": 100}));
}

socket.onmessage = function(messageEvent) {
var task_info = JSON.parse(messageEvent.data);
console.log(task_info.num_tasks);
if (task_info["num_tasks"] == 0) {
console.log("No tasks yet.");
return;
}
recent_tasks.draw_new_tasks(task_info);
}

socket.onclose = function(closeEvent) {
console.log(closeEvent)
}

d3.select(this.$.refresh_button)
.on("click", function () {
recent_tasks.erase();
var num_tasks_to_get = self.$.input_num_tasks.value;
socket.send(JSON.stringify({"command": "get-recent-tasks", "num": parseInt(num_tasks_to_get)}));
});
}
});
</script>
</dom-module>
74 changes: 74 additions & 0 deletions webui/src/recent-tasks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
RecentTasks = function(elem, options) {
var self = this;

this.options = options;
var barHeight = 25;

var svg = d3.select(elem)
.attr("width", this.options.width)
.attr("height", this.options.height);

this.draw_new_tasks = function(task_info) {
this.task_info = task_info;
var x = d3.scaleLinear()
.domain([task_info.min_time, task_info.max_time])
.range([-1, width + 1]);

var xAxis = d3.axisBottom(x)
.tickSize(-height);

var gx = svg.append("g")
.attr("class", "axis axis--x")
.attr("transform", "translate(0," + (height - 10) + ")")
.call(xAxis);

var task_rects = svg.append("g").attr("class", "task_rects");
var get_arguments_rects = svg.append("g").attr("class", "get_arguments_rects");
var execute_rects = svg.append("g").attr("class", "execute_rects");
var store_outputs_rects = svg.append("g").attr("class", "store_outputs_rects");

task_rects.selectAll("rect")
.data(this.task_info.task_data)
.enter()
.append("rect")
.attr("x", function (d) { return x(d.task[0]); })
.attr("y", function (d) { return (d.worker_index + 1) * barHeight; })
.attr("width", function (d) { return x(d.task[1]) - x(d.task[0]); })
.attr("height", function (d) { return barHeight - 1; })
.attr("fill", "orange")

get_arguments_rects.selectAll("rect")
.data(this.task_info.task_data)
.enter()
.append("rect")
.attr("x", function (d) { return x(d.get_arguments[0]); })
.attr("y", function (d) { return (d.worker_index + 1) * barHeight + 1; })
.attr("width", function (d) { return x(d.get_arguments[1]) - x(d.get_arguments[0]); })
.attr("height", function (d) { return barHeight - 3; })
.attr("fill", "black")

execute_rects.selectAll("rect")
.data(this.task_info.task_data)
.enter()
.append("rect")
.attr("x", function (d) { return x(d.execute[0]); })
.attr("y", function (d) { return (d.worker_index + 1) * barHeight + 1; })
.attr("width", function (d) { return x(d.execute[1]) - x(d.execute[0]); })
.attr("height", function (d) { return barHeight - 3; })
.attr("fill", "blue")

store_outputs_rects.selectAll("rect")
.data(this.task_info.task_data)
.enter()
.append("rect")
.attr("x", function (d) { return x(d.store_outputs[0]); })
.attr("y", function (d) { return (d.worker_index + 1) * barHeight + 1; })
.attr("width", function (d) { return x(d.store_outputs[1]) - x(d.store_outputs[0]); })
.attr("height", function (d) { return barHeight - 3; })
.attr("fill", "green")
}

this.erase = function() {
svg.selectAll("g").remove()
}
}