Skip to content

Commit

Permalink
Refactor clipboard classes
Browse files Browse the repository at this point in the history
  • Loading branch information
exquo committed Jun 8, 2024
1 parent b013e5e commit 6a2f833
Showing 1 changed file with 153 additions and 131 deletions.
284 changes: 153 additions & 131 deletions scli
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import atexit
import base64
import bisect
import collections
import errno
import hashlib
import importlib
import json
Expand All @@ -21,6 +20,7 @@ import subprocess
import sys
import tempfile
import textwrap
import urllib
from abc import ABC, abstractmethod
from contextlib import contextmanager, suppress
from datetime import datetime, timezone
Expand Down Expand Up @@ -408,136 +408,167 @@ def hex_str_to_b64(hex_str):
# clipboard
# #############################################################################

TEMPFILE_PREFIX = '_scli-tmp.'

class ClipBase(ABC):
mime_order = ['image/png', 'image/jpeg', 'image/jpg', 'text/uri-list']
class ClipGetBase(ABC):

@staticmethod
def get_installed_clipb_manager():
for executable in ('wl-paste', 'xclip'):
if shutil.which(executable) is not None:
return executable
return None
@abstractmethod
def files_list(self):
"""Return a list of files in clipboard."""

_get_types_arg = None
_get_files_arg = None
_put_cmd = None
def _proc_run(self, *args, **kwargs):
"""proc_run() with capture_output by default"""
kwargs.setdefault("capture_output", True)
return proc_run(*args, **kwargs)

@abstractmethod
def _run_cmd(self, args):
pass

@abstractmethod
def _get_args(self, mime):
pass
class ClipGetCmd(ClipGetBase):

def _run(self, args):
try:
proc = subprocess.run(
self._run_cmd(args),
capture_output=True,
check=True,
)
except (OSError, subprocess.CalledProcessError):
return None
return proc.stdout
def __init__(self, get_cmd):
self._get_cmd = get_cmd

def _get(self, mime):
return self._run(self._get_args(mime))
def files_list(self):
return self._proc_run(self._get_cmd).stdout.splitlines()

def _run_lines(self, args):
out = self._run(args)
if out:
return out.decode('utf-8').splitlines()
return None

def _get_types(self):
return self._run_lines(self._get_types_arg)

def _get_files(self):
return [f for f in self._run_lines(self._get_files_arg) if f]

def _clip_files(self):
out = self._get_types()
if out is None:
return out
for otype in out:
for mtype in self.mime_order:
if mtype == otype:
if mtype.startswith('image/'):
content = self._get(mtype)
suffix = '.' + mtype.split('/')[1]
if cfg.save_history:
clip_file_path = (
SCLI_ATTACHMENT_DIR
/
f"clipboard_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}{suffix}"
)
clip_file = open(clip_file_path, 'w+b')
else:
clip_file = tempfile.NamedTemporaryFile(
mode='w+b',
prefix=TEMPFILE_PREFIX,
suffix=suffix,
delete=False,
)
with clip_file:
clip_file.write(content)
return [clip_file.name]
elif mtype == 'text/uri-list':
content = self._get_files()
return [x.replace('file://', '') for x in content]
return None
class ClipGetBlank(ClipGetBase):
def files_list(self):
return []

def files(self):
cmd = cfg.clipboard_get_command
if cmd is None:
return self._clip_files()
return proc_run(cmd, capture_output=True).stdout.splitlines()

