Skip to content

Commit

Permalink
Add YDrive
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Dec 15, 2023
1 parent 6b496cc commit 02f7104
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 8 deletions.
3 changes: 2 additions & 1 deletion jupyverse_api/jupyverse_api/contents/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from fastapi import APIRouter, Depends, Request, Response

Expand All @@ -15,6 +15,7 @@
class FileIdManager(ABC):
stop_watching_files: asyncio.Event
stopped_watching_files: asyncio.Event
Change: Any

@abstractmethod
async def get_path(self, file_id: str) -> str:
Expand Down
23 changes: 17 additions & 6 deletions plugins/contents/fps_contents/fileid.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class FileIdManager(metaclass=Singleton):
initialized: asyncio.Event
watchers: Dict[str, List[Watcher]]
lock: asyncio.Lock
Change = Change

def __init__(self, db_path: str = ".fileid.db"):
def __init__(self, db_path: str = ".fileid.db", root_dir: str = "."):
self.db_path = db_path
self.root_dir = root_dir
self.initialized = asyncio.Event()
self.watchers = {}
self.watch_files_task = asyncio.create_task(self.watch_files())
Expand Down Expand Up @@ -90,7 +92,7 @@ async def watch_files(self):
# index files
async with self.lock:
async with aiosqlite.connect(self.db_path) as db:
async for path in Path().rglob("*"):
async for path in Path(self.root_dir).rglob("*"):
idx = uuid4().hex
mtime = (await path.stat()).st_mtime
await db.execute(
Expand All @@ -99,14 +101,16 @@ async def watch_files(self):
await db.commit()
self.initialized.set()

async for changes in awatch(".", stop_event=self.stop_watching_files):
async for changes in awatch(self.root_dir, stop_event=self.stop_watching_files):
async with self.lock:
async with aiosqlite.connect(self.db_path) as db:
deleted_paths = set()
added_paths = set()
for change, changed_path in changes:
# get relative path
changed_path = Path(changed_path).relative_to(await Path().absolute())
changed_path = Path(changed_path).relative_to(
await Path(self.root_dir).absolute()
)
changed_path_str = str(changed_path)

if change == Change.deleted:
Expand Down Expand Up @@ -156,9 +160,16 @@ async def watch_files(self):
for change in changes:
changed_path = change[1]
# get relative path
relative_changed_path = str(Path(changed_path).relative_to(await Path().absolute()))
relative_changed_path = Path(changed_path).relative_to(
await Path(self.root_dir).absolute()
)
relative_change = (change[0], relative_changed_path)
for watcher in self.watchers.get(relative_changed_path, []):
all_watchers = []
for path, watchers in self.watchers.items():
p = Path(path)
if p == relative_changed_path or p in relative_changed_path.parents:
all_watchers += watchers
for watcher in all_watchers:
watcher.notify(relative_change)

self.stopped_watching_files.set()
Expand Down
138 changes: 138 additions & 0 deletions plugins/yjs/fps_yjs/ydocs/ydrive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from __future__ import annotations

from contextlib import AsyncExitStack
from functools import partial
from pathlib import Path
from typing import Any, Callable

from anyio import create_task_group
from anyio.abc import TaskGroup
from pycrdt import Doc, Map, MapEvent

from jupyverse_api.auth import User
from jupyverse_api.contents import Contents

from .ybasedoc import YBaseDoc


class YDrive(YBaseDoc):
_starting: bool
_task_group: TaskGroup | None

def __init__(
self,
contents: Contents,
ydoc: Doc | None = None,
root_dir: Path | str | None = None,
):
super().__init__(ydoc)
self._root_dir = Path() if root_dir is None else Path(root_dir)
self._ydoc["content"] = self._ycontent = self._new_dir_content()
self._ycontent.observe_deep(self._callback)
self._user = User()
self._starting = False
self._task_group = None
self._contents = contents
self._watcher = contents.file_id_manager.watch(".")

async def __aenter__(self) -> YDrive:
if self._task_group is not None:
raise RuntimeError("YDrive already running")

async with AsyncExitStack() as exit_stack:
tg = create_task_group()
self._task_group = await exit_stack.enter_async_context(tg)
self._exit_stack = exit_stack.pop_all()

assert self._task_group is not None
self._task_group.start_soon(self._process_file_changes)

return self

async def _process_file_changes(self):
async for change in self._watcher:
change_, path = change
if change_ == self._contents.file_id_manager.Change.deleted:
parent_content = self._get(path.parent)
del parent_content["content"][path.name]

async def __aexit__(self, exc_type, exc_value, exc_tb):
if self._task_group is None:
raise RuntimeError("YDrive not running")

self._task_group.cancel_scope.cancel()
self._task_group = None
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

def _callback(self, events):
for event in events:
if isinstance(event, MapEvent):
current = self._ycontent
for path in event.path:
current = current[path]
for key, val in event.keys.items():
if val.get("action") == "delete":
path = "/".join(event.path[1::2] + [key])
self._task_group.start_soon(self._contents.delete_content, path, self._user)

@property
def version(self) -> str:
return "1.0.0"

def _new_dir_content(self) -> Map:
return Map({"is_dir": True, "content": None})

def _new_file_content(self, size: int) -> Map:
return Map({"is_dir": False, "size": size})

def _get_directory_content(self, path: Path) -> Map:
res = {}
for entry in (self._root_dir / path).iterdir():
if entry.is_dir():
res[entry.name] = self._new_dir_content()
else:
stat = entry.stat()
res[entry.name] = self._new_file_content(
size=stat.st_size,
)
return Map(res)

def _maybe_populate_dir(self, path: Path, content: Map):
if content["content"] is None:
content["content"] = self._get_directory_content(path)

def _get(self, path: Path | str | None = None) -> Map:
path = Path() if path is None else Path(path)
current_content = self._ycontent
self._maybe_populate_dir(path, self._ycontent)
cwd = Path()
last_idx = len(path.parts) - 1
for idx, part in enumerate(path.parts):
try:
current_content = current_content["content"][part]
except KeyError:
raise FileNotFoundError(f'No entry "{part}" in "{cwd}".')
if current_content["is_dir"]:
cwd /= part
self._maybe_populate_dir(cwd, current_content)
elif idx < last_idx:
raise RuntimeError(f'Entry "{part}" in "{cwd}" is not a directory.')
return current_content

def get(self, path: Path | str | None = None) -> dict:
return dict(self._get(path))

def delete(self, path: Path | str):
path = Path(path) if isinstance(path, str) else path
if not path.parts:
raise RuntimeError("Cannot delete root directory")
parent_content = self._get(path.parent)
del parent_content["content"][path.name]

def set(self, value) -> None:
raise RuntimeError("Cannot set a YDrive")

def observe(self, callback: Callable[[str, Any], None]) -> None:
self.unobserve()
self._subscriptions[self._ystate] = self._ystate.observe(partial(callback, "state"))
self._subscriptions[self._ycontent] = self._ycontent.observe_deep(partial(callback, "content"))
10 changes: 9 additions & 1 deletion plugins/yjs/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ description = "An FPS plugin for the Yjs API"
keywords = [ "jupyter", "server", "fastapi", "plugins" ]
requires-python = ">=3.8"
dependencies = [
"pycrdt >=0.3.4,<0.4.0",
"anyio >=3.6.2,<5",
"pycrdt >=0.7.2,<0.8.0",
"jupyverse-api >=0.1.2,<1",
]
dynamic = [ "version",]

[project.optional-dependencies]
test = [
"pytest",
"fps-contents",
]

[[project.authors]]
name = "Jupyter Development Team"
email = "jupyter@googlegroups.com"
Expand Down
6 changes: 6 additions & 0 deletions plugins/yjs/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import pytest


@pytest.fixture
def anyio_backend():
return "asyncio"
14 changes: 14 additions & 0 deletions plugins/yjs/tests/fake_contents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from anyio import create_memory_object_stream
from anyio.streams.stapled import StapledObjectStream
from fps_contents.fileid import FileIdManager


class Contents:
def __init__(self, db_path, root_dir):
send_stream, recv_stream = create_memory_object_stream[str]()
self.event_stream = StapledObjectStream(send_stream, recv_stream)
self.file_id_manager = FileIdManager(db_path=db_path, root_dir=root_dir)
self.watcher = self.file_id_manager.watch(".")

async def delete_content(self, path, user):
await self.event_stream.send(f"delete {path}")
69 changes: 69 additions & 0 deletions plugins/yjs/tests/test_ydocs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import tempfile
from pathlib import Path

import pytest
from anyio import sleep
from fake_contents import Contents
from fps_yjs.ydocs.ydrive import YDrive


@pytest.mark.anyio
async def test_ydrive():
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_dir = Path(tmp_dir)
(tmp_dir / "file0").write_text(" " * 1)
(tmp_dir / "file1").write_text(" " * 2)
(tmp_dir / "dir0").mkdir()
(tmp_dir / "dir0" / "file2").write_text(" " * 3)
(tmp_dir / "dir1").mkdir()
(tmp_dir / "dir1" / "dir2").mkdir()
(tmp_dir / "dir1" / "dir2" / "file3").write_text(" " * 4)
(tmp_dir / "dir1" / "dir2" / "file4").write_text(" " * 5)

contents = Contents(db_path=str(tmp_dir / ".fileid.db"), root_dir=str(tmp_dir))

async with YDrive(contents=contents, root_dir=tmp_dir) as ydrive:

with pytest.raises(FileNotFoundError):
ydrive.get("doesnt_exist")

root_dir = ydrive.get()
assert len(root_dir["content"]) == 4
assert "file0" in root_dir["content"]
assert "file1" in root_dir["content"]
assert "dir0" in root_dir["content"]
assert "dir1" in root_dir["content"]

dir0 = ydrive.get("dir0")
assert len(dir0["content"]) == 1
assert "file2" in dir0["content"]

dir1 = ydrive.get("dir1")
assert len(dir1["content"]) == 1
assert "dir2" in dir1["content"]

dir2 = ydrive.get("dir1/dir2")
assert len(dir2["content"]) == 2
assert "file3" in dir2["content"]
assert "file4" in dir2["content"]
assert dict(dir1["content"]["dir2"]["content"]["file3"]) == {"is_dir": False, "size": 4}

# the fake contents actually doesn't delete files
path = "file0"
ydrive.delete(path)
assert await contents.event_stream.receive() == f"delete {path}"
path = "dir1/dir2/file3"
ydrive.delete(path)
assert await contents.event_stream.receive() == f"delete {path}"

await contents.file_id_manager.initialized.wait()
await sleep(0.1)
assert "file1" in root_dir["content"]
(tmp_dir / "file1").unlink()
await sleep(0.2)
assert "file1" not in root_dir["content"]

assert "file4" in dir2["content"]
(tmp_dir / "dir1" / "dir2" / "file4").unlink()
await sleep(0.1)
assert "file4" not in dir2["content"]

0 comments on commit 02f7104

Please sign in to comment.