Skip to content

Commit

Permalink
#3260 refactor compression functions
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed Sep 8, 2021
1 parent 98bd331 commit 6cec577
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 80 deletions.
21 changes: 11 additions & 10 deletions xpra/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,17 +486,18 @@ def make_hello(self):
capabilities["aliases"] = reverse_aliases
return capabilities

def compressed_wrapper(self, datatype, data, level=5):
def compressed_wrapper(self, datatype, data, level=5, **kwargs):
if level>0 and len(data)>=256:
#ugly assumptions here, should pass by name
zlib = "zlib" in self.server_compressors
lz4 = "lz4" in self.server_compressors
#never use brotli as a generic compressor
#brotli = "brotli" in self.server_compressors and compression.use_brotli
cw = compression.compressed_wrapper(datatype, data, level=level,
zlib=zlib, lz4=lz4,
brotli=False, none=True,
can_inline=False)
kw = {}
#brotli is not enabled by default as a generic compressor
#but callers may choose to enable it via kwargs:
for algo, defval in {
"zlib" : True,
"lz4" : True,
"brotli" : False,
}.items():
kw[algo] = algo in self.server_compressors and compression.use(algo) and kwargs.get(algo, defval)
cw = compression.compressed_wrapper(datatype, data, level=level, can_inline=False, **kw)
if len(cw)<len(data):
#the compressed version is smaller, use it:
return cw
Expand Down
42 changes: 25 additions & 17 deletions xpra/client/mixins/clipboard.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# This file is part of Xpra.
# Copyright (C) 2010-2020 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2010-2021 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2008, 2010 Nathaniel Smith <njs@pobox.com>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

from xpra.client.mixins.stub_client_mixin import StubClientMixin
from xpra.platform.features import CLIPBOARD_WANT_TARGETS, CLIPBOARD_GREEDY, CLIPBOARD_PREFERRED_TARGETS, CLIPBOARDS
from xpra.platform.gui import get_clipboard_native_class
from xpra.net import compression
from xpra.scripts.config import FALSE_OPTIONS, TRUE_OPTIONS
from xpra.util import flatten_dict, typedict
from xpra.os_util import bytestostr, is_Wayland
Expand Down Expand Up @@ -278,24 +279,14 @@ def clipboard_send(*parts):
if not self.clipboard_enabled:
log("clipboard is disabled, not sending clipboard packet")
return
#handle clipboard compression if needed:
from xpra.net.compression import Compressible
#replaces 'Compressible' items in a packet
#with a subclass that calls self.compressed_wrapper
#and which can therefore enable the brotli compressor:
packet = list(parts)
for v in packet:
if isinstance(v, Compressible):
register_clipboard_compress_cb(v)
for i, v in enumerate(packet):
if isinstance(v, compression.Compressible):
packet[i] = self.compressible_item(v)
self.send_now(*packet)
def register_clipboard_compress_cb(compressible):
#register the compressor which will fire in protocol.encode:
def compress_clipboard():
log("compress_clipboard() compressing %s, server compressors=%s",
compressible, self.server_compressors)
from xpra.net import compression
if "brotli" in self.server_compressors and compression.use("brotli"):
return compression.compressed_wrapper(compressible.datatype, compressible.data,
level=9, brotli=True, can_inline=False)
return self.compressed_wrapper(compressible.datatype, compressible.data)
compressible.compress = compress_clipboard
def clipboard_progress(local_requests, remote_requests):
log("clipboard_progress(%s, %s)", local_requests, remote_requests)
if local_requests is not None:
Expand All @@ -308,5 +299,22 @@ def clipboard_progress(local_requests, remote_requests):
hc.set_preferred_targets(self.server_clipboard_preferred_targets)
return hc

def compressible_item(self, compressible):
"""
converts a 'Compressible' item into something that will
call `self.compressed_wrapper` when compression is requested
by the network encode thread.
"""
client = self
class ProtocolCompressible(compression.Compressible):
__slots__ = ()
def compress(self):
log.error("compress() 1")
v = client.compressed_wrapper(self.datatype, self.data,
level=9, can_inline=False, brotli=True)
log.error("compress() 2")
return v
return ProtocolCompressible(compressible.datatype, compressible.data)

