Skip to content

Commit

Permalink
Support global awareness
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Aug 8, 2022
1 parent eaf7f66 commit 93f94da
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
2 changes: 1 addition & 1 deletion plugins/jupyterlab/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ install_requires =
fps >=0.0.8
fps-auth
fps-lab
jupyterlab >=4.0.0a26
jupyterlab >=4.0.0a27

[options.entry_points]
fps_router =
Expand Down
63 changes: 45 additions & 18 deletions plugins/yjs/fps_yjs/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ypy_websocket.ystore import BaseYStore, SQLiteYStore, YDocNotFound # type: ignore

YFILE = YDOCS["file"]
AWARENESS = 1
RENAME_SESSION = 127


Expand All @@ -41,11 +42,9 @@ def to_datetime(iso_date: str) -> datetime:
return datetime.fromisoformat(iso_date.rstrip("Z"))


@router.websocket("/api/yjs/{format}:{type}:{path:path}")
@router.websocket("/api/yjs/{path:path}")
async def websocket_endpoint(
websocket: WebSocket,
format,
type,
path,
auth_config=Depends(get_auth_config),
user_manager: UserManager = Depends(get_user_manager),
Expand All @@ -60,8 +59,7 @@ async def websocket_endpoint(
accept_websocket = True
if accept_websocket:
await websocket.accept()
full_path = f"{format}:{type}:{path}"
socket = YDocWebSocketHandler(WebsocketAdapter(websocket, full_path), full_path)
socket = YDocWebSocketHandler(WebsocketAdapter(websocket, path), path)
await socket.serve()
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
Expand Down Expand Up @@ -99,7 +97,11 @@ async def recv(self):
return await self._websocket.receive_bytes()


class JupyterRoom(YRoom):
class DocumentRoom(YRoom):
"""A Y room for a possibly stored document (e.g. a notebook)."""

is_transient = False

def __init__(self, type: str, ystore: BaseYStore):
super().__init__(ready=False, ystore=ystore)
self.type = type
Expand All @@ -108,14 +110,25 @@ def __init__(self, type: str, ystore: BaseYStore):
self.document = YDOCS.get(type, YFILE)(self.ydoc)


class TransientRoom(YRoom):
"""A Y room for sharing state (e.g. awareness)."""

is_transient = True


class JupyterWebsocketServer(WebsocketServer):
def get_room(self, path: str) -> JupyterRoom:
file_format, file_type, file_path = path.split(":", 2)
def get_room(self, path: str) -> YRoom:
if path not in self.rooms.keys():
p = Path(file_path)
updates_file_path = str(p.parent / f".{file_type}:{p.name}.y")
ystore = JupyterSQLiteYStore(path=updates_file_path) # FIXME: pass in config
self.rooms[path] = JupyterRoom(file_type, ystore)
if path.count(":") >= 2:
# it is a stored document (e.g. a notebook)
file_format, file_type, file_path = path.split(":", 2)
p = Path(file_path)
updates_file_path = str(p.parent / f".{file_type}:{p.name}.y")
ystore = JupyterSQLiteYStore(path=updates_file_path) # FIXME: pass in config
self.rooms[path] = DocumentRoom(file_type, ystore)
else:
# it is a transient document (e.g. awareness)
self.rooms[path] = TransientRoom()
return self.rooms[path]


Expand Down Expand Up @@ -144,10 +157,10 @@ async def serve(self):
self.room.on_message = self.on_message

# cancel the deletion of the room if it was scheduled
if self.room.cleaner is not None:
if not self.room.is_transient and self.room.cleaner is not None:
self.room.cleaner.cancel()

if not self.room.ready:
if not self.room.is_transient and not self.room.ready:
file_format, file_type, file_path = self.get_file_info()
is_notebook = file_type == "notebook"
model = await read_content(file_path, True, as_json=is_notebook)
Expand Down Expand Up @@ -177,18 +190,32 @@ async def serve(self):
self.room.document.observe(self.on_document_change)

await self.websocket_server.serve(self.websocket)
if self.room.clients == [self.websocket]:
if not self.room.is_transient and self.room.clients == [self.websocket]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.room.cleaner = asyncio.create_task(self.clean_room())

async def on_message(self, message):
if message[0] == RENAME_SESSION:
async def on_message(self, message: bytes) -> Optional[bool]:
"""
Called whenever a message is received, before forwarding it to other clients.
:param message: received message
:returns: True if the message must be discarded, False otherwise (default: False).
"""
byte = message[0]
msg = message[1:]
if byte == AWARENESS:
skip = False
# changes = self.room.awareness.get_changes(msg)
# filter out message depending on changes
if skip:
return skip
if byte == RENAME_SESSION:
# The client moved the document to a different location. After receiving this message,
# we make the current document available under a different url.
# The other clients are automatically notified of this change because
# the path is shared through the Yjs document as well.
new_room_name = message[1:].decode("utf-8")
new_room_name = msg.decode("utf-8")
self.set_file_info(new_room_name)
self.websocket_server.rename_room(new_room_name, from_room=self.room)
# send rename acknowledge
Expand Down
2 changes: 1 addition & 1 deletion plugins/yjs/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ install_requires =
fps-auth
fps-contents
jupyter_ydoc >=0.1.16,<0.2.0
ypy-websocket >=0.3.1,<0.4.0
ypy-websocket >=0.3.2,<0.4.0

[options.entry_points]
fps_router =
Expand Down

0 comments on commit 93f94da

Please sign in to comment.