Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command chaining #103

Merged
merged 4 commits into from
Aug 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions tests/end2end/features/command/chaining.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
Feature: Chain commands together.

Background:
Given I open a directory with 5 paths

Scenario: Chain two commands together.
When I run scroll down && scroll down
Then the library row should be 3

Scenario: Chain three commands together.
When I run scroll down && scroll down && scroll down
Then the library row should be 4

Scenario: Chain commands with count together.
When I run 2scroll down && scroll down
Then the library row should be 4

Scenario: Fail first of two chained commands
When I run something wrong && scroll down
Then the library row should be 1
And the message
'something: unknown command for mode library'
should be displayed

Scenario: Fail second of two chained commands
When I run scroll down && something wrong
Then the library row should be 2
And the message
'something: unknown command for mode library'
should be displayed

Scenario: Fail second of three chained commands
When I run scroll down && something wrong && scroll down
Then the library row should be 2
And the message
'something: unknown command for mode library'
should be displayed

Scenario: Run an alias of chained commands
When I run alias double_trouble scroll down \&\& scroll down
And I run double_trouble
# Run twice as running once also works if the alias is only aliased to scroll
# down and the first scroll down is executed right after the alias command
And I run double_trouble
Then the library row should be 5

Scenario: Run a chain of aliases where each alias consists of a chain of commands
When I run alias double_trouble scroll down \&\& scroll down
And I run alias reverse_double_trouble scroll up \&\& scroll up
And I run double_trouble && double_trouble && reverse_double_trouble
Then the library row should be 3
10 changes: 10 additions & 0 deletions tests/end2end/features/command/test_chaining_bdd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# vim: ft=python fileencoding=utf-8 sw=4 et sts=4

# This file is part of vimiv.
# Copyright 2017-2019 Christian Karl (karlch) <karlch at protonmail dot com>
# License: GNU GPL v3, see the "LICENSE" and "AUTHORS" files for details.

import pytest_bdd as bdd


bdd.scenarios("chaining.feature")
17 changes: 17 additions & 0 deletions tests/unit/utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,23 @@ def test_flatten():
assert utils.flatten(list_of_lists) == [1, 2, 3, 4]


def test_recursive_split():
def updater(text):
"""Return a text containing two numbers decremented by one, break at 0.

This results in doubling the number of numbers in every iteration and
decrementing all numbers by one. Eventually 2**(N - 1) zeros are left.
"""
number = int(text) - 1
if number > 0:
return f"{number}.{number}"
return "0"

for number in range(1, 6):
expected = ["0"] * 2 ** (number - 1)
assert utils.recursive_split(str(number), ".", updater) == expected


def test_remove_prefix():
assert utils.remove_prefix("start hello", "start") == " hello"

Expand Down
1 change: 1 addition & 0 deletions vimiv/commands/aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self):
# Add defaults
self[api.modes.GLOBAL]["q"] = "quit"
self[api.modes.IMAGE]["w"] = "write"
self[api.modes.IMAGE]["wq"] = "write && quit"

def get(self, mode):
"""Return all aliases for one mode."""
Expand Down
70 changes: 55 additions & 15 deletions vimiv/commands/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""Classes and functions to run commands.

Module Attributes:
SEPARATOR: String used to separate chained commands.
external: ExternalRunner instance to run shell commands.

_last_command: Dictionary storing the last command for each mode.
Expand All @@ -17,14 +18,16 @@
import re
import shlex
import subprocess
from typing import Dict, List, NamedTuple, Optional
from typing import Dict, List, NamedTuple, Optional, Callable

from PyQt5.QtCore import QRunnable, QObject, QThreadPool, pyqtSignal

from vimiv import api, utils
from vimiv.commands import aliases


SEPARATOR = "&&"

_last_command: Dict[api.modes.Mode, "LastCommand"] = {}


Expand All @@ -36,35 +39,69 @@ class LastCommand(NamedTuple):
Arguments: List[str]


class CommandPartFailed(Exception):
"""Raised if a command part fails, e.g. due to the command being unknown."""


def text_non_whitespace(func: Callable[..., None]):
"""Decorator to only run function if text argument is more than plain whitespace."""

def inner(text: str, *args, **kwargs) -> None:
text = text.strip()
if not text:
return None
return func(text, *args, **kwargs)

return inner


@text_non_whitespace
def run(text, count=None, mode=None):
"""Run a (chain of) command(s).

The text to run is split at SEPARATOR and each part is handled individually by
_run_single. If one part fails, the remaining parts are not executed.

Args:
text: Complete text given to command line or keybinding.
count: Count given if any.
mode: Mode to run the command in.
"""
logging.debug("%s: Running '%s'", __name__, text)
# Expand percent here as it only needs to be done once and is rather expensive
text = expand_percent(text, mode)
logging.debug("%s: Expanded text to '%s'", __name__, text)
# Split text parts recursively updating aliases in the individual parts

def replace_aliases(text):
return text if SEPARATOR in text else alias(text.strip(), mode)

textparts = utils.recursive_split(text, SEPARATOR, replace_aliases)
logging.debug("%s: Split text into parts '%s'", __name__, textparts)
try:
for i, cmdpart in enumerate(textparts):
logging.debug("%s: Handling part %d '%s'", __name__, i, cmdpart)
_run_single(cmdpart, count, mode)
except CommandPartFailed:
logging.debug("%s: Stopping at %d as '%s' failed", __name__, i, cmdpart)


@text_non_whitespace
def _run_single(text, count=None, mode=None):
"""Run either external or internal command.

Args:
text: Complete text given to command line or keybinding.
count: Count given if any.
mode: Mode to run the command in.
"""
text = text.strip()
if not text:
return
text = _update_command(text, mode=mode)
if text.startswith("!"):
external(text.lstrip("!"))
else:
count = str(count) if count is not None else ""
command(count + text, mode)


def _update_command(text, mode):
"""Update command with aliases and percent wildcard.

Args:
text: String passed as command.
mode: Mode in which the command is supposed to run.
"""
return expand_percent(alias(text, mode), mode)


def command(text, mode=None):
"""Run internal command when called.

Expand Down Expand Up @@ -119,10 +156,13 @@ def _run_command(count, cmdname, args, mode):
api.status.update()
except api.commands.CommandNotFound as e:
logging.error(str(e))
raise CommandPartFailed from e
except (api.commands.ArgumentError, api.commands.CommandError) as e:
logging.error("%s: %s", cmdname, str(e))
raise CommandPartFailed from e
except api.commands.CommandWarning as w:
logging.warning("%s: %s", cmdname, str(w))
raise CommandPartFailed from w


def _parse(text):
Expand Down
15 changes: 15 additions & 0 deletions vimiv/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,21 @@ def flatten(list_of_lists: List[List[Any]]) -> List[Any]:
return [elem for sublist in list_of_lists for elem in sublist]


def recursive_split(
text: str, separator: str, updater: Callable[[str], str]
) -> List[str]:
"""Recursively split a string at separator applying an update callable.

The string is split into parts and the update callable is applied to each part. The
function is then called again on the updated text until no more separators remain.
"""
splits = updater(text).split(separator)
if len(splits) == 1: # Updater did not add any new separators
return splits
nested = [recursive_split(part, separator, updater) for part in splits]
return flatten(nested)


def remove_prefix(text: str, prefix: str) -> str:
"""Remove a prefix of a given string."""
if text.startswith(prefix):
Expand Down