-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathscan_for_updates.py
executable file
·184 lines (148 loc) · 5.05 KB
/
scan_for_updates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
#############################################################################
##
## This file is part of GAP, a system for computational discrete algebra.
##
## Copyright of GAP belongs to its developers, whose names are too numerous
## to list here. Please refer to the COPYRIGHT file for details.
##
## SPDX-License-Identifier: GPL-2.0-or-later
##
# pylint: disable=C0116, C0103
"""
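This script is intended to iterate through package metadata in the repo:
https://github.com/gap-system/PackageDistro
and do the following:
1. Download the PackageInfo.g file of every package and compare its sha256
   hash with the "PackageInfoSHA256" value recorded in the package metadata.
2. Save every PackageInfo.g whose hash differs into the `_pkginfos` directory.
3. Run GAP (scan_for_updates.g, OutputJson) on the `_pkginfos` directory.
4. Download the archive of every such package into the `_archives` directory.
5. Record the sha256 hashes of the PackageInfo.g file and of the archive in
   the package's meta.json file.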
"""
import hashlib
import json
import os
import subprocess
from os.path import join
from typing import Union

import requests
from accepts import accepts

from utils import error, notice, warning
from download_packages import download_archive, metadata
@accepts(str)
def skip(string: str) -> bool:
    """Return True if the directory entry *string* should be ignored.

    Ignored entries are "README.md" and any name that starts with "." or "_".
    """
    if string == "README.md":
        return True
    # str.startswith accepts a tuple of prefixes and tests each of them.
    return string.startswith((".", "_"))
@accepts(str)
def sha256(fname: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file named *fname*."""
    digest = hashlib.sha256()
    with open(fname, "rb") as stream:
        # Feed the file to the hash in 1024-byte pieces instead of reading
        # it into memory in one go.
        while True:
            block = stream.read(1024)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
@accepts(str)
def download_pkg_info(pkg_name: str) -> Union[bytes, bool]:
    """Download the PackageInfo.g file of the package *pkg_name*.

    The download URL is read from the "PackageInfoURL" entry of the package
    metadata.

    Returns:
        The downloaded text encoded as UTF-8 bytes, or ``False`` if the
        download failed. A warning is emitted on failure, so callers can
        simply skip the package.
    """
    pkg_json = metadata(pkg_name)
    url = pkg_json["PackageInfoURL"]
    try:
        # Without a timeout, a single unresponsive server would block the
        # whole scan indefinitely.
        response = requests.get(url, timeout=60)
    except requests.RequestException as exc:
        # Treat connection problems like a bad HTTP status: warn and skip
        # this package instead of aborting the scan of all packages.
        warning("error trying to download {}: {}, skipping!".format(url, exc))
        return False
    if response.status_code != 200:
        warning(
            "error trying to download {}, status code {}, skipping!".format(
                url, response.status_code
            )
        )
        return False
    # Force UTF-8 decoding rather than relying on the guessed encoding.
    response.encoding = "utf-8"
    return response.text.encode("utf-8")
@accepts(str, str)
def gap_exec(commands: str, gap="gap") -> int:
    """Pipe *commands* into a GAP process and return its exit code.

    *commands* is printed by a shell ``echo`` (wrapped in double quotes, so
    any double quote inside *commands* must already be escaped for the shell)
    and the output is connected to the standard input of the shell command
    *gap*. The standard output of *gap* is discarded.
    """
    echo_command = 'echo "' + commands + '"'
    producer = subprocess.Popen(
        echo_command, stdout=subprocess.PIPE, shell=True
    )
    with producer:
        consumer = subprocess.Popen(
            gap,
            stdin=producer.stdout,
            stdout=subprocess.DEVNULL,
            shell=True,
        )
        with consumer:
            # Popen.wait() blocks until the process ends and returns its
            # return code.
            return consumer.wait()
@accepts(str, str)
def scan_for_one_update(pkginfos_dir: str, pkg_name: str) -> None:
    """Check whether the PackageInfo.g of *pkg_name* changed upstream.

    The sha256 hash of the freshly downloaded PackageInfo.g is compared with
    the "PackageInfoSHA256" entry of the package metadata. If they differ,
    the downloaded file is stored as ``<pkg_name>.g`` inside *pkginfos_dir*.
    Nothing is written if the download fails or the hashes agree.
    """
    pkg_json = metadata(pkg_name)
    try:
        recorded_hash = pkg_json["PackageInfoSHA256"]
    except KeyError:
        notice(pkg_name + ': missing key "PackageInfoSHA256"')
        # 0 never equals a hex digest string, so the package is then always
        # reported as changed.
        recorded_hash = 0

    content = download_pkg_info(pkg_name)
    if not content:
        return

    if hashlib.sha256(content).hexdigest() == recorded_hash:
        return

    notice(pkg_name + ": detected different sha256 hash of PackageInfo.g")
    target = join(pkginfos_dir, pkg_name + ".g")
    with open(target, "wb") as out:
        out.write(content)
@accepts(str)
def scan_for_updates(pkginfos_dir: str) -> None:
    """Scan every package directory in the current working directory.

    *pkginfos_dir* is created if it does not exist yet. Each entry of the
    current working directory that is a directory and is not filtered out by
    ``skip`` is passed to ``scan_for_one_update``, in sorted order.

    Raises:
        FileExistsError: if *pkginfos_dir* exists but is not a directory.
    """
    # makedirs(exist_ok=True) avoids the check-then-create race of
    # "exists() + mkdir()" and still fails if the path is not a directory;
    # unlike an assert, that check is not stripped under `python -O`.
    os.makedirs(pkginfos_dir, exist_ok=True)
    for pkgname in sorted(os.listdir(os.getcwd())):
        if not skip(pkgname) and os.path.isdir(pkgname):
            scan_for_one_update(pkginfos_dir, pkgname)
@accepts(str)
def output_json(pkginfos_dir: str) -> None:
    """Run the GAP function ``OutputJson`` on *pkginfos_dir*.

    GAP is started with the file ``scan_for_updates.g`` located in the same
    directory as this script. ``error`` is called if GAP exits with a
    non-zero return code.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    gap_command = "gap {}/scan_for_updates.g".format(script_dir)
    # The double quotes are backslash-escaped because gap_exec places the
    # command inside a double-quoted shell `echo`.
    gap_input = r"OutputJson(\"{}\");".format(pkginfos_dir)
    return_code = gap_exec(gap_input, gap=gap_command)
    if return_code != 0:
        error("Something went wrong")
@accepts(str, str)
def download_all_archives(archive_dir: str, pkginfos_dir: str) -> dict:
    """Download the archive of every package listed in *pkginfos_dir*.

    *archive_dir* is created if it does not exist yet. The package name is
    the part of each file name in *pkginfos_dir* before the first "."; entries
    filtered out by ``skip`` are ignored.

    Returns:
        A dict mapping each package name to the value returned by
        ``download_archive`` for that package.

    Raises:
        FileExistsError: if *archive_dir* exists but is not a directory.
    """
    # makedirs(exist_ok=True) avoids the check-then-create race of
    # "exists() + mkdir()" and still fails if the path is not a directory;
    # unlike an assert, that check is not stripped under `python -O`.
    os.makedirs(archive_dir, exist_ok=True)
    archive_name_lookup = {}
    for pkginfo in sorted(os.listdir(pkginfos_dir)):
        if skip(pkginfo):
            continue
        pkgname = pkginfo.split(".")[0]
        archive_name_lookup[pkgname] = download_archive(archive_dir, pkgname)
    return archive_name_lookup
@accepts(str, dict)
def add_sha256_to_json(pkginfos_dir: str, archive_name_lookup: dict) -> None:
    """Store fresh sha256 hashes in the meta.json of each updated package.

    For every entry of *pkginfos_dir* not filtered out by ``skip``, the
    package name is the part of the file name before the first ".". The file
    ``<pkgname>/meta.json`` is loaded, its "PackageInfoSHA256" and
    "ArchiveSHA256" entries are set to the hashes of
    ``<pkginfos_dir>/<pkgname>.g`` and of the archive found in
    *archive_name_lookup*, and the file is written back with sorted keys.
    Packages without an entry in *archive_name_lookup* are reported and
    skipped.
    """
    for entry in sorted(os.listdir(pkginfos_dir)):
        if skip(entry):
            continue
        pkgname = entry.split(".")[0]
        try:
            archive_path = archive_name_lookup[pkgname]
        except KeyError:
            notice("Could not locate archive for " + pkgname)
            continue

        meta_path = "{}/meta.json".format(pkgname)
        with open(meta_path, "rb") as source:
            pkg_json = json.load(source)

        pkginfo_path = join(pkginfos_dir, pkgname + ".g")
        pkg_json["PackageInfoSHA256"] = sha256(pkginfo_path)
        pkg_json["ArchiveSHA256"] = sha256(archive_path)

        notice("{0}: writing updated {0}/meta.json".format(pkgname))
        with open(meta_path, "w", encoding="utf-8") as target:
            json.dump(
                pkg_json,
                target,
                indent=2,
                ensure_ascii=False,
                sort_keys=True,
            )
            # json.dump does not end the output with a newline.
            target.write("\n")
def main():
    """Run the complete update scan using the default directories.

    Changed PackageInfo.g files are collected in "_pkginfos", processed by
    GAP, the matching archives are downloaded into "_archives", and finally
    the sha256 hashes in the meta.json files are refreshed.
    """
    archive_dir, pkginfos_dir = "_archives", "_pkginfos"
    scan_for_updates(pkginfos_dir)
    output_json(pkginfos_dir)
    add_sha256_to_json(
        pkginfos_dir,
        download_all_archives(archive_dir, pkginfos_dir),
    )


if __name__ == "__main__":
    main()