Skip to content

Commit

Permalink
Make FileIdManager a singleton, optimize DB accesses, index in the background, add logger, handle modified files
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 22, 2022
1 parent be25497 commit d661f73
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 30 deletions.
72 changes: 47 additions & 25 deletions plugins/contents/fps_contents/fileid.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import asyncio
import logging
from pathlib import Path
from typing import Callable, Dict, List, Optional
from uuid import uuid4

import aiosqlite
from aiopath import AsyncPath # type: ignore
from fps.logging import get_configured_logger # type: ignore
from watchfiles import Change, awatch


watchfiles_logger = get_configured_logger("watchfiles.main")
watchfiles_logger.setLevel(logging.CRITICAL)
logger = get_configured_logger("contents")


class Watcher:
def __init__(self, path: str) -> None:
self.path = path
Expand All @@ -26,7 +33,15 @@ def notify(self, change):
self._event.set()


class FileIdManager:
class Singleton(type):
    """Metaclass that gives each class using it exactly one shared instance.

    The first call to such a class constructs the instance with the supplied
    arguments and caches it. Every later call returns the cached instance and
    ignores whatever arguments it was given.
    """

    # One cached instance per class, keyed by the class object itself.
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        if cls not in cls._instances:
            # Delegate to type.__call__, which runs __new__ and __init__.
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]


class FileIdManager(metaclass=Singleton):

db_path: str
initialized: asyncio.Event
Expand Down Expand Up @@ -69,12 +84,14 @@ async def index(self, path: str) -> Optional[str]:

async def watch_files(self):
async with aiosqlite.connect(self.db_path) as db:
await db.execute("DROP TABLE IF EXISTS fileids")
await db.execute(
"CREATE TABLE IF NOT EXISTS fileids "
"(id TEXT PRIMARY KEY, path TEXT NOT NULL, mtime REAL NOT NULL)"
"CREATE TABLE fileids "
"(id TEXT PRIMARY KEY, path TEXT NOT NULL UNIQUE, mtime REAL NOT NULL)"
)
await db.commit()

# index files
async with aiosqlite.connect(self.db_path) as db:
async for path in AsyncPath().rglob("*"):
idx = uuid4().hex
Expand All @@ -88,21 +105,36 @@ async def watch_files(self):
added_paths = []
for change, changed_path in changes:
# get relative path
changed_path = str(Path(changed_path).relative_to(Path().absolute()))
changed_path = AsyncPath(changed_path).relative_to(Path().absolute())
changed_path_str = str(changed_path)

if change == Change.deleted:
logger.debug("File %s was deleted", changed_path_str)
async with db.execute(
"SELECT * FROM fileids WHERE path = ?", (changed_path,)
"SELECT COUNT(*) FROM fileids WHERE path = ?", (changed_path_str,)
) as cursor:
async for _ in cursor:
break
else:
if not (await cursor.fetchone())[0]:
# path is not indexed, ignore
logger.debug("File %s is not indexed, ignoring", changed_path_str)
continue
# path is indexed
await maybe_rename(db, changed_path, deleted_paths, added_paths, False)
await maybe_rename(db, changed_path_str, deleted_paths, added_paths, False)
elif change == Change.added:
await maybe_rename(db, changed_path, added_paths, deleted_paths, True)
logger.debug("File %s was added", changed_path_str)
await maybe_rename(db, changed_path_str, added_paths, deleted_paths, True)
elif change == Change.modified:
logger.debug("File %s was modified", changed_path_str)
if changed_path_str == self.db_path:
continue
async with db.execute(
"SELECT COUNT(*) FROM fileids WHERE path = ?", (changed_path_str,)
) as cursor:
if not (await cursor.fetchone())[0]:
# path is not indexed, ignore
logger.debug("File %s is not indexed, ignoring", changed_path_str)
continue
mtime = (await changed_path.stat()).st_mtime
await db.execute("UPDATE fileids SET mtime = ? WHERE path = ?", (mtime, changed_path_str))

