From 5108312b8962a9a3442a33d9091d4f4837b435f1 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 23 Oct 2024 17:12:04 +0200 Subject: [PATCH] Add forking API --- .../jupyter_server_ydoc/app.py | 8 +++ .../jupyter_server_ydoc/handlers.py | 63 +++++++++++++++++++ .../jupyter_server_ydoc/pytest_plugin.py | 36 +++++++++++ tests/test_handlers.py | 58 +++++++++++++++++ 4 files changed, 165 insertions(+) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py index e286cc13..3efc6e31 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py @@ -14,6 +14,7 @@ from traitlets import Bool, Float, Type from .handlers import ( + DocForkHandler, DocSessionHandler, TimelineHandler, UndoRedoHandler, @@ -123,6 +124,13 @@ def initialize_handlers(self): self.handlers.extend( [ + ( + r"/api/collaboration/fork/(.*)", + DocForkHandler, + { + "ywebsocket_server": self.ywebsocket_server, + }, + ), ( r"/api/collaboration/room/(.*)", YDocWebSocketHandler, diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 586e4dc1..3003361a 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -39,6 +39,7 @@ SERVER_SESSION = str(uuid.uuid4()) FORK_DOCUMENTS = {} +FORK_ROOMS = {} class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): @@ -600,3 +601,65 @@ async def _cleanup_undo_manager(self, room_id: str) -> None: if room_id in FORK_DOCUMENTS: del FORK_DOCUMENTS[room_id] self.log.info(f"Fork Document for {room_id} has been removed.") + + +class DocForkHandler(APIHandler): + """ + Jupyter Server handler to: + - create a fork of a document (optionally synchronizing with the root document), + - delete a fork of a document (optionally merging in the root document). + """ + + auth_resource = "contents" + + def initialize( + self, + ywebsocket_server: JupyterWebsocketServer, + ) -> None: + self._websocket_server = ywebsocket_server + + @web.authenticated + @authorized + async def put(self, root_roomid): + """ + Creates a fork of a root document and returns its ID. + Optionally keeps the fork in sync with the root. + """ + fork_roomid = uuid4().hex + FORK_ROOMS[fork_roomid] = root_roomid + root_room = await self._websocket_server.get_room(root_roomid) + update = root_room.ydoc.get_update() + fork_ydoc = Doc() + fork_ydoc.apply_update(update) + model = self.get_json_body() + if model.get("sync"): + root_room.ydoc.observe(lambda event: fork_ydoc.apply_update(event.update)) + fork_room = YRoom(ydoc=fork_ydoc) + self._websocket_server.rooms[fork_roomid] = fork_room + await self._websocket_server.start_room(fork_room) + data = json.dumps( + { + "sessionId": SERVER_SESSION, + "roomId": fork_roomid, + } + ) + self.set_status(201) + return self.finish(data) + + @web.authenticated + @authorized + async def delete(self, fork_roomid): + """ + Deletes a forked document, and optionally merges it back in the root document. + """ + root_roomid = FORK_ROOMS[fork_roomid] + del FORK_ROOMS[fork_roomid] + if int(self.get_query_argument("merge")): + root_room = await self._websocket_server.get_room(root_roomid) + root_ydoc = root_room.ydoc + fork_room = await self._websocket_server.get_room(fork_roomid) + fork_ydoc = fork_room.ydoc + fork_update = fork_ydoc.get_update() + root_ydoc.apply_update(fork_update) + await self._websocket_server.delete_room(name=fork_roomid) + self.set_status(200) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py index efdac129..8070430f 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py @@ -145,6 +145,42 @@ async def _inner(format: str, type: str, path: str) -> Any: return _inner +@pytest.fixture +def rtc_connect_fork_client(jp_http_port, jp_base_url, rtc_fetch_session): + async def _inner(room_id: str) -> Any: + return connect( + f"ws://127.0.0.1:{jp_http_port}{jp_base_url}api/collaboration/room/{room_id}" + ) + + return _inner + + +@pytest.fixture +def rtc_create_fork_client(jp_fetch): + async def _inner(root_roomid: str, sync: bool) -> Any: + return await jp_fetch( + "/api/collaboration/fork", + root_roomid, + method="PUT", + body=json.dumps({"sync": sync}), + ) + + return _inner + + +@pytest.fixture +def rtc_delete_fork_client(jp_fetch): + async def _inner(fork_roomid: str, merge: int) -> Any: + return await jp_fetch( + "/api/collaboration/fork", + fork_roomid, + method="DELETE", + params={"merge": merge}, + ) + + return _inner + + @pytest.fixture def rtc_add_doc_to_store(rtc_connect_doc_client): event = Event() diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 96aaded5..050e9d9e 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -9,6 +9,7 @@ from jupyter_events.logger import EventLogger from jupyter_ydoc import YUnicode +from pycrdt import Text from pycrdt_websocket import WebsocketProvider @@ -215,3 +216,60 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: await jp_serverapp.web_app.settings["jupyter_server_ydoc"].stop_extension() del jp_serverapp.web_app.settings["file_id_manager"] + + +async def test_fork_handler( + rtc_create_file, + rtc_connect_doc_client, + rtc_connect_fork_client, + rtc_create_fork_client, + rtc_delete_fork_client, + rtc_fetch_session, +): + path, _ = await rtc_create_file("test.txt", "Hello") + + root_connect_event = Event() + + def _on_root_change(topic: str, event: Any) -> None: + if topic == "source": + root_connect_event.set() + + root_ydoc = YUnicode() + root_ydoc.observe(_on_root_change) + + resp = await rtc_fetch_session("text", "file", path) + data = json.loads(resp.body.decode("utf-8")) + file_id = data["fileId"] + + async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider( + root_ydoc.ydoc, ws + ): + await root_connect_event.wait() + resp = await rtc_create_fork_client(f"text:file:{file_id}", True) + data = json.loads(resp.body.decode("utf-8")) + fork_roomid = data["roomId"] + fork_ydoc = YUnicode() + fork_connect_event = Event() + + def _on_fork_change(topic: str, event: Any) -> None: + if topic == "source": + fork_connect_event.set() + + fork_ydoc.observe(_on_fork_change) + fork_text = fork_ydoc.ydoc.get("source", type=Text) + + async with await rtc_connect_fork_client(fork_roomid) as ws, WebsocketProvider( + fork_ydoc.ydoc, ws + ): + await fork_connect_event.wait() + root_text = root_ydoc.ydoc.get("source", type=Text) + root_text += ", World!" + await sleep(0.1) + assert str(fork_text) == "Hello, World!" + fork_text += " Hi!" + + await sleep(0.1) + assert str(root_text) == "Hello, World!" + await rtc_delete_fork_client(fork_roomid, 1) + await sleep(0.1) + assert str(root_text) == "Hello, World! Hi!"