forked from EmergenceAI/Agent-E
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
begon impl for support of api endpoint
- Loading branch information
teaxio
committed
Jul 23, 2024
1 parent
db65443
commit 5862ea0
Showing
7 changed files
with
269 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
from collections.abc import Callable | ||
from queue import Empty | ||
from queue import Queue | ||
|
||
|
||
class NotificationManager: | ||
""" | ||
NotificationManager handles the dispatching of notifications to registered listeners. | ||
Attributes: | ||
notification_queue (Queue): A queue to hold notifications. | ||
listeners (list[Callable[[dict[str, str]], None]]): A list of listener callbacks to notify. | ||
""" | ||
|
||
def __init__(self): | ||
""" | ||
Initialize the NotificationManager with an empty queue and no listeners. | ||
""" | ||
self.notification_queue = Queue() # type: ignore | ||
self.listeners: list[Callable[[dict[str, str]], None]] = [] | ||
|
||
def notify(self, message: str, message_type: str) -> None: | ||
""" | ||
Notify all registered listeners with a message and its type. | ||
Args: | ||
message (str): The message to notify. | ||
message_type (str): The type of the message. | ||
""" | ||
notification = { | ||
"message": message, | ||
"type": message_type, | ||
} | ||
|
||
if self.listeners: | ||
self.notification_queue.put(notification) # type: ignore | ||
for listener in self.listeners: | ||
listener(notification) | ||
else: | ||
print(f"No listeners available, discarding message: {notification}") | ||
|
||
def get_next_notification(self) -> dict[str, str] | None: | ||
""" | ||
Get the next notification from the queue, if available. | ||
Returns: | ||
dict[str, str] | None: The next notification, or None if the queue is empty. | ||
""" | ||
try: | ||
return self.notification_queue.get_nowait() # type: ignore | ||
except Empty: | ||
return None | ||
|
||
def register_listener(self, listener: Callable[[dict[str, str]], None]) -> None: | ||
""" | ||
Register a new listener to receive notifications. | ||
Args: | ||
listener (Callable[[dict[str, str]], None]): The listener callback to register. | ||
""" | ||
self.listeners.append(listener) | ||
|
||
def unregister_listener(self, listener: Callable[[dict[str, str]], None]) -> None: | ||
""" | ||
Unregister a listener from receiving notifications. | ||
Args: | ||
listener (Callable[[dict[str, str]], None]): The listener callback to unregister. | ||
""" | ||
self.listeners.remove(listener) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import asyncio | ||
import json | ||
import logging | ||
import os | ||
|
||
import uvicorn | ||
from fastapi import FastAPI | ||
from fastapi import Request | ||
from fastapi.middleware.cors import CORSMiddleware | ||
from fastapi.responses import StreamingResponse | ||
from pydantic import BaseModel | ||
from pydantic import Field | ||
|
||
import ae.core.playwright_manager as browserManager | ||
from ae.core.autogen_wrapper import AutogenWrapper | ||
from ae.utils.ui_messagetype import MessageType | ||
|
||
browser_manager = browserManager.PlaywrightManager(headless=False) | ||
|
||
APP_VERSION = "1.0.0" | ||
APP_NAME = "Agent-E Web API" | ||
API_PREFIX = "/api" | ||
IS_DEBUG = False | ||
HOST = os.getenv("HOST", "0.0.0.0") | ||
PORT = int(os.getenv("PORT", 8080)) | ||
WORKERS = 1 | ||
|
||
class CommandQueryModel(BaseModel): | ||
command: str = Field(..., description="The command related to web navigation to execute.") # Required field with description | ||
|
||
|
||
def get_app() -> FastAPI: | ||
'''Starts the Application''' | ||
fast_app = FastAPI( | ||
title=APP_NAME, | ||
version=APP_VERSION, | ||
debug=IS_DEBUG) | ||
|
||
fast_app.add_middleware( | ||
CORSMiddleware, | ||
allow_origins=["*"], | ||
allow_credentials=True, | ||
allow_methods=["*"], | ||
allow_headers=["*"]) | ||
|
||
return fast_app | ||
|
||
app = get_app() | ||
|
||
@app.on_event("startup") # type: ignore | ||
async def startup_event(): | ||
""" | ||
Startup event handler to initialize browser manager asynchronously. | ||
""" | ||
await browser_manager.async_initialize() | ||
|
||
|
||
@app.post("/execute_task", description="Execute a given command related to web navigation and return the result.") | ||
async def execute_task(request: Request, | ||
query_model: CommandQueryModel): | ||
|
||
return StreamingResponse(run_task(query_model.command, browser_manager), media_type='application/json') | ||
|
||
|
||
def run_task(command: str, playwright_manager: browserManager.PlaywrightManager): | ||
""" | ||
Run the task to process the command and generate events. | ||
Args: | ||
command (str): The command to execute. | ||
playwright_manager (PlaywrightManager): The manager handling browser interactions and notifications. | ||
Yields: | ||
str: JSON-encoded string representing a notification. | ||
""" | ||
async def event_generator(): | ||
# Start the process command task | ||
task = asyncio.create_task(process_command(command, playwright_manager)) | ||
|
||
while not task.done(): | ||
notification = playwright_manager.notification_manager.get_next_notification() | ||
if notification: | ||
yield f"{json.dumps(notification)}\n" | ||
await asyncio.sleep(0.1) | ||
|
||
# Once the task is done, yield any remaining notifications | ||
while True: | ||
notification = playwright_manager.notification_manager.get_next_notification() | ||
if notification: | ||
yield f"{json.dumps(notification)}\n" | ||
else: | ||
break | ||
|
||
# Ensure the task is awaited to propagate any exceptions | ||
await task | ||
|
||
return event_generator() | ||
|
||
|
||
async def process_command(command: str, playwright_manager: browserManager.PlaywrightManager): | ||
""" | ||
Process the command and send notifications. | ||
Args: | ||
command (str): The command to process. | ||
playwright_manager (PlaywrightManager): The manager handling browser interactions and notifications. | ||
""" | ||
current_url = await playwright_manager.get_current_url() | ||
await playwright_manager.notify_user("Processing command", MessageType.INFO) | ||
|
||
ag = await AutogenWrapper.create() | ||
command_exec_result = await ag.process_command(command, current_url) | ||
|
||
# TODO: See how to extract the actual final answer from here | ||
final_answer = "Final answer" | ||
|
||
# Notify about the completion of the command | ||
await playwright_manager.notify_user("Command completed", MessageType.INFO) | ||
|
||
await playwright_manager.notify_user(final_answer, MessageType.FINAL) | ||
|
||
# Configure logging | ||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") | ||
logger = logging.getLogger("uvicorn") | ||
|
||
|
||
if __name__ == "__main__": | ||
logger.info('**********Application Started**********') | ||
uvicorn.run( | ||
"main:app", | ||
host=HOST, | ||
port=PORT, | ||
workers=WORKERS, reload=IS_DEBUG, log_level="info") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.