This repository has been archived by the owner on Nov 19, 2024. It is now read-only.
forked from dandi/dandi-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: s3-gc-stats
executable file
184 lines (161 loc) · 5.75 KB
/
s3-gc-stats
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
# Assumptions made by this code:
# - list_object_versions sorts all versions (including delete markers) by key
# (in ascending order) and last modified date (in descending order, sometimes
# with consecutive equal timestamps) and returns them in chunks of 1000 (by
# default), with each chunk divided into proper versions and delete markers.
# - Objects that have been deleted will have a delete marker as their latest
# version and at least one proper version dated before that.
__requires__ = ["boto3", "click >= 7.0", "humanize"]
from bisect import bisect
from datetime import datetime
import re
import sys
from typing import List, NamedTuple, Tuple
from urllib.parse import urlparse
import boto3
from botocore import UNSIGNED
from botocore.client import Config
import click
from humanize import naturalsize
class Version(NamedTuple):
    """One object version in an S3 bucket, as reported by ``list_object_versions``."""

    bucket: str
    key: str
    version_id: str
    size: int
    last_modified: datetime

    @classmethod
    def from_data(cls, bucket, data):
        """Construct a `Version` from one ``Versions`` entry of a listing page."""
        return cls(
            bucket,
            data["Key"],
            data["VersionId"],
            data["Size"],
            data["LastModified"],
        )

    @property
    def key_url(self):
        """The ``s3://`` URL for the key, without a version qualifier."""
        return f"s3://{self.bucket}/{self.key}"

    @property
    def url(self):
        """The ``s3://`` URL for this specific version of the key."""
        return f"{self.key_url}?versionId={self.version_id}"

    def __str__(self):
        return f"{self.url} {self.size}"
class BucketStats:
    """Accumulate per-category statistics over the versions of keys under a
    bucket prefix.

    Categories tallied in ``qtys`` (counts) and ``sizes`` (bytes):

    - ``all``: every version seen
    - ``visible``: latest versions of keys that are not deleted
    - ``invisible``: all versions of deleted keys, plus non-latest versions
      of live keys
    - ``old``: non-latest versions of keys that are not deleted
    """

    def __init__(self, bucket, prefix, list_files=False, stat=("all",), exclude=()):
        self.bucket: str = bucket
        self.prefix: str = prefix
        #: Whether to print each counted version as it is reported
        self.list_files: bool = list_files
        #: Categories actually tallied and printed
        self.stat: Tuple[str, ...] = stat
        #: Regexes; keys whose URL matches any of them are skipped entirely
        self.exclude: Tuple[str, ...] = exclude
        #: Versions of the current key
        self.versions: List[Version] = []
        # Keys whose latest version is a delete marker, in ascending order:
        self.deleted: List[str] = []
        categories = ("all", "visible", "invisible", "old")
        self.qtys = dict.fromkeys(categories, 0)
        self.sizes = dict.fromkeys(categories, 0)
        self.found_any = False

    def run(self):
        """List all versions under the prefix, tally them, and print totals."""
        # Use s3 anonymously/without credentials:
        client = boto3.client("s3", config=Config(signature_version=UNSIGNED))
        for page in client.get_paginator("list_object_versions").paginate(
            Bucket=self.bucket, Prefix=self.prefix
        ):
            # TODO: Filter out keys that end with slashes?
            for dm in page.get("DeleteMarkers", []):
                if dm["IsLatest"]:
                    self.mark_deleted(dm["Key"])
            # BUGFIX: a page consisting solely of delete markers has no
            # "Versions" key at all (boto3 omits empty lists from the
            # response), so use .get() just as for "DeleteMarkers" above.
            for v in page.get("Versions", []):
                self.add_version(v)
        # Flush the final key, whose versions have no successor to end them:
        self.end_key()
        for rtype in self.stat:
            print(
                f"{rtype.title()} files: {self.qtys[rtype]}",
                "/",
                f"Size: {self.sizes[rtype]} ({naturalsize(self.sizes[rtype])})",
            )

    def add_version(self, data):
        """Record one version entry; when the key changes, flush the previous
        key's accumulated versions via `end_key()`.

        Asserts the ordering assumptions documented at the top of this script.
        """
        v = Version.from_data(self.bucket, data)
        if self.versions:
            if self.versions[-1].key == v.key:
                assert (
                    self.versions[-1].last_modified >= v.last_modified
                ), f"Versions for key {v.key!r} not in reverse chronological order"
            else:
                assert self.versions[-1].key < v.key, (
                    "Keys not in lexicographic order;"
                    f" {self.versions[-1].key!r} listed before {v.key!r}"
                )
                self.end_key()
        self.versions.append(v)

    def mark_deleted(self, key):
        """Record that *key*'s latest version is a delete marker."""
        if self.deleted:
            assert (
                self.deleted[-1] < key
            ), f"DeleteMarkers not in lexicographic order; {self.deleted[-1]} listed before {key}"
        self.deleted.append(key)

    def end_key(self):
        """Classify and report the accumulated versions of the current key."""
        if not self.versions:
            return
        key = self.versions[0].key
        if self.deleted:
            # self.deleted is kept sorted, so a binary search locates the key:
            i = bisect(self.deleted, key)
            # Error/warn if i>1?
            deleted = i > 0 and self.deleted[i - 1] == key
            # Markers at or before this key can never match a later key:
            del self.deleted[:i]
        else:
            deleted = False
        if deleted:
            # Every version of a deleted key is invisible.
            self.report({"all", "invisible"}, self.versions)
        else:
            # The newest version is the visible one; the rest are old.
            self.report({"all", "visible"}, self.versions[:1])
            self.report({"all", "old", "invisible"}, self.versions[1:])
        self.versions = []

    def report(self, rtypes, versions):
        """Tally *versions* (all of one key) under each category in *rtypes*,
        unless the key matches an exclude pattern or no requested category
        applies.
        """
        if not versions:
            return
        if any(re.search(rgx, versions[0].key_url) for rgx in self.exclude):
            return
        if rtypes.intersection(self.stat):
            for v in versions:
                self.found_any = True
                if self.list_files:
                    print(v)
                for t in rtypes:
                    self.qtys[t] += 1
                    self.sizes[t] += v.size
@click.command()
@click.option("--exclude", metavar="URLREGEX", multiple=True)
@click.option("--fail-if-any", is_flag=True)
@click.option("--list", "list_files", is_flag=True)
@click.option(
    "--stat",
    type=click.Choice(["all", "visible", "invisible", "old"]),
    multiple=True,
    default=["all"],
)
@click.argument("url")
def main(stat, list_files, fail_if_any, url, exclude):
    """Report garbage-collectable object statistics for an S3 URL."""
    bucket, prefix = parse_s3_url(url)
    collector = BucketStats(
        bucket,
        prefix,
        list_files=list_files,
        stat=stat,
        exclude=exclude,
    )
    collector.run()
    # With --fail-if-any, a nonzero exit signals that matches were found:
    if fail_if_any and collector.found_any:
        sys.exit(1)
def parse_s3_url(url):
    """Split an ``s3://bucket/key-prefix`` URL into ``(bucket, prefix)``.

    Raises `ValueError` if the URL's scheme is anything other than ``s3``.
    """
    parsed = urlparse(url, allow_fragments=False)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URL: {url}")
    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")
    return (bucket, prefix)
# Script entry point; click handles argument parsing and exit codes.
if __name__ == "__main__":
    main()