Skip to content

Commit

Permalink
#3659 use dataclasses improve the code
Browse files Browse the repository at this point in the history
makes the file transfer class much more readable
  • Loading branch information
totaam committed Oct 20, 2022
1 parent aa3c7a8 commit 59f2456
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 100 deletions.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
is_CentOS, is_AlmaLinux, is_RockyLinux, is_RedHat, is_openSUSE, is_OracleLinux,
)

if sys.version_info<(3, 6):
raise Exception("xpra no longer supports Python versions older than 3.6")
if sys.version_info<(3, 7):
raise Exception("xpra no longer supports Python versions older than 3.7")
if BITS!=64:
print(f"Warning: {BITS}-bit architecture, only 64-bits are officially supported")
for _ in range(5):
Expand Down
209 changes: 111 additions & 98 deletions xpra/net/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import hashlib
import uuid
from time import monotonic
from dataclasses import dataclass

from xpra.child_reaper import getChildReaper
from xpra.os_util import bytestostr, memoryview_to_bytes, umask_context, POSIX, WIN32
from xpra.os_util import bytestostr, strtobytes, memoryview_to_bytes, umask_context, POSIX, WIN32
from xpra.util import typedict, csv, envint, envbool, engs, net_utf8, u
from xpra.scripts.config import parse_bool, parse_with_unit
from xpra.simple_stats import std_unit
Expand All @@ -26,7 +27,6 @@
MAX_CONCURRENT_FILES = max(1, envint("XPRA_MAX_CONCURRENT_FILES", 10))
PRINT_JOB_TIMEOUT = max(60, envint("XPRA_PRINT_JOB_TIMEOUT", 3600))
SEND_REQUEST_TIMEOUT = max(300, envint("XPRA_SEND_REQUEST_TIMEOUT", 3600))
ALWAYS_CHUNK = envbool("XPRA_FILE_ALWAYS_CHUNK", False)
CHUNK_TIMEOUT = 10*1000

