Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor tasks cleanup #160

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,5 @@ def initialize_handlers(self):

async def stop_extension(self):
# Cancel tasks and clean up
await asyncio.wait(
[
asyncio.create_task(self.ywebsocket_server.clean()),
asyncio.create_task(self.file_loaders.clear()),
],
timeout=3,
)
await self.ywebsocket_server.clean()
await self.file_loaders.clean()
16 changes: 11 additions & 5 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from .loaders import FileLoaderMapping
from .rooms import DocumentRoom, TransientRoom
from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, decode_file_path
from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, decode_file_path, cancel_task
from .websocketserver import JupyterWebsocketServer

YFILE = YDOCS["file"]
Expand Down Expand Up @@ -127,7 +127,10 @@ def __aiter__(self):

async def __anext__(self):
# needed to be compatible with WebsocketServer (async for message in websocket)
message = await self._message_queue.get()
try:
message = await self._message_queue.get()
except asyncio.CancelledError:
message = None
if not message:
raise StopAsyncIteration()
return message
Expand Down Expand Up @@ -160,7 +163,7 @@ async def open(self, room_id):

# cancel the deletion of the room if it was scheduled
if self.room.cleaner is not None:
self.room.cleaner.cancel()
await cancel_task(self.room.cleaner)

try:
# Initialize the room
Expand Down Expand Up @@ -189,8 +192,11 @@ async def recv(self):
"""
Receive a message from the client.
"""
message = await self._message_queue.get()
return message
try:
message = await self._message_queue.get()
return message
except asyncio.CancelledError:
return None

def on_message(self, message):
"""
Expand Down
16 changes: 6 additions & 10 deletions jupyter_collaboration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from jupyter_server.utils import ensure_async
from jupyter_server_fileid.manager import BaseFileIdManager

from .utils import OutOfBandChanges
from .utils import OutOfBandChanges, cancel_task


class FileLoader:
Expand Down Expand Up @@ -73,9 +73,7 @@ async def clean(self) -> None:
Stops the watch task.
"""
if self._watcher is not None:
if not self._watcher.cancelled():
self._watcher.cancel()
await self._watcher
await cancel_task(self._watcher)

def observe(
self, id: str, callback: Callable[[str, dict[str, Any]], Coroutine[Any, Any, None]]
Expand Down Expand Up @@ -241,18 +239,16 @@ def __getitem__(self, file_id: str) -> FileLoader:

return file

async def __delitem__(self, file_id: str) -> None:
def __delitem__(self, file_id: str) -> None:
"""Delete a loader for a given file."""
await self.remove(file_id)
self.remove(file_id)

async def clear(self) -> None:
async def clean(self) -> None:
"""Clear all loaders."""
tasks = []
for id in list(self.__dict):
loader = self.__dict.pop(id)
tasks.append(loader.clean())

await asyncio.gather(*tasks)
await loader.clean()

async def remove(self, file_id: str) -> None:
"""Remove the loader for a given file."""
Expand Down
15 changes: 9 additions & 6 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ypy_websocket.ystore import BaseYStore, YDocNotFound

from .loaders import FileLoader
from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges
from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges, cancel_task

YFILE = YDOCS["file"]

Expand Down Expand Up @@ -158,20 +158,23 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No

self._logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data)

def _clean(self) -> None:
async def _clean(self) -> None:
"""
Cleans the rooms.

Cancels the save task and unsubscribes from the file.
"""
super()._clean()
# TODO: Should we cancel or wait ?
if self._saving_document:
self._saving_document.cancel()
await super()._clean()

self._document.unobserve()
self._file.unobserve(self.room_id)

if self.cleaner:
await cancel_task(self.cleaner)

if self._saving_document:
await cancel_task(self._saving_document)

async def _broadcast_updates(self):
# FIXME should be upstreamed
try:
Expand Down
9 changes: 9 additions & 0 deletions jupyter_collaboration/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import pathlib
from enum import Enum
from typing import Tuple
Expand Down Expand Up @@ -59,3 +60,11 @@ def encode_file_path(format: str, file_type: str, file_id: str) -> str:
path (str): File path.
"""
return f"{format}:{file_type}:{file_id}"


async def cancel_task(task: asyncio.Task):
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
49 changes: 6 additions & 43 deletions jupyter_collaboration/websocketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from ypy_websocket.websocket_server import WebsocketServer, YRoom
from ypy_websocket.ystore import BaseYStore

from .utils import cancel_task


class RoomNotFound(LookupError):
pass
Expand Down Expand Up @@ -40,46 +42,10 @@ def __init__(
self.monitor_task: asyncio.Task | None = None

async def clean(self):
# TODO: should we wait for any save task?
self.log.info("Deleting all rooms.")
# FIXME some clean up should be upstreamed and the following does not
# prevent hanging stop process - it also requires some thinking about
# should the ystore write action be cancelled; I guess not as it could
# results in corrupted data.
# room_tasks = list()
# for name, room in list(self.rooms.items()):
# for task in room.background_tasks:
# task.cancel() # FIXME should be upstreamed
# room_tasks.append(task)
# if room_tasks:
# _, pending = await asyncio.wait(room_tasks, timeout=3)
# if pending:
# msg = f"{len(pending)} room task(s) are pending."
# self.log.warning(msg)
# self.log.debug("Pending tasks: %r", pending)

tasks = []
for name, room in list(self.rooms.items()):
try:
self.delete_room(name=name)
except Exception as e: # Capture exception as room may be auto clean
msg = f"Failed to delete room {name}"
self.log.debug(msg, exc_info=e)
else:
tasks.append(room._broadcast_task) # FIXME should be upstreamed
await super().clean()

if self.monitor_task is not None:
self.monitor_task.cancel()
tasks.append(self.monitor_task)
for task in self.background_tasks:
task.cancel() # FIXME should be upstreamed
tasks.append(task)

if tasks:
_, pending = await asyncio.wait(tasks, timeout=3)
if pending:
msg = f"{len(pending)} task(s) are pending."
self.log.warning(msg)
self.log.debug("Pending tasks: %r", pending)
await cancel_task(self.monitor_task)

def room_exists(self, path: str) -> bool:
"""
Expand Down Expand Up @@ -139,10 +105,7 @@ async def _monitor(self):
This method runs in a coroutine for debugging purposes.
"""
while True:
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
break
await asyncio.sleep(60)
clients_nb = sum(len(room.clients) for room in self.rooms.values())
self.log.info("Processed %s Y patches in one minute", self.ypatch_nb)
self.log.info("Connected Y users: %s", clients_nb)
Expand Down