Skip to content

Commit

Permalink
Merge pull request EmergenceAI#83 from EmergenceAI/add_web_server_inp…
Browse files Browse the repository at this point in the history
…ut_modality

Add web server input modality
  • Loading branch information
teaxio authored Jul 25, 2024
2 parents db65443 + f3c3c94 commit 179f351
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 6 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ To personalize this agent, there is a need for Long Term Memory (LTM) that track
`python -m ae.main` (if you are on a Mac, `python -u -m ae.main` See blocking IO issues above)
Once the program is running, you should see an icon on the browser. The icon expands to chat-like interface where you can enter natural language requests. For example, `open youtube`, `search youtube for funny cat videos`, `find Nothing Phone 2 on Amazon and sort the results by best seller`, etc.

### Launch via web endpoint
There is a FastAPI wrapper for Agent-E. It allows the user to send commands via HTTP and receive streaming results.
- Run `uvicorn ae.server.api_routes:app --reload --loop asyncio`
- Send POST requests to: `http://127.0.0.1:8000/execute_task`
- Sample cURL:
```
curl --location 'http://127.0.0.1:8000/execute_task' \
--header 'Content-Type: application/json' \
--data '{
"command": "go to espn, look for soccer news, report the names of the most recent soccer champs"
}'
```


## Demos

Expand Down
53 changes: 53 additions & 0 deletions ae/core/notification_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from collections.abc import Callable


class NotificationManager:
"""
NotificationManager handles the dispatching of notifications to registered listeners.
Attributes:
listeners (list[Callable[[dict[str, str]], None]]): A list of listener callbacks to notify.
"""

def __init__(self):
"""
Initialize the NotificationManager with no listeners.
"""
self.listeners: list[Callable[[dict[str, str]], None]] = []

def notify(self, message: str, message_type: str) -> None:
"""
Notify all registered listeners with a message and its type.
Args:
message (str): The message to notify.
message_type (str): The type of the message.
"""
notification = {
"message": message,
"type": message_type,
}

if self.listeners:
for listener in self.listeners:
listener(notification)
else:
print(f"No listeners available, discarding message: {notification}")

def register_listener(self, listener: Callable[[dict[str, str]], None]) -> None:
"""
Register a new listener to receive notifications.
Args:
listener (Callable[[dict[str, str]], None]): The listener callback to register.
"""
self.listeners.append(listener)

def unregister_listener(self, listener: Callable[[dict[str, str]], None]) -> None:
"""
Unregister a listener from receiving notifications.
Args:
listener (Callable[[dict[str, str]], None]): The listener callback to unregister.
"""
self.listeners.remove(listener)
4 changes: 4 additions & 0 deletions ae/core/playwright_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from playwright.async_api import Page
from playwright.async_api import Playwright

