Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(autoware_debug_tools): add processing time visualizer #75

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .cspell-partial.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"ignorePaths": [
"**/perception/**",
"sensing/tier4_pcl_extensions/include/**",
"perception/bytetrack/lib/**"
"perception/bytetrack/lib/**",
"common/autoware_debug_tools/**"
],
"ignoreRegExpList": [],
"words": ["dltype", "tvmgen", "quantizer", "imageio", "mimsave"]
Expand Down
23 changes: 23 additions & 0 deletions common/autoware_debug_tools/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Autoware Debug Tools

This package provides tools for debugging Autoware.

## Processing Time Visualizer

This tool visualizes `tier4_debug_msgs/msg/ProcessingTimeTree` messages.

### Usage

1. Run the following command to start the visualizer.

```bash
ros2 run autoware_debug_tools processing_time_visualizer
```

2. Select a topic to visualize.

![select_topic](images/select-topic.png)

3. Then, the visualizer will show the processing time tree.

![visualize-tree](images/visualize-tree.png)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import curses
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add README about what is the purpose of this script and how to use it.
You can refer to README of planning_debug_tools in autoware_tools.

import time
from typing import Dict
import uuid

import pyperclip
import rclpy
import rclpy.executors
from rclpy.node import Node
from tier4_debug_msgs.msg import ProcessingTimeTree as ProcessingTimeTreeMsg

from .print_tree import print_trees
from .topic_selector import select_topic
from .tree import ProcessingTimeTree
from .utils import exit_curses
from .utils import init_curses


class ProcessingTimeVisualizer(Node):
def __init__(self):
super().__init__("processing_time_visualizer" + str(uuid.uuid4()).replace("-", "_"))
self.subscriber = self.subscribe_processing_time_tree()
self.trees: Dict[str, ProcessingTimeTree] = {}
self.worst_case_tree: Dict[str, ProcessingTimeTree] = {}
self.stdcscr = init_curses()
self.show_comment = False
print_trees("🌲 Processing Time Tree 🌲", self.topic_name, self.trees, self.stdcscr)

self.create_timer(0.1, self.update_screen)

def subscribe_processing_time_tree(self):
topics = []

s = time.time()
while True:
for topic_name, topic_types in self.get_topic_names_and_types():
for topic_type in topic_types:
if (
topic_type == "tier4_debug_msgs/msg/ProcessingTimeTree"
and topic_name not in topics
):
topics.append(topic_name)

if time.time() - s > 1.0:
break

if len(topics) == 0:
self.get_logger().info("No ProcessingTimeTree topic found")
self.get_logger().info("Exiting...")
exit(1)
else:
self.topic_name = curses.wrapper(select_topic, topics)
subscriber = self.create_subscription(
ProcessingTimeTreeMsg,
self.topic_name,
self.callback,
10,
)

return subscriber

def update_screen(self):
key = self.stdcscr.getch()

self.show_comment = not self.show_comment if key == ord("c") else self.show_comment
logs = print_trees(
"🌲 Processing Time Tree 🌲",
self.topic_name,
self.trees.values(),
self.stdcscr,
self.show_comment,
)
if key == ord("y"):
pyperclip.copy(logs)
if key == ord("q"):
raise KeyboardInterrupt

def callback(self, msg: ProcessingTimeTreeMsg):
tree = ProcessingTimeTree.from_msg(msg)
self.trees[tree.name] = tree
if tree.name not in self.worst_case_tree:
self.worst_case_tree[tree.name] = tree
else:
self.worst_case_tree[tree.name] = (
tree
if tree.processing_time > self.worst_case_tree[tree.name].processing_time
else self.worst_case_tree[tree.name]
)


def main(args=None):
rclpy.init(args=args)
try:
node = ProcessingTimeVisualizer()
except KeyboardInterrupt:
exit_curses()
return
try:
rclpy.spin(node)
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
node.destroy_node()
exit_curses()
if len(node.worst_case_tree) == 0:
exit(1)
print("⏰ Worst Case Execution Time ⏰")
for tree in node.worst_case_tree.values():
print(tree, end=None)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import curses
from itertools import chain
from typing import List

from .tree import ProcessingTimeTree
from .utils import abbreviate_topic
from .utils import wrap_lines


def print_trees(
prefix: str,
topic_name: str,
trees: List[ProcessingTimeTree],
stdscr: curses.window,
show_comment: bool = False,
):
stdscr.clear()
height, width = stdscr.getmaxyx()
stdscr.addstr(0, 0, prefix[: width - 2], curses.color_pair(2))
topic_showing = (abbreviate_topic(topic_name) if len(topic_name) > width else topic_name)[
: width - 2
]
stdscr.addstr(1, 0, topic_showing, curses.color_pair(1))
tree_lines = list(chain.from_iterable(tree.to_lines(show_comment) + [""] for tree in trees))
tree_lines = wrap_lines(tree_lines, width, height - 2)
for i, line in enumerate(tree_lines):
stdscr.addstr(i + 2, 1, line)
stdscr.addstr(height - 1, 0, "'q' => quit. 'c' => show comment. 'y' => copy."[: width - 2])
stdscr.refresh()

return "".join([line + "\n" for line in tree_lines])
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import curses
from typing import List
from typing import Optional

from .utils import abbreviate_topic
from .utils import wrap_topic_name