def clipboard_notify(self, n):
log("clipboard_notify(%i)", n)
10 changes: 7 additions & 3 deletions xpra/client/mixins/remote_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,14 @@ def local_warn(*args):
try:
dtime = int(1000*(monotonic_time() - self.monotonic_start_time))
if args:
s = msg % args
data = msg % args
else:
s = msg
data = self.compressed_wrapper("text", s, level=1)
data = msg
if len(data)>=32:
try:
data = self.compressed_wrapper("text", data.encode("utf8"), level=1)
except Exception:
pass
self.send("logging", level, data, dtime)
exc_info = kwargs.get("exc_info")
if exc_info is True:
Expand Down
2 changes: 1 addition & 1 deletion xpra/client/mixins/stub_client_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def process_ui_capabilities(self, caps : typedict):
This runs in the UI thread.
"""

def compressed_wrapper(self, datatype, data, level=5):
def compressed_wrapper(self, datatype, data, level=5, **kwargs):
"""
Dummy utility method for compressing data.
Actual client implementations will provide compression
Expand Down
6 changes: 5 additions & 1 deletion xpra/clipboard/clipboard_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,11 @@ def _may_compress(self, dtype, dformat, wire_data):
log.warn("Warning: clipboard contents are too big and have not been sent")
log.warn(" %s compressed bytes dropped (maximum is %s)", len(wire_data), self.max_clipboard_packet_size)
return None
if isinstance(wire_data, (str, bytes)) and len(wire_data)>=MIN_CLIPBOARD_COMPRESS_SIZE:
#if isinstance(wire_data, str):
#compression requires bytes:
#wire_data = wire_data.encode("utf8")
#but this would require the receiving end to use net_utf8()
if isinstance(wire_data, bytes) and len(wire_data)>=MIN_CLIPBOARD_COMPRESS_SIZE:
return Compressible("clipboard: %s / %s" % (dtype, dformat), wire_data)
return wire_data

Expand Down
29 changes: 18 additions & 11 deletions xpra/net/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from xpra.net.header import LZ4_FLAG, ZLIB_FLAG, BROTLI_FLAG


#MIN_COMPRESS_SIZE = envint("XPRA_MAX_DECOMPRESSED_SIZE", 512)
MIN_COMPRESS_SIZE = envint("XPRA_MAX_DECOMPRESSED_SIZE", -1)
MAX_DECOMPRESSED_SIZE = envint("XPRA_MAX_DECOMPRESSED_SIZE", 256*1024*1024)

#all the compressors we know about, in best compatibility order:
Expand Down Expand Up @@ -182,24 +184,29 @@ def compress(self):
raise Exception("compress() not defined on %s" % self)


def compressed_wrapper(datatype, data, level=5, zlib=False, lz4=False, brotli=False, none=False, can_inline=True):
def compressed_wrapper(datatype, data, level=5, can_inline=True, **kwargs):
size = len(data)
def no():
return Compressed("raw %s" % datatype, data, can_inline=can_inline)
if size<=MIN_COMPRESS_SIZE:
#don't bother
return no()
if size>MAX_DECOMPRESSED_SIZE:
sizemb = size//1024//1024
maxmb = MAX_DECOMPRESSED_SIZE//1024//1024
raise Exception("uncompressed data is too large: %iMB, limit is %iMB" % (sizemb, maxmb))
if lz4 and use("lz4"):
algo = "lz4"
elif brotli and use("brotli"):
algo = "brotli"
elif zlib and use("zlib"):
algo = "zlib"
elif none and use("none"):
algo = "none"
else:
raise InvalidCompressionException("no compressors available")
algos = [x for x in ALL_COMPRESSORS if kwargs.get(x)]
if not algos:
return no()
#raise InvalidCompressionException("no compressors available")
#TODO: smarter selection of algo based on datatype
#ie: 'text' -> brotli
algo = algos[0]
c = COMPRESSION[algo]
cl, cdata = c.compress(data, level)
min_saving = kwargs.get("min_saving", 0)
if len(cdata)>=size+min_saving:
return no()
return LevelCompressed(datatype, cdata, cl, algo, can_inline=can_inline)


Expand Down
6 changes: 3 additions & 3 deletions xpra/net/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,17 +549,17 @@ def encode(self, packet_in):
l = len(item)
except TypeError as e:
raise TypeError("invalid type %s in %s packet at index %s: %s" % (ti, packet[0], i, e)) from None
if ti==LargeStructure:
if issubclass(ti, LargeStructure):
packet[i] = item.data
continue
if ti==Compressible:
if issubclass(ti, Compressible):
#this is a marker used to tell us we should compress it now
#(used by the client for clipboard data)
item = item.compress()
packet[i] = item
ti = type(item)
#(it may now be a "Compressed" item and be processed further)
if ti in (Compressed, LevelCompressed):
if issubclass(ti, (Compressed, LevelCompressed)):
#already compressed data (usually pixels, cursors, etc)
if not item.can_inline or l>INLINE_SIZE:
il = 0
Expand Down
12 changes: 8 additions & 4 deletions xpra/server/mixins/logging_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# This file is part of Xpra.
# Copyright (C) 2010-2020 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2010-2021 Antoine Martin <antoine@xpra.org>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
#pylint: disable-msg=E1101
Expand Down Expand Up @@ -117,9 +117,9 @@ def local_err(message):
try:
try:
if args:
s = msg % args
data = msg % args
else:
s = msg
data = msg
except Exception as e:
local_err("failed to format log message")
return
Expand All @@ -129,7 +129,11 @@ def local_err(message):
continue
try:
dtime = int(1000*(monotonic_time() - start_time))
data = source.compressed_wrapper("text", enc(s))
if len(data)>=32:
try:
data = source.compressed_wrapper("text", data.encode("utf8"), level=1)
except Exception:
pass
source.send("logging", level, data, dtime)
exc_info = kwargs.get("exc_info")
if exc_info is True:
Expand Down
2 changes: 1 addition & 1 deletion xpra/server/picture_encode.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# This file is part of Xpra.
# Copyright (C) 2010-2019 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2010-2021 Antoine Martin <antoine@xpra.org>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.

