# rolling_deletion.py
"""
A (very) dangerous task that you may have to use. This task will delete files that
are older than a certain age, subject to some (optional) constraints:
a) The file must have $N$ remote instances available throughout the network
b) The checksums of those files must match the original checksum
"""

import time
from datetime import datetime, timedelta, timezone

from loguru import logger
from schedule import CancelJob
from sqlalchemy import select
from sqlalchemy.orm import Session

from librarian_server.api.validate import calculate_checksum_of_remote_copies
from librarian_server.database import get_session
from librarian_server.orm import Instance, Librarian, StoreMetadata

from .task import Task


class RollingDeletion(Task):
    """
    A background task that deletes _instances_ (not files!) that are older than
    a certain age.
    """

    store_name: str
    "Name of the store to delete instances from"
    age_in_days: float
    "Age of the instances to delete, in days; older instances will be deleted if they pass the checks"
    number_of_remote_copies: int = 3
    "Number of remote copies that must be available before the local instance is deleted"
    verify_downstream_checksums: bool = True
    "Whether to verify the checksums of the remote copies"
    mark_unavailable: bool = True
    "Whether to mark the instances as unavailable after deletion (True), or to delete the rows entirely (False)"
    force_deletion: bool = True
    "Whether to override the deletion policy of individual instances"

    def get_store(self, name: str, session: Session) -> StoreMetadata:
        possible_metadata = session.query(StoreMetadata).filter_by(name=name).first()

        if not possible_metadata:
            raise ValueError(f"Store {name} does not exist.")

        return possible_metadata

    def on_call(self):
        with get_session() as session:
            return self.core(session=session)
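
    # `on_call` is just a thin scheduler hook: the real work happens in `core`,
    # which takes an explicit database session.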

    def core(self, session: Session):
        core_begin = datetime.now(timezone.utc)
        age_cutoff = core_begin - timedelta(days=self.age_in_days)

        try:
            store = self.get_store(self.store_name, session)
        except ValueError as e:
            logger.error("Error getting store: {}, cannot continue; cancelling job", e)
            return CancelJob

        logger.info(
            "Beginning rolling deletion for store {} (ID: {})", store.name, store.id
        )

        # Get the instances that are older than the age cutoff
        logger.info(
            "Querying for created_times earlier than {} UTC ({} local)",
            age_cutoff,
            age_cutoff.astimezone(),
        )
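
        # Only instances that live on this store, are older than the cutoff,
        # and are currently available are candidates for deletion.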
        query_begin = time.perf_counter()

        stmt = select(Instance).filter(
            Instance.store_id == store.id,
            Instance.created_time < age_cutoff.astimezone(timezone.utc),
            Instance.available == True,
        )

        instances = session.execute(stmt).scalars().all()

        query_end = time.perf_counter()

        logger.info("Queried for old instances in {} seconds", query_end - query_begin)
        logger.info(
            "Found {} instances that are older than {} days",
            len(instances),
            self.age_in_days,
        )
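
        # For each candidate we: (1) respect the soft timeout, (2) re-validate
        # the candidate, (3) count its remote copies, (4) optionally verify
        # their checksums, and only then (5) delete the local data.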

        deleted = 0
        for instance in instances:
            # First, see if we've timed out.
            if self.soft_timeout is not None:
                if (datetime.now(timezone.utc) - core_begin) > self.soft_timeout:
                    logger.warning(
                        "Ran out of time in deletion task! Only successfully deleted "
                        "{n}/{m} instances; we will return later",
                        n=deleted,
                        m=len(instances),
                    )
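                    # Returning False (rather than CancelJob) leaves the job on
                    # the schedule, so the remaining instances are retried on
                    # the next invocation.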
                    return False

            # Double-check (defensively) that the query gave us what we wanted.
            valid_time = instance.created_time.replace(tzinfo=timezone.utc) < age_cutoff
            valid_store = instance.store_id == store.id

            all_ok = valid_time and valid_store and instance.available

            if not all_ok:
                logger.error(
                    "Instance {} does not meet the criteria, skipping", instance.id
                )
                continue

            # Check if the file associated with the instance has enough copies.
            remote_librarian_ids = {
                remote_instance.librarian_id
                for remote_instance in instance.file.remote_instances
            }

            logger.info(
                "Calling up {} remote librarians to check for copies",
                len(remote_librarian_ids),
            )
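
            # Ask each downstream librarian to re-compute the checksum of its
            # copy; each result records which librarian holds the copy and
            # whether its checksum matched the original.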
            downstream = []
            for librarian_id in remote_librarian_ids:
                stmt = select(Librarian).filter(Librarian.id == librarian_id)
                librarian = session.execute(stmt).scalar()

                if not librarian:
                    continue

                downstream += calculate_checksum_of_remote_copies(
                    librarian=librarian, file_name=instance.file_name
                )

            # Now check if we have enough!
            if len(downstream) < self.number_of_remote_copies:
                logger.warning(
                    "Instance {} does not have enough remote copies {}/{}, skipping",
                    instance.id,
                    len(downstream),
                    self.number_of_remote_copies,
                )
                continue

            # Now check that every remote copy's checksum matches; a single
            # mismatch disqualifies this instance from deletion.
            if self.verify_downstream_checksums:
                checksums_ok = True

                for info in downstream:
                    if not info.computed_same_checksum:
                        logger.warning(
                            "Instance {} has a mismatched checksum on {}, skipping",
                            instance.id,
                            info.librarian,
                        )
                        checksums_ok = False
                        break

                if not checksums_ok:
                    continue

            # If we're here, we can delete the instance.
            logger.info(
                "Verified that we have the correct number of copies of "
                "{instance.id} ({instance.file_name}): {n}/{m}, proceeding to deletion",
                instance=instance,
                n=len(downstream),
                m=self.number_of_remote_copies,
            )
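
            # Depending on `mark_unavailable`, the instance row is either
            # flagged as unavailable or removed outright (see the attribute
            # docstrings above); the data on disk is deleted in both cases.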
            try:
                instance.delete(
                    session=session,
                    commit=True,
                    force=self.force_deletion,
                    mark_unavailable=self.mark_unavailable,
                )

                logger.info("Deleted data for instance {} successfully", instance.id)

                deleted += 1
            except FileNotFoundError:
                logger.error(
                    "Instance {} does not exist on disk, skipping", instance.id
                )
                continue

        core_end = datetime.now(timezone.utc)

        logger.info(
            "Finished rolling deletion for store {} (ID: {}) in {} seconds, deleted {}/{} instances",
            store.name,
            store.id,
            (core_end - core_begin).total_seconds(),
            deleted,
            len(instances),
        )

        return deleted == len(instances)
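

# A minimal sketch of how this task might be configured, assuming tasks are
# constructed from keyword arguments matching the attributes above (the
# `soft_timeout` attribute is inherited from the Task base class, and the
# store name here is hypothetical):
#
#   task = RollingDeletion(
#       store_name="local_store",
#       age_in_days=30.0,
#       number_of_remote_copies=3,
#       verify_downstream_checksums=True,
#       mark_unavailable=True,
#       force_deletion=True,
#   )
#   task.on_call()  # normally invoked by the scheduler, not by hand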