def select_topic(stdscr: curses.window, topics: List[str]) -> Optional[str]:
curses.curs_set(0) # Hide the cursor
curses.start_color() # Enable color support
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE) # Define color pair
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) # Define red color pair

current_topic = 0
start_index = 0
max_topics = 8

while True:
stdscr.clear()
height, width = stdscr.getmaxyx()

# Check if the terminal window is too small
if (
width < max(len(abbreviate_topic(topic)) for topic in topics) + 2
or height < max_topics + 2
):
error_msg = "Terminal window too small. Please resize."
stdscr.addstr(height // 2, width // 2 - len(error_msg) // 2, error_msg)
stdscr.refresh()
key = stdscr.getch()
if key in [ord("q"), ord("Q")]:
return None
continue

# Display the full selected topic in red at the top, with wrapping if necessary
full_topic = topics[current_topic]
lines = wrap_topic_name(full_topic, width - 2)

for i, line in enumerate(lines):
stdscr.attron(curses.color_pair(2))
stdscr.addstr(i, 1, line)
stdscr.attroff(curses.color_pair(2))

# Display the topics
for idx in range(start_index, min(start_index + max_topics, len(topics))):
abbreviated_option = abbreviate_topic(topics[idx])[: width - 2] # Truncate if necessary
x = width // 2 - len(abbreviated_option) // 2
y = height // 2 - max_topics // 2 + idx - start_index + len(lines)
if idx == current_topic:
stdscr.attron(curses.color_pair(1))
stdscr.addstr(y, x, abbreviated_option)
stdscr.attroff(curses.color_pair(1))
else:
stdscr.addstr(y, x, abbreviated_option)

# Display navigation buttons if needed
if start_index + max_topics < len(topics):
string = "Next>"
stdscr.addstr(height - 1, width - len(string) - 1, string)
if start_index > 0:
string = "<Prev"
stdscr.addstr(height - 1, 0, string)

stdscr.refresh()

# Handle user input
key = stdscr.getch()
if key == curses.KEY_UP and current_topic > 0:
current_topic -= 1
if current_topic < start_index:
start_index -= 1
elif key == curses.KEY_DOWN and current_topic < len(topics) - 1:
current_topic += 1
if current_topic >= start_index + max_topics:
start_index += 1
elif key in [curses.KEY_ENTER, 10, 13]:
return topics[current_topic]
elif key == curses.KEY_RIGHT and start_index + max_topics < len(topics):
start_index += max_topics
current_topic = start_index
elif key == curses.KEY_LEFT and start_index > 0:
start_index -= max_topics
current_topic = start_index
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from typing import Dict

from tier4_debug_msgs.msg import ProcessingTimeTree as ProcessingTimeTreeMsg


class ProcessingTimeTree:
def __init__(
self,
name: str = "",
processing_time: float = 0.0,
comment: str = "",
id: int = 1, # noqa
parent_id: int = 0,
):
self.name = name
self.processing_time = processing_time
self.comment = comment
self.id = id
self.parent_id = parent_id
self.children = []

@classmethod
def from_msg(cls, msg: ProcessingTimeTreeMsg) -> "ProcessingTimeTree":
# Create a dictionary to map node IDs to ProcessingTimeTree objects
node_dict: Dict[int, ProcessingTimeTree] = {
node.id: ProcessingTimeTree(
node.name, node.processing_time, node.comment, node.id, node.parent_id
)
for node in msg.nodes
}

# Build the tree structure
root = node_dict[1]
for node in list(node_dict.values()):
parent = node_dict.get(node.parent_id)
if parent:
parent.children.append(node)

return root

def to_lines(self, show_comment: bool = True) -> str:
def construct_string(
node: "ProcessingTimeTree",
lines: list,
prefix: str,
is_last: bool,
is_root: bool,
) -> None:
# If not the root, append the prefix and the node information
line = ""
if not is_root:
line += prefix + ("└── " if is_last else "├── ")
line += f"{node.name}: {node.processing_time:.2f} [ms]"
line += f": {node.comment}" if show_comment and node.comment else ""
lines.append(line)
# Recur for each child node
for i, child in enumerate(node.children):
construct_string(
child,
lines,
prefix + (" " if is_last else "│ "),
i == len(node.children) - 1,
False,
)

lines = []
# Start the recursive string construction with the root node
construct_string(self, lines, "", True, True)
return lines

def __str__(self) -> str:
return "".join([line + "\n" for line in self.to_lines()])

def __eq__(self, other: "ProcessingTimeTree") -> bool:
return self.name == other.name
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import curses
from typing import List


def abbreviate_topic(topic: str) -> str:
parts = topic.split("/")
abbreviated_parts = [part[0] if len(part) > 1 else part for part in parts[:-1]]
return "/".join(abbreviated_parts + [parts[-1]])


def wrap_topic_name(text: str, width: int) -> List[str]:
lines = []
while len(text) > width:
split_point = text.rfind("/", 0, width)
if split_point == -1:
split_point = width
lines.append(text[:split_point])
text = text[split_point:]
lines.append(text)
return lines


def wrap_lines(lines, width, height):
return [line[:width] for line in lines][:height]


def exit_curses():
curses.echo()
curses.nocbreak()
curses.endwin()


def init_curses() -> curses.window:
stdscr = curses.initscr()
stdscr.nodelay(True)
curses.noecho()
curses.cbreak()
stdscr.keypad(True)
curses.mousemask(curses.ALL_MOUSE_EVENTS)
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_WHITE)
return stdscr
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading