-
Notifications
You must be signed in to change notification settings - Fork 28
/
fileid.py
207 lines (180 loc) · 8.19 KB
/
fileid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import asyncio
import logging
from typing import Dict, List, Optional
from uuid import uuid4
import aiosqlite
from anyio import Path
from jupyverse_api import Singleton
from watchfiles import Change, awatch
# Module logger used by FileIdManager and the rename helpers. It is named "contents"
# rather than __name__ — presumably so these messages are grouped with the jupyverse
# contents plugin's logs (TODO confirm).
logger = logging.getLogger("contents")
class Watcher:
    """Async iterator over change notifications for a single path.

    ``FileIdManager.watch_files`` calls :meth:`notify` with each raw
    ``(change, path)`` entry reported for ``path``; a consumer awaits them with
    ``async for change in watcher``. The iterator never terminates on its own.

    Only the most recent notification is retained: if :meth:`notify` is called
    several times before the consumer resumes, the earlier changes are
    overwritten and the consumer sees just the last one.
    """

    def __init__(self, path: str) -> None:
        self.path = path
        # Set by notify(), cleared each time a change is handed to the consumer.
        self._event = asyncio.Event()

    def __aiter__(self):
        return self

    async def __anext__(self):
        pending = self._event
        await pending.wait()
        # Re-arm before returning so the next iteration waits for a new change.
        pending.clear()
        latest = self._change
        return latest

    def notify(self, change):
        """Publish *change* to the consumer, replacing any unconsumed change."""
        self._change = change
        self._event.set()
