From 6cec577a8cc65baca753f7aa62774f8b7fd45981 Mon Sep 17 00:00:00 2001 From: totaam Date: Wed, 8 Sep 2021 21:26:06 +0700 Subject: [PATCH] #3260 refactor compression functions --- xpra/client/client_base.py | 21 +++++------ xpra/client/mixins/clipboard.py | 42 +++++++++++++--------- xpra/client/mixins/remote_logging.py | 10 ++++-- xpra/client/mixins/stub_client_mixin.py | 2 +- xpra/clipboard/clipboard_core.py | 6 +++- xpra/net/compression.py | 29 +++++++++------ xpra/net/protocol.py | 6 ++-- xpra/server/mixins/logging_server.py | 12 ++++--- xpra/server/picture_encode.py | 2 +- xpra/server/proxy/proxy_instance.py | 16 +++------ xpra/server/source/client_connection.py | 16 ++++----- xpra/server/source/clipboard_connection.py | 6 +--- xpra/sound/src.py | 5 ++- 13 files changed, 93 insertions(+), 80 deletions(-) diff --git a/xpra/client/client_base.py b/xpra/client/client_base.py index 2ccab538c7..213939ee66 100644 --- a/xpra/client/client_base.py +++ b/xpra/client/client_base.py @@ -486,17 +486,18 @@ def make_hello(self): capabilities["aliases"] = reverse_aliases return capabilities - def compressed_wrapper(self, datatype, data, level=5): + def compressed_wrapper(self, datatype, data, level=5, **kwargs): if level>0 and len(data)>=256: - #ugly assumptions here, should pass by name - zlib = "zlib" in self.server_compressors - lz4 = "lz4" in self.server_compressors - #never use brotli as a generic compressor - #brotli = "brotli" in self.server_compressors and compression.use_brotli - cw = compression.compressed_wrapper(datatype, data, level=level, - zlib=zlib, lz4=lz4, - brotli=False, none=True, - can_inline=False) + kw = {} + #brotli is not enabled by default as a generic compressor + #but callers may choose to enable it via kwargs: + for algo, defval in { + "zlib" : True, + "lz4" : True, + "brotli" : False, + }.items(): + kw[algo] = algo in self.server_compressors and compression.use(algo) and kwargs.get(algo, defval) + cw = compression.compressed_wrapper(datatype, data, level=level, can_inline=False, **kw) if len(cw) +# Copyright (C) 2010-2021 Antoine Martin # Copyright (C) 2008, 2010 Nathaniel Smith # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. @@ -7,6 +7,7 @@ from xpra.client.mixins.stub_client_mixin import StubClientMixin from xpra.platform.features import CLIPBOARD_WANT_TARGETS, CLIPBOARD_GREEDY, CLIPBOARD_PREFERRED_TARGETS, CLIPBOARDS from xpra.platform.gui import get_clipboard_native_class +from xpra.net import compression from xpra.scripts.config import FALSE_OPTIONS, TRUE_OPTIONS from xpra.util import flatten_dict, typedict from xpra.os_util import bytestostr, is_Wayland @@ -278,24 +279,14 @@ def clipboard_send(*parts): if not self.clipboard_enabled: log("clipboard is disabled, not sending clipboard packet") return - #handle clipboard compression if needed: - from xpra.net.compression import Compressible + #replaces 'Compressible' items in a packet + #with a subclass that calls self.compressed_wrapper + #and which can therefore enable the brotli compressor: packet = list(parts) - for v in packet: - if isinstance(v, Compressible): - register_clipboard_compress_cb(v) + for i, v in enumerate(packet): + if isinstance(v, compression.Compressible): + packet[i] = self.compressible_item(v) self.send_now(*packet) - def register_clipboard_compress_cb(compressible): - #register the compressor which will fire in protocol.encode: - def compress_clipboard(): - log("compress_clipboard() compressing %s, server compressors=%s", - compressible, self.server_compressors) - from xpra.net import compression - if "brotli" in self.server_compressors and compression.use("brotli"): - return compression.compressed_wrapper(compressible.datatype, compressible.data, - level=9, brotli=True, can_inline=False) - return self.compressed_wrapper(compressible.datatype, compressible.data) - compressible.compress = compress_clipboard def clipboard_progress(local_requests, remote_requests): log("clipboard_progress(%s, %s)", local_requests, remote_requests) if local_requests is not None: @@ -308,5 +299,22 @@ def clipboard_progress(local_requests, remote_requests): hc.set_preferred_targets(self.server_clipboard_preferred_targets) return hc + def compressible_item(self, compressible): + """ + converts a 'Compressible' item into something that will + call `self.compressed_wrapper` when compression is requested + by the network encode thread. + """ + client = self + class ProtocolCompressible(compression.Compressible): + __slots__ = () + def compress(self): + log.error("compress() 1") + v = client.compressed_wrapper(self.datatype, self.data, + level=9, can_inline=False, brotli=True) + log.error("compress() 2") + return v + return ProtocolCompressible(compressible.datatype, compressible.data) + def clipboard_notify(self, n): log("clipboard_notify(%i)", n) diff --git a/xpra/client/mixins/remote_logging.py b/xpra/client/mixins/remote_logging.py index 869bb86c7e..bab5afd83e 100644 --- a/xpra/client/mixins/remote_logging.py +++ b/xpra/client/mixins/remote_logging.py @@ -112,10 +112,14 @@ def local_warn(*args): try: dtime = int(1000*(monotonic_time() - self.monotonic_start_time)) if args: - s = msg % args + data = msg % args else: - s = msg - data = self.compressed_wrapper("text", s, level=1) + data = msg + if len(data)>=32: + try: + data = self.compressed_wrapper("text", data.encode("utf8"), level=1) + except Exception: + pass self.send("logging", level, data, dtime) exc_info = kwargs.get("exc_info") if exc_info is True: diff --git a/xpra/client/mixins/stub_client_mixin.py b/xpra/client/mixins/stub_client_mixin.py index ceb5383185..5d136661af 100644 --- a/xpra/client/mixins/stub_client_mixin.py +++ b/xpra/client/mixins/stub_client_mixin.py @@ -87,7 +87,7 @@ def process_ui_capabilities(self, caps : typedict): This runs in the UI thread. """ - def compressed_wrapper(self, datatype, data, level=5): + def compressed_wrapper(self, datatype, data, level=5, **kwargs): """ Dummy utility method for compressing data. Actual client implementations will provide compression diff --git a/xpra/clipboard/clipboard_core.py b/xpra/clipboard/clipboard_core.py index e0c6473153..1641a4ceb2 100644 --- a/xpra/clipboard/clipboard_core.py +++ b/xpra/clipboard/clipboard_core.py @@ -458,7 +458,11 @@ def _may_compress(self, dtype, dformat, wire_data): log.warn("Warning: clipboard contents are too big and have not been sent") log.warn(" %s compressed bytes dropped (maximum is %s)", len(wire_data), self.max_clipboard_packet_size) return None - if isinstance(wire_data, (str, bytes)) and len(wire_data)>=MIN_CLIPBOARD_COMPRESS_SIZE: + #if isinstance(wire_data, str): + #compression requires bytes: + #wire_data = wire_data.encode("utf8") + #but this would require the receiving end to use net_utf8() + if isinstance(wire_data, bytes) and len(wire_data)>=MIN_CLIPBOARD_COMPRESS_SIZE: return Compressible("clipboard: %s / %s" % (dtype, dformat), wire_data) return wire_data diff --git a/xpra/net/compression.py b/xpra/net/compression.py index f267bdc122..9b1eb6e194 100644 --- a/xpra/net/compression.py +++ b/xpra/net/compression.py @@ -11,6 +11,8 @@ from xpra.net.header import LZ4_FLAG, ZLIB_FLAG, BROTLI_FLAG +#MIN_COMPRESS_SIZE = envint("XPRA_MAX_DECOMPRESSED_SIZE", 512) +MIN_COMPRESS_SIZE = envint("XPRA_MAX_DECOMPRESSED_SIZE", -1) MAX_DECOMPRESSED_SIZE = envint("XPRA_MAX_DECOMPRESSED_SIZE", 256*1024*1024) #all the compressors we know about, in best compatibility order: @@ -182,24 +184,29 @@ def compress(self): raise Exception("compress() not defined on %s" % self) -def compressed_wrapper(datatype, data, level=5, zlib=False, lz4=False, brotli=False, none=False, can_inline=True): +def compressed_wrapper(datatype, data, level=5, can_inline=True, **kwargs): size = len(data) + def no(): + return Compressed("raw %s" % datatype, data, can_inline=can_inline) + if size<=MIN_COMPRESS_SIZE: + #don't bother + return no() if size>MAX_DECOMPRESSED_SIZE: sizemb = size//1024//1024 maxmb = MAX_DECOMPRESSED_SIZE//1024//1024 raise Exception("uncompressed data is too large: %iMB, limit is %iMB" % (sizemb, maxmb)) - if lz4 and use("lz4"): - algo = "lz4" - elif brotli and use("brotli"): - algo = "brotli" - elif zlib and use("zlib"): - algo = "zlib" - elif none and use("none"): - algo = "none" - else: - raise InvalidCompressionException("no compressors available") + algos = [x for x in ALL_COMPRESSORS if kwargs.get(x)] + if not algos: + return no() + #raise InvalidCompressionException("no compressors available") + #TODO: smarter selection of algo based on datatype + #ie: 'text' -> brotli + algo = algos[0] c = COMPRESSION[algo] cl, cdata = c.compress(data, level) + min_saving = kwargs.get("min_saving", 0) + if len(cdata)>=size+min_saving: + return no() return LevelCompressed(datatype, cdata, cl, algo, can_inline=can_inline) diff --git a/xpra/net/protocol.py b/xpra/net/protocol.py index 5c3038ae3a..d5952f0c22 100644 --- a/xpra/net/protocol.py +++ b/xpra/net/protocol.py @@ -549,17 +549,17 @@ def encode(self, packet_in): l = len(item) except TypeError as e: raise TypeError("invalid type %s in %s packet at index %s: %s" % (ti, packet[0], i, e)) from None - if ti==LargeStructure: + if issubclass(ti, LargeStructure): packet[i] = item.data continue - if ti==Compressible: + if issubclass(ti, Compressible): #this is a marker used to tell us we should compress it now #(used by the client for clipboard data) item = item.compress() packet[i] = item ti = type(item) #(it may now be a "Compressed" item and be processed further) - if ti in (Compressed, LevelCompressed): + if issubclass(ti, (Compressed, LevelCompressed)): #already compressed data (usually pixels, cursors, etc) if not item.can_inline or l>INLINE_SIZE: il = 0 diff --git a/xpra/server/mixins/logging_server.py b/xpra/server/mixins/logging_server.py index 96b2a4cf8f..6cb1f001bb 100644 --- a/xpra/server/mixins/logging_server.py +++ b/xpra/server/mixins/logging_server.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # This file is part of Xpra. -# Copyright (C) 2010-2020 Antoine Martin +# Copyright (C) 2010-2021 Antoine Martin # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. #pylint: disable-msg=E1101 @@ -117,9 +117,9 @@ def local_err(message): try: try: if args: - s = msg % args + data = msg % args else: - s = msg + data = msg except Exception as e: local_err("failed to format log message") return @@ -129,7 +129,11 @@ def local_err(message): continue try: dtime = int(1000*(monotonic_time() - start_time)) - data = source.compressed_wrapper("text", enc(s)) + if len(data)>=32: + try: + data = source.compressed_wrapper("text", data.encode("utf8"), level=1) + except Exception: + pass source.send("logging", level, data, dtime) exc_info = kwargs.get("exc_info") if exc_info is True: diff --git a/xpra/server/picture_encode.py b/xpra/server/picture_encode.py index 2d5f5ddc04..ad34f973a9 100644 --- a/xpra/server/picture_encode.py +++ b/xpra/server/picture_encode.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # This file is part of Xpra. -# Copyright (C) 2010-2019 Antoine Martin +# Copyright (C) 2010-2021 Antoine Martin # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. diff --git a/xpra/server/proxy/proxy_instance.py b/xpra/server/proxy/proxy_instance.py index 161ceccc0f..269be5806f 100644 --- a/xpra/server/proxy/proxy_instance.py +++ b/xpra/server/proxy/proxy_instance.py @@ -1,5 +1,5 @@ # This file is part of Xpra. -# Copyright (C) 2013-2020 Antoine Martin +# Copyright (C) 2013-2021 Antoine Martin # Copyright (C) 2008 Nathaniel Smith # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. @@ -9,7 +9,7 @@ from queue import Queue from xpra.net.net_util import get_network_caps -from xpra.net.compression import Compressed, compressed_wrapper +from xpra.net.compression import Compressed, compressed_wrapper, MIN_COMPRESS_SIZE from xpra.net.protocol import Protocol from xpra.net.common import MAX_PACKET_SIZE from xpra.net.digest import get_salt, gendigest @@ -369,17 +369,11 @@ def get_server_packet(self): def _packet_recompress(self, packet, index, name): if len(packet)>index: data = packet[index] - if len(data)<512: + if len(data)) -# Copyright (C) 2010-2020 Antoine Martin +# Copyright (C) 2010-2021 Antoine Martin # Copyright (C) 2008 Nathaniel Smith # Xpra is released under the terms of the GNU GPL v2, or, at your option, any # later version. See the file COPYING for details. @@ -141,15 +141,11 @@ def may_notify(self, *args, **kwargs): notification_mixin.NotificationMixin.may_notify(self, *args, **kwargs) - def compressed_wrapper(self, datatype, data, min_saving=128): - if self.zlib or self.lz4: - cw = compressed_wrapper(datatype, data, zlib=self.zlib, lz4=self.lz4, can_inline=False) - if len(cw)+min_saving<=len(data): - #the compressed version is smaller, use it: - return cw - #skip compressed version: fall through - #we can't compress, so at least avoid warnings in the protocol layer: - return Compressed(datatype, data, can_inline=True) + def compressed_wrapper(self, datatype, data, **kwargs): + #set compression flags based on self.zlib and self.lz4: + kw = dict((k, getattr(self, k, False)) for k in ("zlib", "lz4")) + kw.update(kwargs) + return compressed_wrapper(datatype, data, can_inline=False, **kw) def update_bandwidth_limits(self): diff --git a/xpra/server/source/clipboard_connection.py b/xpra/server/source/clipboard_connection.py index e8b8716e0f..a632d35fcf 100644 --- a/xpra/server/source/clipboard_connection.py +++ b/xpra/server/source/clipboard_connection.py @@ -126,9 +126,5 @@ def compress_clipboard(self, packet): packet = list(packet) for i, item in enumerate(packet): if isinstance(item, Compressible): - if self.brotli: - packet[i] = compressed_wrapper(item.datatype, item.data, - level=9, brotli=True, can_inline=False) - else: - packet[i] = self.compressed_wrapper(item.datatype, item.data) + packet[i] = compressed_wrapper(item.datatype, item.data, level=9, can_inline=False, brotli=True) self.queue_packet(packet) diff --git a/xpra/sound/src.py b/xpra/sound/src.py index 176389bbb9..111e475402 100755 --- a/xpra/sound/src.py +++ b/xpra/sound/src.py @@ -266,10 +266,9 @@ def on_new_sample(self, _bus): def _emit_buffer(self, data, metadata): if self.stream_compressor and data: - cdata = compressed_wrapper("sound", data, level=9, + cdata = compressed_wrapper("sound", data, level=9, can_inline=True, zlib=False, - lz4=self.stream_compressor=="lz4", - can_inline=True) + lz4=self.stream_compressor=="lz4") if len(cdata)