-
Notifications
You must be signed in to change notification settings - Fork 84
/
cleanup-all.py
110 lines (87 loc) · 3.23 KB
/
cleanup-all.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import re
from pinecone import Pinecone
from datetime import datetime, timedelta
import time
def delete_index(pc, index_name, *, max_wait=60, poll_interval=2):
    """Delete an index, lifting deletion protection first when it is enabled.

    This is a best-effort operation: any exception raised along the way is
    printed and swallowed, so a caller looping over many indexes can carry on
    with the next one.

    Args:
        pc: Client object exposing ``describe_index``, ``configure_index``
            and ``delete_index``.
        index_name: Name of the index to delete.
        max_wait: Upper bound, in seconds of elapsed time, on how long to
            wait for the index to report the ``"Ready"`` state.
        poll_interval: Seconds to sleep between two status checks.

    Returns:
        None.
    """
    try:
        print("Deleting index: " + index_name)
        desc = pc.describe_index(index_name)

        # Check whether index can be deleted
        if desc.deletion_protection == "enabled":
            pc.configure_index(index_name, deletion_protection="disabled")

        # Wait for index to be ready before deleting. The deadline is based
        # on a monotonic clock so that slow status calls count towards the
        # limit and a zero poll interval cannot loop forever.
        deadline = time.monotonic() + max_wait
        while pc.describe_index(index_name).status.state != "Ready":
            if time.monotonic() >= deadline:
                print(f"Timed out waiting for index {index_name} to be ready")
                # The delete below is still attempted; if the service
                # rejects it, the failure is reported by the handler.
                break
            print(f"Index is not ready yet. Waiting for {poll_interval} seconds.")
            time.sleep(poll_interval)

        pc.delete_index(index_name)
    except Exception as e:
        # Deliberately broad: one index failing must not abort the cleanup.
        print("Failed to delete index: " + index_name + " " + str(e))
def delete_everything(pc):
    """Delete every collection and then every index listed by ``pc``.

    A collection whose deletion raises is reported on stdout and skipped.
    Each index is handed to ``delete_index``.
    """
    for name in pc.list_collections().names():
        try:
            print("Deleting collection: " + name)
            pc.delete_collection(name)
        except Exception as err:
            # Best effort: report the failure and move on to the next one.
            print("Failed to delete collection: " + name + " " + str(err))

    for name in pc.list_indexes().names():
        delete_index(pc, name)
def parse_date(resource_name):
    """Extract the date embedded in a resource name.

    The name is searched for the first run of exactly eight digits enclosed
    by dashes (``-YYYYMMDD-``), which is then parsed as a calendar date.

    Args:
        resource_name: Name of the collection or index to inspect.

    Returns:
        A naive ``datetime`` at midnight of the embedded date, or ``None``
        when the name has no such digit run or the digits do not form a
        valid date (for example ``-99999999-``).
    """
    match = re.search(r"-\d{8}-", resource_name)
    if match is None:
        return None

    date_string = match.group(0).strip("-")
    try:
        return datetime.strptime(date_string, "%Y%m%d")
    except ValueError:
        # Eight digits that are not a real date (an id, a counter, ...):
        # treat the name as undated instead of aborting the whole cleanup.
        return None
def is_resource_old(resource_name):
    """Report whether the date found in ``resource_name`` is over 24 hours old.

    The date comes from ``parse_date``; a name for which it returns ``None``
    is never considered old. Diagnostic lines are printed along the way.
    """
    print(f"Checking resource name: {resource_name}")

    resource_datetime = parse_date(resource_name)
    if resource_datetime is None:
        return False

    # Age of the resource measured against the current local time.
    time_difference = datetime.now() - resource_datetime
    print(f"Resource timestamp: {resource_datetime}")
    print(f"Time difference: {time_difference}")

    age_limit = timedelta(hours=24)
    return time_difference > age_limit
def delete_old(pc):
    """Delete the collections and indexes that ``is_resource_old`` flags.

    Resources that are not flagged are reported on stdout and left alone.
    A collection whose deletion raises is reported and skipped; each flagged
    index is handed to ``delete_index``.
    """
    for name in pc.list_collections().names():
        if not is_resource_old(name):
            print("Skipping collection, not old enough: " + name)
            continue
        try:
            print("Deleting collection: " + name)
            pc.delete_collection(name)
        except Exception as err:
            # Best effort: report the failure and move on to the next one.
            print("Failed to delete collection: " + name + " " + str(err))

    for name in pc.list_indexes().names():
        if not is_resource_old(name):
            print("Skipping index, not old enough: " + name)
            continue
        delete_index(pc, name)
def main():
    """Build the client from the environment and run the selected cleanup.

    ``PINECONE_API_KEY`` supplies the API key. When ``DELETE_ALL`` is exactly
    ``"true"`` everything is deleted; otherwise only old resources are.
    """
    api_key = os.environ.get("PINECONE_API_KEY")
    pc = Pinecone(api_key=api_key)

    wipe_all = os.environ.get("DELETE_ALL") == "true"
    if not wipe_all:
        print("Deleting old resources")
        delete_old(pc)
        return

    print("Deleting everything")
    delete_everything(pc)
# Run the cleanup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()