class FileIdManager(metaclass=Singleton):
    """Assign ids to files under the current working directory.

    On construction a background task (:meth:`watch_files`) rebuilds a SQLite
    table ``fileids(id, path, mtime)`` holding every path found recursively
    under the current directory, then watches that directory and keeps the
    table in sync. A deleted path and an added path whose mtimes match are
    treated as a rename (see ``maybe_rename``), so the file keeps its id.

    Paths are stored relative to the current working directory. Every access
    to the database goes through :attr:`lock`.
    """

    db_path: str
    initialized: asyncio.Event
    watchers: Dict[str, List[Watcher]]
    lock: asyncio.Lock

    def __init__(self, db_path: str = "fileid.db"):
        self.db_path = db_path
        # Set once the initial scan has populated the table; public queries wait on it.
        self.initialized = asyncio.Event()
        self.watchers = {}
        self.stop_watching_files = asyncio.Event()
        self.stopped_watching_files = asyncio.Event()
        self.lock = asyncio.Lock()
        # Start the background task last, once every attribute it uses exists.
        self.watch_files_task = asyncio.create_task(self.watch_files())

    async def get_id(self, path: str) -> Optional[str]:
        """Return the id indexed for *path*, or ``None`` if it is not indexed."""
        await self.initialized.wait()
        async with self.lock:
            async with aiosqlite.connect(self.db_path) as db:
                async with db.execute("SELECT id FROM fileids WHERE path = ?", (path,)) as cursor:
                    async for idx, in cursor:
                        return idx
                return None

    async def get_path(self, idx: str) -> Optional[str]:
        """Return the path currently indexed under id *idx*, or ``None``."""
        await self.initialized.wait()
        async with self.lock:
            async with aiosqlite.connect(self.db_path) as db:
                async with db.execute("SELECT path FROM fileids WHERE id = ?", (idx,)) as cursor:
                    async for path, in cursor:
                        return path
                return None

    async def index(self, path: str) -> Optional[str]:
        """Index *path* and return its id.

        If *path* is already indexed, its existing id is returned. Inserting a
        second row would violate the ``UNIQUE`` constraint on ``path``, which
        can happen when two callers both saw ``get_id`` return ``None``.

        Returns:
            The file id, or ``None`` if *path* does not exist.
        """
        await self.initialized.wait()
        async with self.lock:
            async with aiosqlite.connect(self.db_path) as db:
                apath = Path(path)
                if not await apath.exists():
                    return None
                async with db.execute(
                    "SELECT id FROM fileids WHERE path = ?", (path,)
                ) as cursor:
                    row = await cursor.fetchone()
                if row is not None:
                    return row[0]
                try:
                    mtime = (await apath.stat()).st_mtime
                except FileNotFoundError:
                    # Removed between the exists() check and stat().
                    return None
                idx = uuid4().hex
                await db.execute("INSERT INTO fileids VALUES (?, ?, ?)", (idx, path, mtime))
                await db.commit()
                return idx

    async def watch_files(self):
        """Rebuild the index, then keep it in sync with filesystem changes.

        Runs until :attr:`stop_watching_files` is set. :attr:`stopped_watching_files`
        is set when this coroutine exits for any reason, including errors and
        cancellation, so that code waiting for shutdown does not hang.
        """
        try:
            await self._create_table()
            await self._index_all_files()
            self.initialized.set()
            async for changes in awatch(".", stop_event=self.stop_watching_files):
                cwd = await Path().absolute()
                # Pair each raw (change, absolute_path) entry with its path
                # relative to the cwd, computed once for both uses below.
                relative_changes = [
                    (change, str(Path(change[1]).relative_to(cwd))) for change in changes
                ]
                async with self.lock:
                    async with aiosqlite.connect(self.db_path) as db:
                        await self._apply_changes(db, relative_changes)
                        await db.commit()
                for change, path in relative_changes:
                    for watcher in self.watchers.get(path, []):
                        watcher.notify(change)
        finally:
            self.stopped_watching_files.set()

    async def _create_table(self) -> None:
        """(Re)create an empty ``fileids`` table, discarding any previous index."""
        async with self.lock:
            async with aiosqlite.connect(self.db_path) as db:
                await db.execute("DROP TABLE IF EXISTS fileids")
                await db.execute(
                    "CREATE TABLE fileids "
                    "(id TEXT PRIMARY KEY, path TEXT NOT NULL UNIQUE, mtime REAL NOT NULL)"
                )
                await db.commit()

    async def _index_all_files(self) -> None:
        """Give a fresh id to every path found recursively under the cwd."""
        async with self.lock:
            async with aiosqlite.connect(self.db_path) as db:
                async for path in Path().rglob("*"):
                    try:
                        mtime = (await path.stat()).st_mtime
                    except OSError:
                        # Dangling symlink, or removed since it was listed.
                        # Skip it instead of aborting the scan, which would
                        # leave ``initialized`` unset and block every query.
                        logger.debug("Cannot stat %s, not indexing it", path)
                        continue
                    idx = uuid4().hex
                    await db.execute(
                        "INSERT INTO fileids VALUES (?, ?, ?)", (idx, str(path), mtime)
                    )
                await db.commit()

    async def _apply_changes(self, db, relative_changes) -> None:
        """Update the ``fileids`` table for one batch of ``awatch`` changes.

        *relative_changes* holds ``((change, absolute_path), relative_path)``
        pairs. Renames are recognized only when both the deletion and the
        addition are in this same batch. The caller commits.
        """
        deleted_paths: List[str] = []
        added_paths: List[str] = []
        for (change, _absolute_path), path in relative_changes:
            if change == Change.deleted:
                logger.debug("File %s was deleted", path)
                if not await self._is_indexed(db, path):
                    logger.debug("File %s is not indexed, ignoring", path)
                    continue
                await maybe_rename(db, path, deleted_paths, added_paths, False)
            elif change == Change.added:
                logger.debug("File %s was added", path)
                await maybe_rename(db, path, added_paths, deleted_paths, True)
            elif change == Change.modified:
                logger.debug("File %s was modified", path)
                if path == self.db_path:
                    # Our own database is written constantly; never track it.
                    continue
                if not await self._is_indexed(db, path):
                    logger.debug("File %s is not indexed, ignoring", path)
                    continue
                try:
                    mtime = (await Path(path).stat()).st_mtime
                except OSError:
                    # Gone (or unreadable) right after the modification; a
                    # deletion event presumably follows — skip the update.
                    continue
                await db.execute(
                    "UPDATE fileids SET mtime = ? WHERE path = ?",
                    (mtime, path),
                )
        # Deletions and additions left unpaired were not renames: drop their rows.
        for path in deleted_paths + added_paths:
            await db.execute("DELETE FROM fileids WHERE path = ?", (path,))

    @staticmethod
    async def _is_indexed(db, path: str) -> bool:
        """Return whether *path* has a row in the ``fileids`` table."""
        async with db.execute(
            "SELECT COUNT(*) FROM fileids WHERE path = ?", (path,)
        ) as cursor:
            return bool((await cursor.fetchone())[0])

    def watch(self, path: str) -> Watcher:
        """Register and return a new :class:`Watcher` for changes to *path*."""
        watcher = Watcher(path)
        self.watchers.setdefault(path, []).append(watcher)
        return watcher

    def unwatch(self, path: str, watcher: Watcher):
        """Stop delivering changes for *path* to *watcher*.

        Raises:
            KeyError: if nothing watches *path*.
            ValueError: if *watcher* is not registered for *path*.
        """
        path_watchers = self.watchers[path]
        path_watchers.remove(watcher)
        if not path_watchers:
            # Drop the empty list so unwatched paths do not accumulate.
            del self.watchers[path]
