Skip to content

Commit

Permalink
Improves autosaving documents (#206)
Browse files Browse the repository at this point in the history
* Improves autosaving documents

* pre-commit
  • Loading branch information
hbcarlos authored Oct 17, 2023
1 parent 3ece53a commit 0a361fa
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ enable-extensions = G
extend-ignore =
G001, G002, G004, G200, G201, G202,
# black adds spaces around ':'
E203,
E203,E231
per-file-ignores =
# B011: Do not call assert False since python -O removes these calls
# F841 local variable 'foo' is assigned to but never used
Expand Down
2 changes: 1 addition & 1 deletion jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def open(self, room_id):
self.close(1004, f"File {path} not found.")
else:
self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e)
self.close(1003, f"Error initializing: {path}. You need to close the document.")
self.close(1005, f"Error initializing: {path}. You need to close the document.")

# Clean up the room and delete the file loader
if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]:
Expand Down
2 changes: 1 addition & 1 deletion jupyter_collaboration/rooms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def handle_msg(self, data: bytes) -> None:

def broadcast_msg(self, msg: bytes) -> None:
for client in self.clients:
self._task_group.start_soon(client.send, msg)
self._task_group.start_soon(client.send, msg) # type: ignore[union-attr]

async def _broadcast_updates(self):
# FIXME should be upstreamed
Expand Down
97 changes: 45 additions & 52 deletions jupyter_collaboration/rooms/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def initialize(self) -> None:
this setter will subscribe for updates on the shared document.
"""
async with self._initialization_lock:
if self.ready: # type: ignore[has-type]
if self.ready:
return

self.log.info("Initializing room %s", self._room_id)
Expand All @@ -88,7 +88,9 @@ async def initialize(self) -> None:
if self.ystore is not None and await self.ystore.exists(self._room_id):
# Load the content from the store
doc = await self.ystore.get(self._room_id)
assert doc
self._session_id = doc["session_id"]

await self.ystore.apply_updates(self._room_id, self.ydoc)
self._emit(
LogLevel.INFO,
Expand Down Expand Up @@ -207,18 +209,7 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
if event == "metadata" and (
self._last_modified is None or self._last_modified < args["last_modified"]
):
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")

msg_id = str(uuid.uuid4())
self._messages[msg_id] = asyncio.Lock()
await self._outofband_lock.acquire()
data = msg_id.encode()
self.broadcast_msg(
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED])
+ write_var_uint(len(data))
+ data
)
await self._send_confict_msg()

def _on_document_change(self, target: str, event: Any) -> None:
"""
Expand Down Expand Up @@ -247,34 +238,35 @@ def _on_document_change(self, target: str, event: Any) -> None:

async def _load_document(self) -> None:
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
async with self._update_lock:
model = await self._file.load_content(self._file_format, self._file_type, True)
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

async def _save_document(self) -> None:
"""
Saves the content of the document to disk.
"""
try:
self.log.info("Saving the content from room %s", self._room_id)
model = await self._file.save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]

async with self._update_lock:
model = await self._file.save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
self._document.dirty = False

self._emit(LogLevel.INFO, "save", "Content saved.")
Expand All @@ -299,40 +291,41 @@ async def _maybe_save_document(self) -> None:
# save after X seconds of inactivity
await asyncio.sleep(self._save_delay)

if self._outofband_lock.locked():
return

try:
self.log.info("Saving the content from room %s", self._room_id)
model = await self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
async with self._update_lock:
model = await self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
self._document.dirty = False

self._emit(LogLevel.INFO, "save", "Content saved.")

except OutOfBandChanges:
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")
await self._send_confict_msg()

except Exception as e:
msg = f"Error saving file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)

async def _send_confict_msg(self) -> None:
self.log.info("Out-of-band changes in room %s", self._room_id)
self._emit(LogLevel.INFO, "overwrite", f"Out-of-band changes in room {self._room_id}")

msg_id = str(uuid.uuid4())
self._messages[msg_id] = asyncio.Lock()
await self._outofband_lock.acquire()
data = msg_id.encode()
self.broadcast_msg(
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) + write_var_uint(len(data)) + data
)
14 changes: 7 additions & 7 deletions jupyter_collaboration/rooms/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def on_message(self) -> Callable[[bytes], Awaitable[bool] | bool] | None:
return self._on_message

@on_message.setter
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None) -> None:
"""
Arguments:
value: An optional callback to call when a message is received. If the callback returns True, the message is skipped.
Expand All @@ -125,17 +125,17 @@ def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
async def _broadcast_updates(self):
async with self._update_receive_stream:
async for update in self._update_receive_stream:
if self._task_group.cancel_scope.cancel_called:
if self._task_group.cancel_scope.cancel_called: # type: ignore[union-attr]
return
# broadcast internal ydoc's update to all clients, that includes changes from the
# clients and changes from the backend (out-of-band changes)
for client in self.clients:
self.log.debug("Sending Y update to client with endpoint: %s", client.path)
message = create_update_message(update)
self._task_group.start_soon(client.send, message)
self._task_group.start_soon(client.send, message) # type: ignore[union-attr]
if self.ystore:
self.log.debug("Writing Y update to YStore")
self._task_group.start_soon(self.ystore.write, client.path, update)
self._task_group.start_soon(self.ystore.write, client.path, update) # type: ignore[union-attr]

async def __aenter__(self) -> YRoom:
if self._task_group is not None:
Expand All @@ -158,7 +158,7 @@ async def __aexit__(self, exc_type, exc_value, exc_tb):
self._task_group = None
return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
"""Start the room.
Arguments:
Expand All @@ -178,15 +178,15 @@ async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
self._starting = False
task_status.started()

def stop(self):
def stop(self) -> None:
"""Stop the room."""
if self._task_group is None:
raise RuntimeError("YRoom not running")

self._task_group.cancel_scope.cancel()
self._task_group = None

async def serve(self, websocket: Websocket):
async def serve(self, websocket: Websocket) -> None:
"""Serve a client.
Arguments:
Expand Down
6 changes: 3 additions & 3 deletions packages/docprovider/src/yprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ export class WebSocketProvider implements IDocumentProvider {
}

private _onConnectionClosed = (event: any): void => {
if (event.code === 1003) {
console.error('Document provider closed:', event.reason);
if (event.code >= 1003 && event.code < 1006) {
console.error('Document provider closed:', event.code, event.reason);

showErrorMessage(this._trans.__('Document session error'), event.reason, [
showErrorMessage(this._trans.__('Document error'), event.reason, [
Dialog.okButton()
]);

Expand Down

0 comments on commit 0a361fa

Please sign in to comment.