Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions news/382.deps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Removed `crashtest` dependency and vendored part of it into `cleo`
13 changes: 1 addition & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ classifiers = [

[tool.poetry.dependencies]
python = "^3.8"
crashtest = "^0.4.1"
rapidfuzz = "^3.0.0"

[tool.poetry.group.dev.dependencies]
Expand Down
17 changes: 2 additions & 15 deletions src/cleo/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@


if TYPE_CHECKING:
from crashtest.solution_providers.solution_provider_repository import (
SolutionProviderRepository,
)

from cleo.commands.command import Command
from cleo.events.event_dispatcher import EventDispatcher
from cleo.io.inputs.input import Input
Expand Down Expand Up @@ -78,8 +74,6 @@ def __init__(self, name: str = "console", version: str = "") -> None:

self._command_loader: CommandLoader | None = None

self._solution_provider_repository: SolutionProviderRepository | None = None

@property
def name(self) -> str:
return self._name
Expand Down Expand Up @@ -170,11 +164,6 @@ def catch_exceptions(self, catch_exceptions: bool = True) -> None:
def is_single_command(self) -> bool:
return self._single_command

def set_solution_provider_repository(
self, solution_provider_repository: SolutionProviderRepository
) -> None:
self._solution_provider_repository = solution_provider_repository

def add(self, command: Command) -> Command | None:
self._init()

Expand Down Expand Up @@ -493,11 +482,9 @@ def create_io(
return IO(input, output, error_output)

def render_error(self, error: Exception, io: IO) -> None:
from cleo.ui.exception_trace import ExceptionTrace
from cleo.ui.exception_trace.component import ExceptionTrace

trace = ExceptionTrace(
error, solution_provider_repository=self._solution_provider_repository
)
trace = ExceptionTrace(error)
simple = not io.is_verbose() or isinstance(error, CleoUserError)
trace.render(io.error_output, simple)

Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,18 @@
import sys
import tokenize

from pathlib import Path
from typing import TYPE_CHECKING
from typing import ClassVar

from crashtest.frame_collection import FrameCollection

from cleo.formatters.formatter import Formatter
from cleo.ui.exception_trace.frame_collection import FrameCollection


if TYPE_CHECKING:
from crashtest.frame import Frame
from crashtest.solution_providers.solution_provider_repository import (
SolutionProviderRepository,
)

from cleo.io.io import IO
from cleo.io.outputs.output import Output
from cleo.ui.exception_trace.frame import Frame


class Highlighter:
Expand Down Expand Up @@ -231,10 +227,8 @@ class ExceptionTrace:
def __init__(
self,
exception: Exception,
solution_provider_repository: SolutionProviderRepository | None = None,
) -> None:
self._exception = exception
self._solution_provider_repository = solution_provider_repository
self._exc_info = sys.exc_info()
self._ignore: str | None = None

Expand All @@ -252,10 +246,8 @@ def render(self, io: IO | Output, simple: bool = False) -> None:
else:
self._render_exception(io, self._exception)

self._render_solution(io, self._exception)

def _render_exception(self, io: IO | Output, exception: BaseException) -> None:
from crashtest.inspector import Inspector
from cleo.ui.exception_trace.inspector import Inspector

inspector = Inspector(exception)
if not inspector.frames:
Expand Down Expand Up @@ -297,31 +289,6 @@ def _render_snippet(self, io: IO | Output, frame: Frame) -> None:
for code_line in code_lines:
self._render_line(io, code_line, indent=4)

def _render_solution(self, io: IO | Output, exception: Exception) -> None:
if self._solution_provider_repository is None:
return

solutions = self._solution_provider_repository.get_solutions_for_exception(
exception
)
symbol = "•" if io.supports_utf8() else "*"

for solution in solutions:
title = solution.solution_title
description = solution.solution_description
links = solution.documentation_links

description = description.replace("\n", "\n ").strip(" ")

joined_links = ",".join(f"\n <fg=blue>{link}</>" for link in links)
self._render_line(
io,
f"<fg=blue;options=bold>{symbol} </>"
f"<fg=default;options=bold>{title.rstrip('.')}</>:"
f" {description}{joined_links}",
True,
)

def _render_trace(self, io: IO | Output, frames: FrameCollection) -> None:
stack_frames = FrameCollection()
for frame in frames:
Expand All @@ -341,7 +308,7 @@ def _render_trace(self, io: IO | Output, frames: FrameCollection) -> None:
frame_collections = stack_frames.compact()
i = remaining_frames_length
for collection in frame_collections:
if collection.is_repeated():
if collection.is_repeated:
if len(collection) > 1:
frames_message = f"<fg=yellow>{len(collection)}</> frames"
else:
Expand All @@ -359,7 +326,7 @@ def _render_trace(self, io: IO | Output, frames: FrameCollection) -> None:

for frame in collection:
relative_file_path = self._get_relative_file_path(frame.filename)
relative_file_path_parts = relative_file_path.split(os.path.sep)
relative_file_path_parts = relative_file_path.split(os.sep)
relative_file_path = (
f"<fg=default;options=dark>{Formatter.escape(os.sep)}</>".join(
relative_file_path_parts[:-1]
Expand Down Expand Up @@ -411,22 +378,21 @@ def _render_trace(self, io: IO | Output, frames: FrameCollection) -> None:

i -= 1

@staticmethod
def _render_line(
self, io: IO | Output, line: str, new_line: bool = False, indent: int = 2
io: IO | Output, line: str, new_line: bool = False, indent: int = 2
) -> None:
if new_line:
io.write_line("")

io.write_line(f"{indent * ' '}{line}")

def _get_relative_file_path(self, filepath: str) -> str:
cwd = os.getcwd()

if cwd:
filepath = filepath.replace(cwd + os.path.sep, "")
@staticmethod
def _get_relative_file_path(filepath: str) -> str:
if cwd := Path.cwd():
filepath = filepath.replace(f"{cwd}{os.sep}", "")

home = os.path.expanduser("~")
if home:
filepath = filepath.replace(home + os.path.sep, "~" + os.path.sep)
if home := Path("~").expanduser():
filepath = filepath.replace(f"{home}{os.sep}", f"~{os.sep}")

return filepath
81 changes: 81 additions & 0 deletions src/cleo/ui/exception_trace/frame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations

import operator

from functools import reduce
from pathlib import Path
from typing import TYPE_CHECKING
from typing import ClassVar


if TYPE_CHECKING:
import inspect

from types import FrameType


class Frame:
_content_cache: ClassVar[dict[str, str]] = {}

def __init__(self, frame_info: inspect.FrameInfo) -> None:
self._frame = frame_info.frame
self._frame_info = frame_info
self._lineno = frame_info.lineno
self._filename = frame_info.filename
self._function = frame_info.function
self._lines = None
self._file_content: str | None = None

@property
def frame(self) -> FrameType:
return self._frame

@property
def lineno(self) -> int:
return self._lineno

@property
def filename(self) -> str:
return self._filename

@property
def function(self) -> str:
return self._function

@property
def line(self) -> str:
if not self._frame_info.code_context:
return ""

return self._frame_info.code_context[0]

@property
def _key(self) -> tuple[str, str, int]:
return self._filename, self._function, self._lineno

@property
def file_content(self) -> str:
if self._file_content is not None:
return self._file_content
if not self._filename:
self._file_content = ""
return ""
if self._filename not in type(self)._content_cache:
try:
file_content = Path(self._filename).read_text()
except OSError:
file_content = ""
type(self)._content_cache[self._filename] = file_content
self._file_content = type(self)._content_cache[self._filename]
return self._file_content

def __hash__(self) -> int:
return reduce(operator.xor, map(hash, self._key))
Copy link
Contributor

@dimbleby dimbleby Nov 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simply return hash(self._key) was what I had intended, apologies if not clear

this in general is a better hash eg hash((1, 2)) is not the same as hash((2, 1)); and also it is just simpler


def __eq__(self, other: object) -> bool:
if not isinstance(other, Frame):
return NotImplemented
return self._key == other._key

def __repr__(self) -> str:
return f"<Frame {self._filename}, {self._function}, {self._lineno}>"
74 changes: 74 additions & 0 deletions src/cleo/ui/exception_trace/frame_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from __future__ import annotations

from typing import List

from cleo.ui.exception_trace.frame import Frame


class FrameCollection(List[Frame]):
def __init__(self, frames: list[Frame] | None = None, count: int = 0) -> None:
if frames is None:
frames = []

super().__init__(frames)

self._count = count

@property
def repetitions(self) -> int:
return self._count - 1

@property
def is_repeated(self) -> bool:
return self._count > 1

def increment_count(self, increment: int = 1) -> FrameCollection:
self._count += increment

return self

def compact(self) -> list[FrameCollection]:
"""
Compacts the frames to deduplicate recursive calls.
"""
collections = []
current_collection = FrameCollection()

i = 0
while i < len(self) - 1:
frame = self[i]
if frame in self[i + 1 :]:
duplicate_indices = []
for sub_index, sub_frame in enumerate(self[i + 1 :]):
if frame == sub_frame:
duplicate_indices.append(sub_index + i + 1)

found_duplicate = False
for duplicate_index in duplicate_indices:
collection = FrameCollection(self[i:duplicate_index])
if collection == current_collection:
current_collection.increment_count()
i = duplicate_index
found_duplicate = True
break

if found_duplicate:
continue

collections.append(current_collection)
current_collection = FrameCollection(self[i : duplicate_indices[0]])

i = duplicate_indices[0]

continue

if current_collection.is_repeated:
collections.append(current_collection)
current_collection = FrameCollection()

current_collection.append(frame)
i += 1

collections.append(current_collection)

return collections
Loading