def _put(self, txt):
if not txt:
return
try:
proc = subprocess.Popen(
self._put_cmd,
stdin=subprocess.PIPE,
text=True,
class ClipGetTargets(ClipGetBase):

_MIME_TARGETS = (
"image/png",
"image/jpeg",
"image/webp",
"image/gif",
"audio/aac",
"audio/mpeg",
"video/mp4",
)

_WRITE_FILE_PREFIX = "clipb_"

def __init__(
self,
get_target_cmd,
get_targets_list_cmd,
write_dir,
):
self._get_target_cmd = get_target_cmd
self._get_targets_list_cmd = get_targets_list_cmd
self._write_dir = write_dir

def _list_available_targets(self):
return self._proc_run(
self._get_targets_list_cmd,
).stdout.splitlines()

def _get_target_val(self, target_name, **subprocess_kwargs):
return self._proc_run(
f"{self._get_target_cmd} {target_name}",
**subprocess_kwargs,
).stdout

@staticmethod
def _parse_file_uri(file_uri):
return urllib.parse.unquote(
file_uri[len("file://"):]
)

def _write_target_content(self, target):
suffix = target.partition('/')[-1]
datetime_str = datetime.now().strftime('%Y-%m-%d_%H-%M-%S.%fZ')
file_path = Path(
self._write_dir,
f"{self._WRITE_FILE_PREFIX}{datetime_str}.{suffix}",
)
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'wb') as outf:
self._get_target_val(
target,
text=False,
capture_output=False,
stdout=outf,
)
except OSError:
return
else:
with proc:
proc.stdin.write(txt)
return str(file_path)

def files_list(self):
targets_avail = self._list_available_targets()
if "text/uri-list" in targets_avail:
uri_list = self._get_target_val("text/uri-list")
return [self._parse_file_uri(line) for line in uri_list.splitlines()]
for target in self._MIME_TARGETS:
if target in targets_avail:
return [self._write_target_content(target)]
return []


def put(self, txt):
cmd = cfg.clipboard_put_command
if cmd is None:
return self._put(txt)
return proc_run(cmd, {'%s': txt})
class ClipPut:

def __init__(self, put_cmd):
self._put_cmd = put_cmd

def put(self, text):
proc_run(
self._put_cmd,
input=text,
)


class ClipPutBlank:
def put(self, text):
pass

class Xclip(ClipBase):
_get_types_arg = 'TARGETS'
_get_files_arg = 'text/uri-list'
_put_cmd = ['xclip', '-selection', 'clipboard']

def _run_cmd(self, args):
return ['xclip', '-selection', 'clipboard', '-t', args, '-o']
class Clip:

def _get_args(self, mime):
return mime
def __init__(self):
if cfg.clipboard_put_command:
put_cmd = cfg.clipboard_put_command
else:
if shutil.which("wl-copy"):
put_cmd = "wl-copy"
elif shutil.which("xclip"):
put_cmd = "xclip -i -sel c"
else:
put_cmd = None
logger.warning("No clipboard copy command found; disabling copying to clipboard.")
self._clip_put = ClipPut(put_cmd) if put_cmd else ClipPutBlank()

class WLclip(ClipBase):
_get_types_arg = ['-l']
_get_files_arg = ['-t', 'text/uri-list']
_put_cmd = ['wl-copy']
if cfg.clipboard_get_command:
self._clip_get = ClipGetCmd(cfg.clipboard_get_command)
else:
if cfg.save_history:
write_dir = SCLI_ATTACHMENT_DIR
Path(write_dir).mkdir(parents=True, exist_ok=True)
else:
write_dir = tempfile.mkdtemp(prefix="scli_")
atexit.register(
shutil.rmtree,
write_dir,
)
if shutil.which("wl-paste"):
self._clip_get = ClipGetTargets(
get_target_cmd="wl-paste -t",
get_targets_list_cmd="wl-paste -l",
write_dir=write_dir,
)
elif shutil.which("xclip"):
self._clip_get = ClipGetTargets(
get_target_cmd="xclip -o -sel c -t",
get_targets_list_cmd="xclip -o -sel c -t TARGETS",
write_dir=write_dir,
)
else:
self._clip_get = ClipGetBlank()
logger.warning("No clipboard paste command found; disabling querying clipboard for files.")

def _run_cmd(self, args):
return ['wl-paste', *args]
def files_list(self):
return self._clip_get.files_list()