MIMETYPE_EXTS = {
Expand Down Expand Up @@ -93,6 +93,30 @@ def safe_open_download_file(basefilename, mimetype):
filelog(f"using filename {filename!r}, file descriptor={fd}")
return filename, fd

@dataclass
class ReceiveChunkState:
start: float
fd: int
filename: str
mimetype: str
printit : bool
openit : bool
filesize: int
options: dict
digest: object
written: int
cancelled: bool
send_id: str
timer: int
chunk: str
@dataclass
class SendChunkState:
start: float
data: object
chunk_size: int
timer: int
chunk: int


class FileTransferAttributes:

Expand Down Expand Up @@ -275,18 +299,18 @@ def _check_chunk_receiving(self, chunk_id, chunk_no):
if not chunk_state:
#transfer not found
return
if chunk_state[-4]:
if chunk_state.cancelled:
#transfer has been cancelled
return
chunk_state[-2] = 0 #this timer has been used
if chunk_state[-1]==chunk_no:
filelog.error("Error: chunked file transfer '%s' timed out", chunk_id)
chunk_state.timer = 0 #this timer has been used
if chunk_state.chunk==chunk_no:
filelog.error(f"Error: chunked file transfer f{chunk_id} timed out")
self.receive_chunks_in_progress.pop(chunk_id, None)

def cancel_download(self, send_id, message="Cancelled"):
filelog("cancel_download(%s, %s)", send_id, message)
for chunk_id, chunk_state in dict(self.receive_chunks_in_progress).items():
if chunk_state[-3]==send_id:
if chunk_state.send_id==send_id:
self.cancel_file(chunk_id, message)
return
filelog.error("Error: cannot cancel download %s, entry not found!", u(send_id))
Expand All @@ -296,113 +320,112 @@ def cancel_file(self, chunk_id, message, chunk=0):
chunk_state = self.receive_chunks_in_progress.get(chunk_id)
if chunk_state:
#mark it as cancelled:
chunk_state[-4] = True
timer = chunk_state[-2]
chunk_state.cancelled = True
timer = chunk_state.timer
if timer:
chunk_state[-2] = 0
chunk_state.timer = 0
self.source_remove(timer)
fd = chunk_state[1]
osclose(fd)
osclose(chunk_state.fd)
#remove this transfer after a little while,
#so in-flight packets won't cause errors
def clean_receive_state():
self.receive_chunks_in_progress.pop(chunk_id, None)
return False
self.timeout_add(20000, clean_receive_state)
filename = chunk_state[2]
filename = chunk_state.filename
try:
os.unlink(filename)
except OSError as e:
filelog("os.unlink(%s)", filename, exc_info=True)
filelog(f"os.unlink({filename})", exc_info=True)
filelog.error("Error: failed to delete temporary download file")
filelog.error(" '%s' : %s", filename, e)
filelog.error(f" {filename!r} : {e}")
self.send("ack-file-chunk", chunk_id, False, message, chunk)

def _process_send_file_chunk(self, packet):
chunk_id, chunk, file_data, has_more = packet[1:5]
chunk_id = net_utf8(chunk_id)
#if len(file_data)<1024:
# from xpra.os_util import hexstr
# filelog.warn("file_data=%s", hexstr(file_data))
#filelog(f"file_data={len(file_data)} {type(file_data)}")
filelog(f"file_data={len(file_data)} {type(file_data)}")
filelog("_process_send_file_chunk%s", (chunk_id, chunk, f"{len(file_data)} bytes", has_more))
chunk_state = self.receive_chunks_in_progress.get(chunk_id)
if not chunk_state:
filelog.error("Error: cannot find the file transfer id '%r'", chunk_id)
filelog.error(f"Error: cannot find the file transfer id {chunk_id!r}")
self.cancel_file(chunk_id, f"file transfer id {chunk_id!r} not found", chunk)
return
if chunk_state[-4]:
if chunk_state.cancelled:
filelog("got chunk for a cancelled file transfer, ignoring it")
return
def progress(position, error=None):
start = chunk_state[0]
send_id = chunk_state[-3]
filesize = chunk_state[6]
self.transfer_progress_update(False, send_id, monotonic()-start, position, filesize, error)
fd = chunk_state[1]
if chunk_state[-1]+1!=chunk:
filelog.error("Error: chunk number mismatch, expected %i but got %i", chunk_state[-1]+1, chunk)
elapsed = monotonic()-chunk_state.start
self.transfer_progress_update(False, chunk_state.send_id, elapsed, position, chunk_state.filesize, error)
fd = chunk_state.fd
if chunk_state.chunk+1!=chunk:
filelog.error("Error: chunk number mismatch, expected %i but got %i", chunk_state.chunk+1, chunk)
self.cancel_file(chunk_id, "chunk number mismatch", chunk)
osclose(fd)
progress(-1, "chunk no mismatch")
return
file_data = memoryview_to_bytes(file_data)
#this is for legacy packet encoders only:
if isinstance(file_data, str):
file_data = strtobytes(file_data)
#update chunk number:
chunk_state[-1] = chunk
digest = chunk_state[8]
written = chunk_state[9]
chunk_state.chunk = chunk
try:
os.write(fd, file_data)
if digest:
digest.update(file_data)
written += len(file_data)
chunk_state[9] = written
if chunk_state.digest:
chunk_state.digest.update(file_data)
chunk_state.written += len(file_data)
except OSError as e:
filelog.error("Error: cannot write file chunk")
filelog.estr(e)
self.cancel_file(chunk_id, f"write error: {e}", chunk)
osclose(fd)
progress(-1, f"write error ({e}")
return
filesize = chunk_state[6]
if written>filesize:
if chunk_state.written>chunk_state.filesize:
filelog.error("Error: too much data received")
progress(-1, "file data size mismatch")
return
self.send("ack-file-chunk", chunk_id, True, "", chunk)
if chunk_state[-4]:
if chunk_state.cancelled:
#check again if the transfer has been cancelled
filelog("got chunk for a cancelled file transfer, ignoring it")
return
if has_more:
progress(written)
timer = chunk_state[-2]
if timer:
self.source_remove(timer)
progress(chunk_state.written)
if chunk_state.timer:
self.source_remove(chunk_state.timer)
#remote end will send more after receiving the ack
timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
chunk_state[-2] = timer
chunk_state.timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
filelog("waiting for the next chunk, got %8i of %8i: %3i%%",
chunk_state.written, chunk_state.filesize, 100*chunk_state.written/chunk_state.filesize)
return
#we have received all the packets
self.receive_chunks_in_progress.pop(chunk_id, None)
osclose(fd)
if digest:
options = chunk_state[7]
expected_digest = options.strget(digest.name) #ie: "sha256"
filename = chunk_state[2]
if expected_digest and digest.hexdigest()!=expected_digest:
filename = chunk_state.filename
options = chunk_state.options
filelog(f"file {filename!r} complete")
if chunk_state.digest:
expected_digest = options.strget(chunk_state.digest.name) #ie: "sha256"
if expected_digest and chunk_state.digest.hexdigest()!=expected_digest:
progress(-1, "checksum mismatch")
self.digest_mismatch(filename, digest, expected_digest)
self.digest_mismatch(filename, chunk_state.digest, expected_digest)
return
filelog("%s digest matches: %s", digest.name, expected_digest)
filelog("%s digest matches: %s", chunk_state.digest.name, expected_digest)
#check file size and digest then process it:
filename, mimetype, printit, openit, filesize, options = chunk_state[2:8]
if written!=filesize:
filelog.error("Error: expected a file of %i bytes, got %i", filesize, written)
if chunk_state.written!=chunk_state.filesize:
filelog.error("Error: expected a file of %i bytes, got %i", chunk_state.filesize, chunk_state.written)
progress(-1, "file size mismatch")
return
progress(written)
start_time = chunk_state[0]
elapsed = monotonic()-start_time
mimetype = bytestostr(mimetype)
filelog("%i bytes received in %i chunks, took %ims", filesize, chunk, elapsed*1000)
self.process_downloaded_file(filename, mimetype, printit, openit, filesize, options)
progress(chunk_state.written)
elapsed = monotonic()-chunk_state.start
filelog("%i bytes received in %i chunks, took %ims", chunk_state.filesize, chunk, elapsed*1000)
self.process_downloaded_file(filename, chunk_state.mimetype,
chunk_state.printit, chunk_state.openit, chunk_state.filesize, options)

def accept_data(self, send_id, dtype, basefilename, printit, openit):
#subclasses should check the flags,
Expand Down Expand Up @@ -491,15 +514,12 @@ def _process_send_file(self, packet):
osclose(fd)
return
timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
chunk_state = [
monotonic(),
fd, filename, mimetype,
printit, openit, filesize,
options, digest, 0, False, send_id,
timer, chunk,
]
self.receive_chunks_in_progress[chunk_id] = chunk_state
self.send("ack-file-chunk", chunk_id, True, "", chunk)
self.receive_chunks_in_progress[chunk_id] = ReceiveChunkState(monotonic(),
fd, filename, mimetype,
printit, openit, filesize,
options, digest, 0, False, send_id,
timer, chunk)
self.send("ack-file-chunk", chunk_id, True, b"", chunk)
return
#not chunked, full file:
if not file_data:
Expand Down Expand Up @@ -660,19 +680,13 @@ def open_done(*_args):
cr = getChildReaper()
cr.add_process(proc, f"Open file {url}", command, True, True, open_done)

def file_size_warning(self, action, location, basefilename, filesize, limit=0):
filelog.warn(f"Warning: cannot {action} the file {basefilename!r}")
if filesize<=0:
filelog.warn(" this file is empty")
else:
filelog.warn(" this file is too large: %sB", std_unit(filesize))
filelog.warn(f" the {location} file size limit is %sB", std_unit(limit))
def file_size_warning(self, action, location, basefilename, filesize, limit):
filelog.warn("Warning: cannot %s the file '%s'", action, basefilename)
filelog.warn(" this file is too large: %sB", std_unit(filesize))
filelog.warn(" the %s file size limit is %sB", location, std_unit(limit))

def check_file_size(self, action, filename, filesize):
basefilename = os.path.basename(filename)
if filesize<=0:
self.file_size_warning(action, "n/a", basefilename, filesize)
return False
if filesize>self.file_size_limit:
self.file_size_warning(action, "local", basefilename, filesize, self.file_size_limit)
return False
Expand Down Expand Up @@ -902,7 +916,7 @@ def do_send_file(self, filename, mimetype, data, filesize=0, printit=False, open
options = options or {}
options["sha256"] = h.hexdigest()
chunk_size = min(self.file_chunks, self.remote_file_chunks)
if 0<chunk_size<filesize or ALWAYS_CHUNK:
if 0<chunk_size<filesize:
in_progress = len(self.send_chunks_in_progress)
if in_progress>=MAX_CONCURRENT_FILES:
raise Exception(f"too many file transfers in progress: {in_progress}")
Expand All @@ -912,9 +926,8 @@ def do_send_file(self, filename, mimetype, data, filesize=0, printit=False, open
#timer to check that the other end is requesting more chunks:
chunk_no = 0
timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk_no)
chunk_state = [monotonic(), data, chunk_size, timer, chunk_no]
self.send_chunks_in_progress[chunk_id] = chunk_state
cdata = ""
self.send_chunks_in_progress[chunk_id] = SendChunkState(monotonic(), data, chunk_size, timer, chunk_no)
cdata = b""
filelog("using chunks, sending initial file-chunk-id=%s, for chunk size=%s",
chunk_id, chunk_size)
else:
Expand All @@ -932,20 +945,20 @@ def _check_chunk_sending(self, chunk_id, chunk_no):
if not chunk_state:
#transfer already removed
return
chunk_state[3] = 0 #timer has fired
if chunk_state[-1]==chunk_no:
filelog.error("Error: chunked file transfer '%s' timed out", chunk_id)
filelog.error(" on chunk %i", chunk_no)
chunk_state.timer = 0 #timer has fired
if chunk_state.chunk==chunk_no:
filelog.error(f"Error: chunked file transfer {chunk_id} timed out")
filelog.error(f" on chunk {chunk_no}")
self.cancel_sending(chunk_id)

def cancel_sending(self, chunk_id):
chunk_state = self.send_chunks_in_progress.pop(chunk_id, None)
filelog("cancel_sending(%s) chunk state found: %s", chunk_id, bool(chunk_state))
if not chunk_state:
return
timer = chunk_state[3]
timer = chunk_state.timer
if timer:
chunk_state[3] = 0
chunk_state.timer = 0
self.source_remove(timer)

def _process_ack_file_chunk(self, packet):
Expand All @@ -963,28 +976,28 @@ def _process_ack_file_chunk(self, packet):
if not chunk_state:
filelog.error(f"Error: cannot find the file transfer id {chunk_id!r}")
return
if chunk_state[-1]!=chunk:
filelog.error("Error: chunk number mismatch (%i vs %i)", chunk_state[-1], chunk)
if chunk_state.chunk!=chunk:
filelog.error("Error: chunk number mismatch (%i vs %i)", chunk_state.chunk, chunk)
self.cancel_sending(chunk_id)
return
start_time, data, chunk_size, timer, chunk = chunk_state
if not data:
chunk_size = chunk_state.chunk_size
if not chunk_state.data:
#all sent!
elapsed = monotonic()-start_time
elapsed = monotonic()-chunk_state.start
filelog("%i chunks of %i bytes sent in %ims (%sB/s)",
chunk, chunk_size, elapsed*1000, std_unit(chunk*chunk_size/elapsed))
self.cancel_sending(chunk_id)
return
assert chunk_size>0
#carve out another chunk:
cdata = self.compressed_wrapper("file-data", data[:chunk_size])
data = data[chunk_size:]
cdata = self.compressed_wrapper("file-data", chunk_state.data[:chunk_size])
chunk_state.data = chunk_state.data[chunk_size:]
chunk += 1
if timer:
self.source_remove(timer)
timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk)
self.send_chunks_in_progress[chunk_id] = [start_time, data, chunk_size, timer, chunk]
self.send("send-file-chunk", chunk_id, chunk, cdata, bool(data))
chunk_state.chunk = chunk
if chunk_state.timer:
self.source_remove(chunk_state.timer)
chunk_state.timer = self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk)
self.send("send-file-chunk", chunk_id, chunk, cdata, bool(chunk_state.data))

def send(self, *parts):
raise NotImplementedError()
Expand Down

0 comments on commit 59f2456

Please sign in to comment.