-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcopy-to-sha256.py
executable file
·94 lines (73 loc) · 2.73 KB
/
copy-to-sha256.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!/usr/bin/env python3
import os
import logging
import stat
import argparse
import hashlib
import shutil
import tarfile
# Number of leading hex characters of a file's SHA-256 digest that are used
# (followed by ".bin") as the name of the copied/extracted output file.
HASH_LENGTH = 8
def hash_file(filename) -> str:
    """Return the SHA-256 hex digest of the file at *filename*.

    The file is opened unbuffered in binary mode and passed to
    ``hash_fileobj``, which performs the actual chunked hashing.
    """
    stream = open(filename, "rb", buffering=0)
    with stream:
        digest = hash_fileobj(stream)
    return digest
def hash_fileobj(f) -> str:
    """Return the SHA-256 hex digest of everything readable from *f*.

    *f* must offer a ``read(size)`` method returning bytes. It is consumed
    from its current position in 128 KiB chunks until ``read`` returns
    ``b""``.
    """
    chunk_size = 128 * 1024
    digest = hashlib.sha256()
    while True:
        chunk = f.read(chunk_size)
        # An empty bytes result marks end of stream.
        if chunk == b"":
            break
        digest.update(chunk)
    return digest.hexdigest()
def main():
    """Parse the command line and copy files into the target directory.

    Takes two positional arguments: ``from`` (a directory, or otherwise a
    path opened as a tar archive) and ``to`` (the directory that receives
    the hash-named output files). Both paths are normalized before use.

    Raises:
        OSError / tarfile.TarError: propagated from ``tarfile.open`` when
            ``from`` is neither a directory nor a readable tar archive.
    """
    logging.basicConfig(format="%(message)s")
    logger = logging.getLogger("copy")
    logger.setLevel(logging.DEBUG)

    parser = argparse.ArgumentParser(
        description="...",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("from_path", metavar="from", help="from")
    parser.add_argument("to_path", metavar="to", help="to")
    args = parser.parse_args()

    from_path = os.path.normpath(args.from_path)
    to_path = os.path.normpath(args.to_path)

    # Decide up front rather than waiting for tarfile.open() to raise
    # IsADirectoryError: which exception a directory produces is
    # platform-dependent, so the explicit check is more portable.
    if os.path.isdir(from_path):
        handle_dir(logger, from_path, to_path)
        return

    # The context manager closes the archive even if extraction fails.
    with tarfile.open(from_path, "r") as tar:
        handle_tar(logger, tar, to_path)
def handle_dir(logger, from_path: str, to_path: str):
    """Copy every regular file below *from_path* into *to_path*.

    Each file is written as ``<digest prefix>.bin``, where the prefix is the
    first ``HASH_LENGTH`` hex characters of the file's SHA-256 digest. If a
    file with that name already exists in *to_path* the source is skipped.
    Only the digest prefix is compared, so distinct files that share a prefix
    are treated as already present.

    Args:
        logger: Logger that receives progress messages and walk errors.
        from_path: Root of the directory tree to scan.
        to_path: Directory that receives the copies; it is not created here.
    """
    def onerror(oserror):
        # os.walk() ignores directory-listing errors unless given a handler;
        # report them as warnings and keep walking.
        logger.warning(oserror)

    for dirpath, _dirnames, filenames in os.walk(from_path, onerror=onerror):
        for filename in filenames:
            absname = os.path.join(dirpath, filename)
            mode = os.lstat(absname).st_mode
            # lstat() does not follow symlinks, so this single check skips
            # symlinks, devices, FIFOs and sockets. It also skips an entry
            # that turned into a directory after being listed, instead of
            # relying on an assert that is stripped under ``python -O``.
            if not stat.S_ISREG(mode):
                continue
            file_hash = hash_file(absname)
            target_name = file_hash[0:HASH_LENGTH] + ".bin"
            to_abs = os.path.join(to_path, target_name)
            if os.path.exists(to_abs):
                logger.info("Exists, skipped {} ({})".format(to_abs, absname))
            else:
                logger.info("cp {} {}".format(absname, to_abs))
                shutil.copyfile(absname, to_abs)
def handle_tar(logger, tar, to_path: str):
    """Extract every file member of *tar* into *to_path* under a hash name.

    Regular-file and hard-link members are hashed with SHA-256 and written
    as ``<first HASH_LENGTH hex chars>.bin``. If a file with that name
    already exists in *to_path* the member is skipped.

    Args:
        logger: Logger that receives progress messages.
        tar: An open ``tarfile.TarFile``; it must support seeking, because
            each member is read twice (once to hash, once to copy).
        to_path: Directory that receives the extracted files; it is not
            created here.
    """
    for member in tar.getmembers():
        if not (member.isfile() or member.islnk()):
            continue
        f = tar.extractfile(member)
        # extractfile() is documented to return None for members that are
        # neither files nor links; guard defensively.
        if f is None:
            continue
        # Close the member stream once it has been hashed and copied.
        with f:
            file_hash = hash_fileobj(f)
            filename = file_hash[0:HASH_LENGTH] + ".bin"
            to_abs = os.path.join(to_path, filename)
            if os.path.exists(to_abs):
                logger.info("Exists, skipped {} ({})".format(to_abs, member.name))
                continue
            logger.info("Extracted {} ({})".format(to_abs, member.name))
            # Close the destination deterministically so the data is
            # flushed and the descriptor is released for every member.
            with open(to_abs, "wb") as to_file:
                # Hashing consumed the stream; rewind before copying.
                f.seek(0)
                shutil.copyfileobj(f, to_file)
# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()