from ae.core.notification_manager import NotificationManager
from ae.core.ui_manager import UIManager
from ae.utils.dom_mutation_observer import dom_mutation_change_detected
from ae.utils.dom_mutation_observer import handle_navigation_for_mutation_observer
Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(self, browser_type: str = "chromium", headless: bool = False, gui_i
self.browser_type = browser_type
self.isheadless = headless
self.__initialized = True
self.notification_manager = NotificationManager()
self.user_response_event = asyncio.Event()
if gui_input_mode:
self.ui_manager: UIManager = UIManager()
Expand Down Expand Up @@ -318,6 +320,8 @@ async def notify_user(self, message: str, message_type: MessageType = MessageTyp
except Exception as e:
logger.error(f"Failed to notify user with message \"{message}\". However, most likey this will work itself out after the page loads: {e}")

self.notification_manager.notify(message, message_type.value)

async def highlight_element(self, selector: str, add_highlight: bool):
try:
page: Page = await self.get_current_page()
Expand Down
Empty file added ae/server/__init__.py
Empty file.
134 changes: 134 additions & 0 deletions ae/server/api_routes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import asyncio
import json
import logging
import os
from queue import Empty
from queue import Queue

import uvicorn
from fastapi import FastAPI
from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from pydantic import Field

import ae.core.playwright_manager as browserManager
from ae.core.autogen_wrapper import AutogenWrapper
from ae.utils.ui_messagetype import MessageType

browser_manager = browserManager.PlaywrightManager(headless=False)

APP_VERSION = "1.0.0"
APP_NAME = "Agent-E Web API"
API_PREFIX = "/api"
IS_DEBUG = False
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", 8080))
WORKERS = 1

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("uvicorn")


class CommandQueryModel(BaseModel):
command: str = Field(..., description="The command related to web navigation to execute.") # Required field with description


def get_app() -> FastAPI:
'''Starts the Application'''
fast_app = FastAPI(
title=APP_NAME,
version=APP_VERSION,
debug=IS_DEBUG)

fast_app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"])

return fast_app

app = get_app()

@app.on_event("startup") # type: ignore
async def startup_event():
"""
Startup event handler to initialize browser manager asynchronously.
"""
await browser_manager.async_initialize()


@app.post("/execute_task", description="Execute a given command related to web navigation and return the result.")
async def execute_task(request: Request, query_model: CommandQueryModel):
notification_queue = Queue() # type: ignore
register_notification_listener(notification_queue)
return StreamingResponse(run_task(query_model.command, browser_manager, notification_queue), media_type='text/event-stream')


def run_task(command: str, playwright_manager: browserManager.PlaywrightManager, notification_queue: Queue):
"""
Run the task to process the command and generate events.
Args:
command (str): The command to execute.
playwright_manager (PlaywrightManager): The manager handling browser interactions and notifications.
notification_queue (Queue): The queue to hold notifications for this request.
Yields:
str: JSON-encoded string representing a notification.
"""
async def event_generator():
# Start the process command task
task = asyncio.create_task(process_command(command, playwright_manager))

while not task.done() or not notification_queue.empty():
try:
notification = notification_queue.get_nowait() # type: ignore
yield f"data: {json.dumps(notification)}\n\n" # Using 'data: ' to follow the SSE format
except Empty:
await asyncio.sleep(0.1)

# Ensure the task is awaited to propagate any exceptions
await task

return event_generator()


async def process_command(command: str, playwright_manager: browserManager.PlaywrightManager):
"""
Process the command and send notifications.
Args:
command (str): The command to process.
playwright_manager (PlaywrightManager): The manager handling browser interactions and notifications.
"""
current_url = await playwright_manager.get_current_url()
await playwright_manager.notify_user("Processing command", MessageType.INFO)

ag = await AutogenWrapper.create()
command_exec_result = await ag.process_command(command, current_url) # type: ignore

# Notify about the completion of the command
await playwright_manager.notify_user("DONE", MessageType.DONE)


def register_notification_listener(notification_queue: Queue): # type: ignore
"""
Register the event generator as a listener in the NotificationManager.
"""
def listener(notification: dict[str, str]) -> None:
notification_queue.put(notification) # type: ignore

browser_manager.notification_manager.register_listener(listener)

if __name__ == "__main__":
logger.info('**********Application Started**********')
uvicorn.run(
"main:app",
host=HOST,
port=PORT,
workers=WORKERS, reload=IS_DEBUG, log_level="info")
4 changes: 3 additions & 1 deletion ae/utils/ui_messagetype.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ class MessageType(Enum):
STEP = "step"
ACTION ="action"
ANSWER = "answer"
QUESTION= "question"
QUESTION = "question"
INFO = "info"
FINAL = "final"
DONE = "transaction_done"
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ dependencies = [
"pydantic==2.6.2",
"python-dotenv==1.0.0",
"tabulate==0.9.0",
"nest-asyncio==1.6.0"
"nest-asyncio==1.6.0",
"fastapi==0.111.1",
"uvicorn==0.30.3"
]

[project.optional-dependencies]
Expand Down
Loading

0 comments on commit 179f351

Please sign in to comment.