Expand Down
16 changes: 5 additions & 11 deletions xpra/server/proxy/proxy_instance.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This file is part of Xpra.
# Copyright (C) 2013-2020 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2013-2021 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2008 Nathaniel Smith <njs@pobox.com>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
Expand All @@ -9,7 +9,7 @@
from queue import Queue

from xpra.net.net_util import get_network_caps
from xpra.net.compression import Compressed, compressed_wrapper
from xpra.net.compression import Compressed, compressed_wrapper, MIN_COMPRESS_SIZE
from xpra.net.protocol import Protocol
from xpra.net.common import MAX_PACKET_SIZE
from xpra.net.digest import get_salt, gendigest
Expand Down Expand Up @@ -369,17 +369,11 @@ def get_server_packet(self):
def _packet_recompress(self, packet, index, name):
if len(packet)>index:
data = packet[index]
if len(data)<512:
if len(data)<MIN_COMPRESS_SIZE:
return
#this is ugly and not generic!
zlib = self.caps.boolget("zlib", True)
lz4 = self.caps.boolget("lz4", False)
#lzo = self.caps.boolget("lzo", False)
if zlib or lz4:
packet[index] = compressed_wrapper(name, data, zlib=zlib, lz4=lz4, can_inline=False)
else:
#prevent warnings about large uncompressed data
packet[index] = Compressed("raw %s" % name, data, can_inline=True)
kw = dict((k, self.caps.boolget(k)) for k in ("zlib", "lz4"))
packet[index] = compressed_wrapper(name, data, can_inline=False, **kw)


def cancel_server_ping_timer(self):
Expand Down
16 changes: 6 additions & 10 deletions xpra/server/source/client_connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# This file is part of Xpra.
# Copyright (C) 2011 Serviware (Arthur Huillet, <ahuillet@serviware.com>)
# Copyright (C) 2010-2020 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2010-2021 Antoine Martin <antoine@xpra.org>
# Copyright (C) 2008 Nathaniel Smith <njs@pobox.com>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
Expand Down Expand Up @@ -141,15 +141,11 @@ def may_notify(self, *args, **kwargs):
notification_mixin.NotificationMixin.may_notify(self, *args, **kwargs)


def compressed_wrapper(self, datatype, data, min_saving=128):
if self.zlib or self.lz4:
cw = compressed_wrapper(datatype, data, zlib=self.zlib, lz4=self.lz4, can_inline=False)
if len(cw)+min_saving<=len(data):
#the compressed version is smaller, use it:
return cw
#skip compressed version: fall through
#we can't compress, so at least avoid warnings in the protocol layer:
return Compressed(datatype, data, can_inline=True)
def compressed_wrapper(self, datatype, data, **kwargs):
#set compression flags based on self.zlib and self.lz4:
kw = dict((k, getattr(self, k, False)) for k in ("zlib", "lz4"))
kw.update(kwargs)
return compressed_wrapper(datatype, data, can_inline=False, **kw)


def update_bandwidth_limits(self):
Expand Down
6 changes: 1 addition & 5 deletions xpra/server/source/clipboard_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,5 @@ def compress_clipboard(self, packet):
packet = list(packet)
for i, item in enumerate(packet):
if isinstance(item, Compressible):
if self.brotli:
packet[i] = compressed_wrapper(item.datatype, item.data,
level=9, brotli=True, can_inline=False)
else:
packet[i] = self.compressed_wrapper(item.datatype, item.data)
packet[i] = compressed_wrapper(item.datatype, item.data, level=9, can_inline=False, brotli=True)
self.queue_packet(packet)
5 changes: 2 additions & 3 deletions xpra/sound/src.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,9 @@ def on_new_sample(self, _bus):

def _emit_buffer(self, data, metadata):
if self.stream_compressor and data:
cdata = compressed_wrapper("sound", data, level=9,
cdata = compressed_wrapper("sound", data, level=9, can_inline=True,
zlib=False,
lz4=self.stream_compressor=="lz4",
can_inline=True)
lz4=self.stream_compressor=="lz4")
if len(cdata)<len(data)*90//100:
log("compressed using %s from %i bytes down to %i bytes", self.stream_compressor, len(data), len(cdata))
metadata["compress"] = self.stream_compressor
Expand Down

0 comments on commit 6cec577

Please sign in to comment.