Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/weilink/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,11 @@ def _prune_locked(self) -> int:
)
deleted += cur.rowcount

# Always commit to release any implicit transaction started by DELETE,
# even when zero rows were deleted. Without this, Python's sqlite3
# module leaves an open write transaction that blocks other connections.
self._conn.commit()
if deleted:
self._conn.commit()
logger.debug("Pruned %d old message(s)", deleted)

return deleted
Expand Down
77 changes: 77 additions & 0 deletions tests/test_store_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,80 @@ def test_start_dispatcher_uses_poll_loop_when_free(self, tmp_path: Path) -> None
finally:
wl.stop()
wl.close()

def test_external_writer_with_store_watcher(self, tmp_path: Path) -> None:
"""Simulate MCP server scenario: external process writes to db,
SDK store watcher picks up new messages.
"""
db_path = tmp_path / "messages.db"

wl = WeiLink(
token_path=tmp_path / "token.json",
message_store=db_path,
)
received: list[Message] = []

@wl.on_message
def handler(msg: Message) -> None:
received.append(msg)

# Simulate MCP server holding the poll lock.
external_lock = FileLock(tmp_path / ".poll.lock")
external_lock.lock()
try:
wl.run_background()

# Simulate MCP server writing to the same db via a separate
# MessageStore connection (like a real multi-process scenario).
external_store = MessageStore(db_path)
external_store.store([_make_msg(text="from mcp", message_id=77)])
external_store.close()

# Store watcher should pick it up.
deadline = time.monotonic() + 5.0
while not received and time.monotonic() < deadline:
time.sleep(0.2)

assert len(received) == 1
assert received[0].text == "from mcp"
finally:
wl.stop()
external_lock.unlock()
external_lock.close()
wl.close()

def test_handler_exception_does_not_block_others(self, tmp_path: Path) -> None:
"""A failing handler should not prevent other handlers from running."""
wl = WeiLink(
token_path=tmp_path / "token.json",
message_store=tmp_path / "messages.db",
)
received: list[Message] = []

@wl.on_message
def bad_handler(msg: Message) -> None:
raise ValueError("boom")

@wl.on_message
def good_handler(msg: Message) -> None:
received.append(msg)

external_lock = FileLock(tmp_path / ".poll.lock")
external_lock.lock()
try:
wl.run_background()

assert wl._message_store is not None
wl._message_store.store([_make_msg(text="exception test", message_id=88)])

deadline = time.monotonic() + 5.0
while not received and time.monotonic() < deadline:
time.sleep(0.2)

assert len(received) == 1
assert received[0].text == "exception test"
finally:
wl.stop()
external_lock.unlock()
external_lock.close()
wl.close()
Loading