for path in deleted_paths + added_paths:
await db.execute("DELETE FROM fileids WHERE path = ?", (path,))
Expand All @@ -125,8 +157,8 @@ def watch(self, path: str) -> Watcher:

async def get_mtime(path, db) -> Optional[float]:
if db:
async with db.execute("SELECT * FROM fileids WHERE path = ?", (path,)) as cursor:
async for _, _, mtime in cursor:
async with db.execute("SELECT mtime FROM fileids WHERE path = ?", (path,)) as cursor:
async for mtime, in cursor:
return mtime
# deleted file is not in database, shouldn't happen
return None
Expand Down Expand Up @@ -154,22 +186,12 @@ async def maybe_rename(
path1, path2 = changed_path, other_path
if is_added_path:
path1, path2 = path2, path1
await db.execute("UPDATE fileids SET path = REPLACE(path, ?, ?)", (path1, path2))
logger.debug("File %s was renamed to %s", path1, path2)
await db.execute("UPDATE fileids SET path = ? WHERE path = ?", (path2, path1))
other_paths.remove(other_path)
return
changed_paths.append(changed_path)


FILE_ID_MANAGER: Optional[FileIdManager] = None


def get_file_id_manager() -> FileIdManager:
    """Return the module-wide FileIdManager, building it on the first call."""
    global FILE_ID_MANAGER
    manager = FILE_ID_MANAGER
    if manager is None:
        # Lazily create the shared instance and remember it for later calls.
        manager = FileIdManager()
        FILE_ID_MANAGER = manager
    return manager


def get_watch() -> Callable[[str], Watcher]:
return get_file_id_manager().watch
return FileIdManager().watch
1 change: 0 additions & 1 deletion plugins/contents/fps_contents/watchfiles.py

This file was deleted.

14 changes: 10 additions & 4 deletions plugins/yjs/fps_yjs/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from fastapi.responses import PlainTextResponse
from fps.hooks import register_router # type: ignore
from fps_auth_base import User, current_user
from fps_contents.fileid import get_file_id_manager
from fps_contents.fileid import FileIdManager
from fps_contents.routes import read_content, write_content # type: ignore

try:
Expand Down Expand Up @@ -49,6 +49,12 @@ def to_datetime(iso_date: str) -> datetime:
return datetime.fromisoformat(iso_date.rstrip("Z"))


@router.on_event("startup")
async def startup() -> None:
    """Instantiate FileIdManager when the router receives its startup event.

    The returned instance is deliberately discarded; the call is made only
    for its effect.
    """
    # start indexing in the background
    FileIdManager()


@router.websocket("/api/yjs/{path:path}")
async def websocket_endpoint(
path,
Expand Down Expand Up @@ -143,7 +149,7 @@ def __init__(self, websocket, path, permissions):
async def get_file_info(self) -> Tuple[str, str, str]:
room_name = self.websocket_server.get_room_name(self.room)
file_format, file_type, file_id = room_name.split(":", 2)
file_path = await get_file_id_manager().get_path(file_id)
file_path = await FileIdManager().get_path(file_id)
if file_path is None:
raise RuntimeError(f"File {self.room.document.path} cannot be found anymore")
if file_path != self.room.document.path:
Expand Down Expand Up @@ -323,11 +329,11 @@ async def create_roomid(
# see https://github.com/tiangolo/fastapi/issues/3373#issuecomment-1306003451
create_room_id = CreateRoomId(**(await request.json()))
ws_url = f"{create_room_id.format}:{create_room_id.type}:"
idx = await get_file_id_manager().get_id(path)
idx = await FileIdManager().get_id(path)
if idx is not None:
return ws_url + idx

idx = await get_file_id_manager().index(path)
idx = await FileIdManager().index(path)
if idx is None:
raise HTTPException(status_code=404, detail=f"File {path} does not exist")

Expand Down

0 comments on commit d661f73

Please sign in to comment.