Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support global awareness #202

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/jupyterlab/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ install_requires =
fps >=0.0.8
fps-auth
fps-lab
jupyterlab >=4.0.0a26
jupyterlab >=4.0.0a27

[options.entry_points]
fps_router =
Expand Down
64 changes: 46 additions & 18 deletions plugins/yjs/fps_yjs/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ypy_websocket.ystore import BaseYStore, SQLiteYStore, YDocNotFound # type: ignore

YFILE = YDOCS["file"]
AWARENESS = 1
RENAME_SESSION = 127


Expand All @@ -41,11 +42,9 @@ def to_datetime(iso_date: str) -> datetime:
return datetime.fromisoformat(iso_date.rstrip("Z"))


@router.websocket("/api/yjs/{format}:{type}:{path:path}")
@router.websocket("/api/yjs/{path:path}")
async def websocket_endpoint(
websocket: WebSocket,
format,
type,
path,
auth_config=Depends(get_auth_config),
user_manager: UserManager = Depends(get_user_manager),
Expand All @@ -60,8 +59,7 @@ async def websocket_endpoint(
accept_websocket = True
if accept_websocket:
await websocket.accept()
full_path = f"{format}:{type}:{path}"
socket = YDocWebSocketHandler(WebsocketAdapter(websocket, full_path), full_path)
socket = YDocWebSocketHandler(WebsocketAdapter(websocket, path), path)
await socket.serve()
else:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
Expand Down Expand Up @@ -99,7 +97,11 @@ async def recv(self):
return await self._websocket.receive_bytes()


class JupyterRoom(YRoom):
class DocumentRoom(YRoom):
"""A Y room for a possibly stored document (e.g. a notebook)."""

is_transient = False

def __init__(self, type: str, ystore: BaseYStore):
super().__init__(ready=False, ystore=ystore)
self.type = type
Expand All @@ -108,14 +110,25 @@ def __init__(self, type: str, ystore: BaseYStore):
self.document = YDOCS.get(type, YFILE)(self.ydoc)


class TransientRoom(YRoom):
"""A Y room for sharing state (e.g. awareness)."""

is_transient = True


class JupyterWebsocketServer(WebsocketServer):
def get_room(self, path: str) -> JupyterRoom:
file_format, file_type, file_path = path.split(":", 2)
def get_room(self, path: str) -> YRoom:
if path not in self.rooms.keys():
p = Path(file_path)
updates_file_path = str(p.parent / f".{file_type}:{p.name}.y")
ystore = JupyterSQLiteYStore(path=updates_file_path) # FIXME: pass in config
self.rooms[path] = JupyterRoom(file_type, ystore)
if path.count(":") >= 2:
# it is a stored document (e.g. a notebook)
file_format, file_type, file_path = path.split(":", 2)
p = Path(file_path)
updates_file_path = str(p.parent / f".{file_type}:{p.name}.y")
ystore = JupyterSQLiteYStore(path=updates_file_path) # FIXME: pass in config
self.rooms[path] = DocumentRoom(file_type, ystore)
else:
# it is a transient document (e.g. awareness)
self.rooms[path] = TransientRoom()
return self.rooms[path]


Expand Down Expand Up @@ -144,10 +157,10 @@ async def serve(self):
self.room.on_message = self.on_message

# cancel the deletion of the room if it was scheduled
if self.room.cleaner is not None:
if not self.room.is_transient and self.room.cleaner is not None:
self.room.cleaner.cancel()

if not self.room.ready:
if not self.room.is_transient and not self.room.ready:
file_format, file_type, file_path = self.get_file_info()
is_notebook = file_type == "notebook"
model = await read_content(file_path, True, as_json=is_notebook)
Expand Down Expand Up @@ -177,22 +190,37 @@ async def serve(self):
self.room.document.observe(self.on_document_change)

await self.websocket_server.serve(self.websocket)
if self.room.clients == [self.websocket]:
if not self.room.is_transient and self.room.clients == [self.websocket]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.room.cleaner = asyncio.create_task(self.clean_room())

async def on_message(self, message):
if message[0] == RENAME_SESSION:
async def on_message(self, message: bytes) -> bool:
"""
Called whenever a message is received, before forwarding it to other clients.

:param message: received message
:returns: True if the message must be discarded, False otherwise (default: False).
"""
byte = message[0]
msg = message[1:]
if byte == AWARENESS:
skip = False
# changes = self.room.awareness.get_changes(msg)
# filter out message depending on changes
if skip:
return True
if byte == RENAME_SESSION:
# The client moved the document to a different location. After receiving this message,
# we make the current document available under a different url.
# The other clients are automatically notified of this change because
# the path is shared through the Yjs document as well.
new_room_name = message[1:].decode("utf-8")
new_room_name = msg.decode("utf-8")
self.set_file_info(new_room_name)
self.websocket_server.rename_room(new_room_name, from_room=self.room)
# send rename acknowledge
await self.websocket.send(bytes([RENAME_SESSION, 1]))
return False

async def watch_file(self):
poll_interval = 1 # FIXME: pass in config
Expand Down
2 changes: 1 addition & 1 deletion plugins/yjs/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ install_requires =
fps-auth
fps-contents
jupyter_ydoc >=0.1.16,<0.2.0
ypy-websocket >=0.3.1,<0.4.0
ypy-websocket >=0.3.2,<0.4.0

[options.entry_points]
fps_router =
Expand Down