From 93f94dae1df62708f4ba9816aed2cff78ba3393a Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 8 Aug 2022 18:44:54 +0200 Subject: [PATCH] Support global awareness --- plugins/jupyterlab/setup.cfg | 2 +- plugins/yjs/fps_yjs/routes.py | 63 +++++++++++++++++++++++++---------- plugins/yjs/setup.cfg | 2 +- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/plugins/jupyterlab/setup.cfg b/plugins/jupyterlab/setup.cfg index 28aa734c..e5d8496b 100644 --- a/plugins/jupyterlab/setup.cfg +++ b/plugins/jupyterlab/setup.cfg @@ -23,7 +23,7 @@ install_requires = fps >=0.0.8 fps-auth fps-lab - jupyterlab >=4.0.0a26 + jupyterlab >=4.0.0a27 [options.entry_points] fps_router = diff --git a/plugins/yjs/fps_yjs/routes.py b/plugins/yjs/fps_yjs/routes.py index 71cc50b6..be7e80c5 100644 --- a/plugins/yjs/fps_yjs/routes.py +++ b/plugins/yjs/fps_yjs/routes.py @@ -19,6 +19,7 @@ from ypy_websocket.ystore import BaseYStore, SQLiteYStore, YDocNotFound # type: ignore YFILE = YDOCS["file"] +AWARENESS = 1 RENAME_SESSION = 127 @@ -41,11 +42,9 @@ def to_datetime(iso_date: str) -> datetime: return datetime.fromisoformat(iso_date.rstrip("Z")) -@router.websocket("/api/yjs/{format}:{type}:{path:path}") +@router.websocket("/api/yjs/{path:path}") async def websocket_endpoint( websocket: WebSocket, - format, - type, path, auth_config=Depends(get_auth_config), user_manager: UserManager = Depends(get_user_manager), @@ -60,8 +59,7 @@ async def websocket_endpoint( accept_websocket = True if accept_websocket: await websocket.accept() - full_path = f"{format}:{type}:{path}" - socket = YDocWebSocketHandler(WebsocketAdapter(websocket, full_path), full_path) + socket = YDocWebSocketHandler(WebsocketAdapter(websocket, path), path) await socket.serve() else: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) @@ -99,7 +97,11 @@ async def recv(self): return await self._websocket.receive_bytes() -class JupyterRoom(YRoom): +class DocumentRoom(YRoom): + """A Y room for a possibly stored document (e.g. a notebook).""" + + is_transient = False + def __init__(self, type: str, ystore: BaseYStore): super().__init__(ready=False, ystore=ystore) self.type = type @@ -108,14 +110,25 @@ def __init__(self, type: str, ystore: BaseYStore): self.document = YDOCS.get(type, YFILE)(self.ydoc) +class TransientRoom(YRoom): + """A Y room for sharing state (e.g. awareness).""" + + is_transient = True + + class JupyterWebsocketServer(WebsocketServer): - def get_room(self, path: str) -> JupyterRoom: - file_format, file_type, file_path = path.split(":", 2) + def get_room(self, path: str) -> YRoom: if path not in self.rooms.keys(): - p = Path(file_path) - updates_file_path = str(p.parent / f".{file_type}:{p.name}.y") - ystore = JupyterSQLiteYStore(path=updates_file_path) # FIXME: pass in config - self.rooms[path] = JupyterRoom(file_type, ystore) + if path.count(":") >= 2: + # it is a stored document (e.g. a notebook) + file_format, file_type, file_path = path.split(":", 2) + p = Path(file_path) + updates_file_path = str(p.parent / f".{file_type}:{p.name}.y") + ystore = JupyterSQLiteYStore(path=updates_file_path) # FIXME: pass in config + self.rooms[path] = DocumentRoom(file_type, ystore) + else: + # it is a transient document (e.g. awareness) + self.rooms[path] = TransientRoom() return self.rooms[path] @@ -144,10 +157,10 @@ async def serve(self): self.room.on_message = self.on_message # cancel the deletion of the room if it was scheduled - if self.room.cleaner is not None: + if not self.room.is_transient and self.room.cleaner is not None: self.room.cleaner.cancel() - if not self.room.ready: + if not self.room.is_transient and not self.room.ready: file_format, file_type, file_path = self.get_file_info() is_notebook = file_type == "notebook" model = await read_content(file_path, True, as_json=is_notebook) @@ -177,18 +190,32 @@ async def serve(self): self.room.document.observe(self.on_document_change) await self.websocket_server.serve(self.websocket) - if self.room.clients == [self.websocket]: + if not self.room.is_transient and self.room.clients == [self.websocket]: # no client in this room after we disconnect # keep the document for a while in case someone reconnects self.room.cleaner = asyncio.create_task(self.clean_room()) - async def on_message(self, message): - if message[0] == RENAME_SESSION: + async def on_message(self, message: bytes) -> Optional[bool]: + """ + Called whenever a message is received, before forwarding it to other clients. + + :param message: received message + :returns: True if the message must be discarded, False otherwise (default: False). + """ + byte = message[0] + msg = message[1:] + if byte == AWARENESS: + skip = False + # changes = self.room.awareness.get_changes(msg) + # filter out message depending on changes + if skip: + return skip + if byte == RENAME_SESSION: # The client moved the document to a different location. After receiving this message, # we make the current document available under a different url. # The other clients are automatically notified of this change because # the path is shared through the Yjs document as well. - new_room_name = message[1:].decode("utf-8") + new_room_name = msg.decode("utf-8") self.set_file_info(new_room_name) self.websocket_server.rename_room(new_room_name, from_room=self.room) # send rename acknowledge diff --git a/plugins/yjs/setup.cfg b/plugins/yjs/setup.cfg index 688ddf6d..85785cb3 100644 --- a/plugins/yjs/setup.cfg +++ b/plugins/yjs/setup.cfg @@ -24,7 +24,7 @@ install_requires = fps-auth fps-contents jupyter_ydoc >=0.1.16,<0.2.0 - ypy-websocket >=0.3.1,<0.4.0 + ypy-websocket >=0.3.2,<0.4.0 [options.entry_points] fps_router =