Commit 86d25c6 ("other logs")
LanderOtto committed Dec 11, 2023 (1 parent: ec9b3ea)
Showing 2 changed files with 43 additions and 6 deletions.
34 changes: 28 additions & 6 deletions streamflow/deployment/aiotarstream.py
@@ -263,8 +263,14 @@ def __init__(self, stream):
        self.position: int = 0

    async def read(self, size: int | None = None):
        logger.info(
            f"TellableStreamWrapper.read() -> position: {self.position} read: {size}"
        )
        buf = await self.stream.read(size)
        self.position += len(buf)
        logger.info(
            f"TellableStreamWrapper.read() -> upd-pos: {self.position}, len_buf: {len(buf)}"
        )
        return buf

    def tell(self):
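
tarfile computes member offsets with tell(), but the async streams StreamFlow reads from (e.g. a remote process stdout) are not seekable, so the wrapper emulates tell() by counting consumed bytes. A minimal sketch of that idea, with illustrative names rather than the exact StreamFlow class:

    class CountingReader:
        # Wraps a non-seekable async stream and counts consumed bytes,
        # so tarfile-style code can still call tell() on it.
        def __init__(self, stream):
            self.stream = stream
            self.position = 0

        async def read(self, size=None):
            buf = await self.stream.read(size)
            self.position += len(buf)  # advance by the bytes actually returned
            return buf

        def tell(self):
            return self.position
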
@@ -294,19 +300,28 @@ async def write(self, data: Any):
class AioTarInfo(tarfile.TarInfo):
    @classmethod
    async def fromtarfile(cls, tarstream):
        logger.info(f"fromtarfile: read -> blocksize {tarfile.BLOCKSIZE}")
        buf = await tarstream.stream.read(tarfile.BLOCKSIZE)
        try:
            logger.info(f"buf: {buf}")
            logger.info(f"stream_type: {type(tarstream.stream)}")
            logger.info(f"buf: {len(buf)}, stream.pos: {tarstream.stream.position}")
            logger.info(f"(fromtarfile) buf: {buf}")
            logger.info(f"(fromtarfile) stream_type: {type(tarstream.stream)}")
            logger.info(
                f"(fromtarfile) len_buf: {len(buf)}, stream.pos: {tarstream.stream.position}"
            )
            obj = cls.frombuf(buf, tarstream.encoding, tarstream.errors)
        except tarfile.InvalidHeaderError as e:
            chksum = tarfile.nti(buf[148:156])
            calc_chksums = tarfile.calc_chksums(buf)
            logger.info(
                f"chksum: {chksum}, calc_chksums {calc_chksums} -> Got {chksum not in calc_chksums}, expected True because the `tarfile.InvalidHeaderError` was thrown"
            )
            logger.info(f"detected error {e}")
            logger.info(f"detected InvalidHeaderError {e}")
            raise e
        except tarfile.TruncatedHeaderError as e:
            logger.info(
                f"len(buf): {len(buf)}, tarfile.BLOCKSIZE: {tarfile.BLOCKSIZE}, len(buf) != BLOCKSIZE: {len(buf) != tarfile.BLOCKSIZE}"
            )
            logger.info(f"detected TruncatedHeaderError {e}")
            raise e
        obj.offset = tarstream.stream.tell() - tarfile.BLOCKSIZE
        return await obj._proc_member(tarstream)
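
The except branch re-derives the condition that makes frombuf raise InvalidHeaderError: in the 512-byte ustar header the checksum field occupies bytes 148-156, and tarfile accepts either the unsigned or the signed sum of the header bytes. A small self-contained sketch of that check, using the same tarfile helpers the logs call:

    import tarfile

    def header_checksum_ok(buf: bytes) -> bool:
        # buf is one 512-byte tar header block
        stored = tarfile.nti(buf[148:156])            # checksum stored in the header
        unsigned, signed = tarfile.calc_chksums(buf)  # checksums recomputed over the block
        return stored in (unsigned, signed)
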
@@ -323,6 +338,7 @@ def _proc_builtin(self, tarstream):
        return self

    async def _proc_gnulong(self, tarstream):
        logger.info(f"_proc_gnulong: read -> size: {self.size}")
        buf = await tarstream.stream.read(self._block(self.size))
        try:
            next = await self.fromtarfile(tarstream)
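
The read size here comes from _block(self.size), which rounds the member size up to a whole number of 512-byte records, since tar pads every member to a record boundary; that is why the sizes in these logs are always multiples of tarfile.BLOCKSIZE. A sketch of the rounding, equivalent to what tarfile.TarInfo._block computes in CPython:

    import tarfile

    def round_to_blocks(count: int) -> int:
        # round_to_blocks(834) == 1024, round_to_blocks(1024) == 1024
        blocks, remainder = divmod(count, tarfile.BLOCKSIZE)
        if remainder:
            blocks += 1
        return blocks * tarfile.BLOCKSIZE
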
@@ -337,11 +353,15 @@ async def _proc_gnulong(self, tarstream):

    async def _proc_gnusparse_10(self, next, pax_headers, tarstream):
        sparse = []
        logger.info(f"_proc_gnusparse_10: read -> blocksize {tarfile.BLOCKSIZE}")
        buf = await tarstream.stream.read(tarfile.BLOCKSIZE)
        fields, buf = buf.split(b"\n", 1)
        fields = int(fields)
        while len(sparse) < fields * 2:
            if b"\n" not in buf:
                logger.info(
                    f"_proc_gnusparse_10 while: read -> blocksize {tarfile.BLOCKSIZE}"
                )
                buf += await tarstream.stream.read(tarfile.BLOCKSIZE)
            number, buf = buf.split(b"\n", 1)
            sparse.append(int(number))
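
The GNU sparse 1.0 map parsed above is stored in the member's data area as newline-separated decimal numbers: first the count of (offset, size) pairs, then the pairs themselves, which is why the loop collects fields * 2 integers. A sketch of the same parse over an in-memory buffer, assuming the whole map fits in buf (the real code refills buf from the stream when it runs out):

    def parse_gnu_sparse_map(buf: bytes) -> list[tuple[int, int]]:
        # Layout: b"<count>\n<offset>\n<size>\n<offset>\n<size>\n..."
        fields, buf = buf.split(b"\n", 1)
        numbers = []
        for _ in range(int(fields) * 2):
            number, buf = buf.split(b"\n", 1)
            numbers.append(int(number))
        return list(zip(numbers[0::2], numbers[1::2]))

    # parse_gnu_sparse_map(b"2\n0\n512\n4096\n512\n") -> [(0, 512), (4096, 512)]
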
@@ -359,6 +379,7 @@ async def _proc_member(self, tarstream):
            return self._proc_builtin(tarstream)

    async def _proc_pax(self, tarstream):
        logger.info(f"_proc_pax: read -> size: {self.size}")
        buf = await tarstream.stream.read(self._block(self.size))
        if self.type == tarfile.XGLTYPE:
            pax_headers = tarstream.pax_headers
@@ -423,6 +444,7 @@ async def _proc_sparse(self, tarstream):
        structs, isextended, origsize = self._sparse_structs
        del self._sparse_structs
        while isextended:
            logger.info(f"_proc_sparse: read -> blocksize {tarfile.BLOCKSIZE}")
            buf = await tarstream.stream.read(tarfile.BLOCKSIZE)
            pos = 0
            for _ in range(21):
@@ -984,12 +1006,12 @@ async def makefile(self, tarinfo, targetpath):
        if tarinfo.sparse is not None:
            for offset, size in tarinfo.sparse:
                logger.info(
                    f"makefile call target.seek with offset {offset}, target.position {target.position}"
                    f"makefile call target.seek with offset {offset}, t_target {type(target)}, target.position {target.position}"
                )
                target.seek(offset)
                await copyfileobj(self.stream, target, size, bufsize)
            logger.info(
                f"makefile call target.seek with tarinfo.size {tarinfo.size}, target.position: {target.position}"
                f"makefile call target.seek with tarinfo.size {tarinfo.size}, t_target {type(target)}, target.position: {target.position}"
            )
            target.seek(tarinfo.size)
            target.truncate()
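
Only the data segments of a sparse member are written: seeking past the current end of the file and then writing (or truncating) leaves holes that read back as zeros, without materializing them on disk. The same pattern on a plain local file, as a minimal illustration:

    with open("sparse.bin", "wb") as target:
        for offset, data in [(0, b"head"), (1 << 20, b"tail")]:
            target.seek(offset)  # jump over the hole; nothing is written in between
            target.write(data)
        target.seek(2 << 20)     # logical size of the member
        target.truncate()        # extend to full size without writing zeros
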
15 changes: 15 additions & 0 deletions streamflow/deployment/connector/base.py
@@ -18,6 +18,7 @@
)
from streamflow.core.exception import WorkflowExecutionException
from streamflow.core.utils import get_local_to_remote_destination
from streamflow.data import remotepath

Check notice (Code scanning / CodeQL): Cyclic import: the import of module streamflow.data.remotepath begins an import cycle.
from streamflow.deployment import aiotarstream
from streamflow.deployment.future import FutureAware
from streamflow.deployment.stream import (
@@ -31,6 +32,19 @@
from typing import Any, MutableMapping


async def _inner_print_struct(conn, loc, path, lvl):
    for entity in await remotepath.listdir(conn, loc, path):
        logger.info("\t" * lvl + os.path.basename(entity))
        if await remotepath.isdir(conn, loc, entity):
            await _inner_print_struct(conn, loc, entity, lvl + 1)


async def _print_struct(conn, loc, path):
    logger.info(f"loc {loc} -> root path: {path}")
    if await remotepath.isdir(conn, loc, path):
        await _inner_print_struct(conn, loc, path, 1)
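
Together the two helpers log the remote directory tree rooted at path, one tab-indented basename per nesting level, by pairing remotepath.listdir with remotepath.isdir. A purely local analogue of the traversal, for illustration only:

    import os

    def print_struct(path: str, lvl: int = 1) -> None:
        for entry in os.listdir(path):
            full = os.path.join(path, entry)
            print("\t" * lvl + entry)  # one tab per nesting level
            if os.path.isdir(full):
                print_struct(full, lvl + 1)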


async def extract_tar_stream(
    tar: aiotarstream.AioTarStream,
    src: str,
@@ -301,6 +315,7 @@ async def copy_remote_to_local(
        logger.info(
            f"COPYING {src} on location {locations[0]} to {dst} on local file-system"
        )
        await _print_struct(self, locations[0], src)
        await self._copy_remote_to_local(
            src=src, dst=dst, location=locations[0], read_only=read_only
        )