Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rabbithole: progress percentages + bug fixes on URL ingestion #410

Merged
merged 15 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions core/cat/rabbit_hole.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import time
import math
import json
import mimetypes
from typing import List, Union
Expand Down Expand Up @@ -135,6 +136,7 @@ def ingest_file(
filename = file
else:
filename = file.filename

self.store_documents(docs=docs, source=filename)

def file_to_docs(
Expand Down Expand Up @@ -189,7 +191,7 @@ def file_to_docs(
source = file

# Make a request with a fake browser name
request = Request(file, headers={'User-Agent': "Magic Browser"})
request = Request(file, headers={"User-Agent": "Magic Browser"})

try:
# Get binary content of url
Expand All @@ -214,16 +216,36 @@ def file_to_docs(
mimetype=content_type,
source=source).from_data(data=file_bytes,
mime_type=content_type)

# Parser based on the mime type
parser = MimeTypeBasedParser(handlers=self.file_handlers)

# Parse the text
self.send_rabbit_thought("I'm parsing the content. Big content could require some minutes...")
text = parser.parse(blob)

self.send_rabbit_thought(f"Parsing completed. Now let's go with reading process...")
docs = self.split_text(text, chunk_size, chunk_overlap)
return docs

def send_rabbit_thought(self, thought: str) -> None:
    """Append a message to the notification list.

    This method receives a string, wraps it in a notification payload and
    appends that payload to ``self.cat.web_socket_notifications``.

    Parameters
    ----------
    thought : str
        Text of the message to append to the notification list.
    """
    # Same payload shape for every notification: only "content" varies.
    self.cat.web_socket_notifications.append({
        "error": False,
        "type": "notification",
        "content": thought,
        "why": {},
    })


def store_documents(self, docs: List[Document], source: str) -> None:
"""Add documents to the Cat's declarative memory.

Expand Down Expand Up @@ -255,7 +277,14 @@ def store_documents(self, docs: List[Document], source: str) -> None:
)

# classic embed
time_last_notification = time.time()
time_interval = 10 # a notification every 10 secs
for d, doc in enumerate(docs):
if time.time() - time_last_notification > time_interval:
time_last_notification = time.time()
perc_read = int( d / len(docs) * 100 )
self.send_rabbit_thought(f"Read {perc_read}% of {source}")

doc.metadata["source"] = source
doc.metadata["when"] = time.time()
doc = self.cat.mad_hatter.execute_hook(
Expand All @@ -279,14 +308,8 @@ def store_documents(self, docs: List[Document], source: str) -> None:
# notify client
finished_reading_message = f"Finished reading {source}, " \
f"I made {len(docs)} thoughts on it."
self.cat.web_socket_notifications.append(
{
"error": False,
"type": "notification",
"content": finished_reading_message,
"why": {},
}
)

self.send_rabbit_thought(finished_reading_message)

print(f"\n\nDone uploading {source}")

Expand Down
2 changes: 1 addition & 1 deletion core/cat/routes/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,4 +285,4 @@ async def upsert_plugin_settings(
return {
"name": plugin_id,
"value": final_settings
}
}
7 changes: 6 additions & 1 deletion core/cat/routes/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,14 @@ async def upload_url(
"""Upload a url. Website content will be extracted and segmented into chunks.
Chunks will be then vectorized and stored into documents memory."""
# check that URL is valid

try:
# Send a HEAD request to the specified URL
response = requests.head(url)
response = requests.head(
url,
headers={"User-Agent": "Magic Browser"},
allow_redirects=True
)
status_code = response.status_code

if status_code == 200:
Expand Down
104 changes: 71 additions & 33 deletions core/cat/routes/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,69 +7,107 @@

router = APIRouter()

# This constant sets the interval (in seconds) at which the system checks for notifications.
NOTIFICATION_CHECK_INTERVAL = 1 # seconds


class ConnectionManager:
    """
    Manages active WebSocket connections.
    """

    def __init__(self):
        # List to store all active WebSocket connections, in connection order.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """
        Accept the incoming WebSocket connection and add it to the active connections list.
        """
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """
        Remove the given WebSocket from the active connections list.

        Calling this for a WebSocket that is not (or no longer) registered is
        a no-op, so cleanup code may call it unconditionally.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed or never registered: nothing to clean up.
            pass

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        """
        Send a personal message (in JSON format) to the specified WebSocket.

        `message` is handed to `send_json` as is; the callers visible in this
        module pass dicts.
        """
        await websocket.send_json(message)

    async def broadcast(self, message: dict):
        """
        Send a message (in JSON format) to all active WebSocket connections.
        """
        # Iterate over a snapshot: a connection may be added or removed while
        # this coroutine is suspended in `send_json`, and mutating a list
        # during iteration would skip entries.
        for connection in list(self.active_connections):
            await connection.send_json(message)


manager = ConnectionManager()

# main loop via websocket
@router.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
ccat = websocket.app.state.ccat

await manager.connect(websocket)
async def receive_message(websocket: WebSocket, ccat: object):
    """
    Relay user messages to `ccat` and send each answer back, forever.

    The loop only ends when one of the awaited calls raises (for example
    when `receive_json` fails because the client went away).
    """
    while True:
        # Message received from this specific user.
        incoming = await websocket.receive_json()

        # `ccat` is invoked through `run_in_threadpool` rather than called
        # directly, so the call is awaited instead of run inline here.
        reply = await run_in_threadpool(ccat, incoming)

        # Send the output to this specific user only.
        await manager.send_personal_message(reply, websocket)
async def check_notification(websocket: WebSocket, ccat: object):
    """
    Poll `ccat.web_socket_notifications` and forward pending entries to the user.

    At most one notification is sent per polling cycle; the oldest entry
    goes first (the list is consumed from the front).
    """
    while True:
        pending = ccat.web_socket_notifications
        if pending:
            # Take the oldest notification off the FIFO list and deliver it.
            await manager.send_personal_message(pending.pop(0), websocket)

        # Wait one polling interval before looking at the list again.
        await asyncio.sleep(NOTIFICATION_CHECK_INTERVAL)

@router.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    Endpoint to handle incoming WebSocket connections, process messages, and check for notifications.

    Two coroutines run concurrently for each connection: one relays chat
    messages, the other forwards queued notifications. When either of them
    stops (disconnect or error), the other one is cancelled as well, so no
    background loop keeps using a socket that is already gone.
    """

    # Retrieve the `ccat` instance from the application's state.
    ccat = websocket.app.state.ccat

    # Add the new WebSocket connection to the manager.
    await manager.connect(websocket)

    # Explicit tasks (instead of bare coroutines) so they can be cancelled:
    # `asyncio.gather` does NOT cancel the remaining awaitables when one of
    # them raises.
    tasks = [
        asyncio.create_task(receive_message(websocket, ccat)),
        asyncio.create_task(check_notification(websocket, ccat)),
    ]

    try:
        # Process messages and check for notifications concurrently.
        await asyncio.gather(*tasks)
    except WebSocketDisconnect:
        # Handle the event where the user disconnects their WebSocket.
        log("WebSocket connection closed", "INFO")
    except Exception as e:
        # Log any unexpected errors and send an error message back to the user.
        log(e, "ERROR")
        traceback.print_exc()

        await manager.send_personal_message({
            "type": "error",
            "name": type(e).__name__,
            "description": str(e),
        }, websocket)
    finally:
        # Stop whichever loop is still running; cancelling a finished task
        # is a no-op.
        for task in tasks:
            task.cancel()
        # Let the cancelled tasks unwind and collect their outcome so that no
        # "exception was never retrieved" warning is emitted.
        await asyncio.gather(*tasks, return_exceptions=True)

        # Always ensure the WebSocket is removed from the manager, regardless of how the above block exits.
        manager.disconnect(websocket)
2 changes: 1 addition & 1 deletion core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "Cheshire-Cat"
description = "Open source and customizable AI architecture"
version = "1.0.0"
version = "1.0.1"
requires-python = ">=3.10"
license = { file="LICENSE" }
authors = [
Expand Down
Loading