def _get_args(self, mime):
return ['-t', mime]
def put(self, text):
return self._clip_put.put(text)


# #############################################################################
Expand Down Expand Up @@ -1041,7 +1072,7 @@ class Daemon(AsyncContext):
if attachments is None:
attachments = []
if not all(os.path.exists(attch) for attch in attachments):
logger.error('send_message: Attached file(s) does not exist: %s', attachments)
logger.error('send_message: Attached file(s) do not exist: %s', attachments)
return

timestamp = get_current_timestamp_ms()
Expand All @@ -1057,13 +1088,6 @@ class Daemon(AsyncContext):
}

def after_send_proc_returns(proc):
# Remove temproary attachments
for attachment in envelope['dataMessage']['attachments']:
if attachment.startswith(
os.path.join(tempfile.gettempdir(), TEMPFILE_PREFIX)
):
os.remove(attachment)

# Check if send command succeeded
self._parse_send_proc_output(proc, envelope, 'sending_done')

Expand Down Expand Up @@ -4577,7 +4601,7 @@ class Actions:
self._contacts = contacts
self._chats_data = chats_data
self._urwid_ui = urwid_ui
self._clip = WLclip() if ClipBase.get_installed_clipb_manager() == 'wl-paste' else Xclip()
self._clip = Clip()

def reload(self, callback=None, **callback_kwargs):
if self._daemon.is_dbus_service_running:
Expand Down Expand Up @@ -4704,11 +4728,11 @@ class Actions:
self.send_message_curr_contact(string, attachments=files)

def attach_clip(self, *message):
files = self._clip.files()
files = self._clip.files_list()
if files:
self.send_message_curr_contact(*message, attachments=files)
else:
self.set_status_line('Clipboard is empty.')
self.set_status_line('No files in clipboard.')

def copy_to_clipb(self, text):
self._clip.put(text)
Expand Down Expand Up @@ -5709,13 +5733,13 @@ def make_arg_parser():
parser.add_argument(
'-G',
'--clipboard-get-command',
help='Command used by `:attachClip` to get a list of files to send as attachments. Should return one absolute file path per line. If not set, `xclip` or `wl-clipboard` is used.',
help='Command used by `:attachClip` to get the list of files to send as message attachments. Should return absolute file paths separated by newline characters. If not set, %(prog)s checks if `wl-clipboard` or `xclip` are installed.',
)

parser.add_argument(
'-P',
'--clipboard-put-command',
help='Command to put text on clipboard. %%s will be replaced with the text. If not set, `xclip` or `wl-clipboard` is used.',
help="Command used to copy text to clipboard. Text will be sent to command's stdin. If not set, %(prog)s checks if `wl-clipboard` or `xclip` are installed. (example: xsel -ib)",
)

parser.add_argument(
Expand Down Expand Up @@ -6043,8 +6067,12 @@ def parse_args():
if args.save_history:
args.save_history = os.path.expanduser(args.save_history)
args.wrap_at = parse_wrap_at_arg(args.wrap_at)
if args.group_contacts:
print("Warning: `--group-contacts` option is deprecated; use `--partition-constants` instead.")
args.partition_contacts = args.partition_contacts or args.group_contacts
del args.__dict__['group_contacts']
if args.clipboard_put_command and "%s" in args.clipboard_put_command:
print("Warning: `--clipboard-put-command` does not use replacement tokens ('%s'). See `--help`.")
return args, modified_args


Expand Down Expand Up @@ -6216,12 +6244,6 @@ def detect_username():


def main():
try:
os.makedirs(SCLI_ATTACHMENT_DIR)
except OSError as exc:
if not (exc.errno == errno.EEXIST and os.path.isdir(SCLI_DATA_DIR)):
sys.exit("ERROR: Could not create a directory in " + SCLI_DATA_DIR)

args, modified_args = parse_args()
cfg.set(args)

Expand Down

0 comments on commit 6a2f833

Please sign in to comment.