async def get_mtime(path, db) -> Optional[float]:
    """Return the modification time of *path*, from the index or from disk.

    Args:
        path: path relative to the watched root, as stored in ``fileids``.
        db: open aiosqlite connection to read the indexed mtime from, or
            ``None`` to ``stat`` the file on disk instead.

    Returns:
        The mtime, or ``None`` if *path* has no row (``db`` given) or cannot be
        stat'ed (``db`` is ``None``).
    """
    if db is not None:
        async with db.execute("SELECT mtime FROM fileids WHERE path = ?", (path,)) as cursor:
            async for mtime, in cursor:
                return mtime
        # Callers only look up paths already known to be indexed, so this
        # should not happen.
        return None
    try:
        return (await Path(path).stat()).st_mtime
    except OSError:
        # Vanished, dangling symlink, a parent turned into a file, or no
        # permission. Any of these means "no mtime"; none should crash the
        # watcher loop.
        return None
async def maybe_rename(
    db, changed_path: str, changed_paths: List[str], other_paths: List[str], is_added_path
) -> None:
    """Pair *changed_path* with an opposite change as a rename, or queue it.

    A rename arrives as one deletion plus one addition. The two are paired
    when the deleted path's indexed mtime equals the added path's on-disk
    mtime. On a match, the deleted path's row is repointed to the added path,
    which keeps its id. Otherwise *changed_path* is queued for later.

    Args:
        db: open aiosqlite connection holding the ``fileids`` table.
        changed_path: the path just reported as added or deleted.
        changed_paths: pending paths of the same kind as *changed_path*;
            *changed_path* is appended here when no counterpart matches.
        other_paths: pending paths of the opposite kind; the matched
            counterpart, if any, is removed from it.
        is_added_path: true if *changed_path* was added, false if deleted.
    """
    # Deleted paths are gone from disk, so their mtime comes from the index;
    # added paths are not indexed yet, so theirs comes from the filesystem.
    if is_added_path:
        own_source, counterpart_source = None, db
    else:
        own_source, counterpart_source = db, None

    reference_mtime = await get_mtime(changed_path, own_source)
    if reference_mtime is None:
        return

    for candidate in other_paths:
        if await get_mtime(candidate, counterpart_source) != reference_mtime:
            continue
        # Same mtime: treat as the same file under a new name.
        if is_added_path:
            old_path, new_path = candidate, changed_path
        else:
            old_path, new_path = changed_path, candidate
        logger.debug("File %s was renamed to %s", old_path, new_path)
        await db.execute("UPDATE fileids SET path = ? WHERE path = ?", (new_path, old_path))
        # Safe despite iterating: we return immediately afterwards.
        other_paths.remove(candidate)
        return

    changed_paths.append(changed_path)