Skip to content

Commit

Permalink
Merge pull request EmergenceAI#68 from EmergenceAI/dev
Browse files Browse the repository at this point in the history
New UI from DEV branch
  • Loading branch information
teaxio authored Jul 2, 2024
2 parents d1e9e85 + 01a6ba8 commit 9c2c163
Show file tree
Hide file tree
Showing 22 changed files with 735 additions and 371 deletions.
1 change: 0 additions & 1 deletion ae/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from ae.core import skills
from ae.core.autogen_wrapper import AutogenWrapper
from ae.core.playwright_manager import PlaywrightManager
from ae.core.post_process_responses import final_reply_callback_browser_agent
from ae.core.post_process_responses import final_reply_callback_user_proxy
from ae.core.prompts import LLM_PROMPTS
from ae.core.system_orchestrator import SystemOrchestrator
Expand Down
7 changes: 4 additions & 3 deletions ae/core/agents/browser_nav_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from ae.core.memory.static_ltm import get_user_ltm
from ae.core.prompts import LLM_PROMPTS
from ae.core.skills.click_using_selector import click as click_element
from ae.core.skills.enter_text_and_click import enter_text_and_click

# from ae.core.skills.enter_text_and_click import enter_text_and_click
from ae.core.skills.enter_text_using_selector import bulk_enter_text
from ae.core.skills.enter_text_using_selector import entertext
from ae.core.skills.get_dom_with_content_type import get_dom_with_content_type
Expand Down Expand Up @@ -67,9 +68,9 @@ def __register_skills(self):
self.browser_nav_executor.register_for_execution()(openurl)

# Register enter_text_and_click skill for LLM by assistant agent
self.agent.register_for_llm(description=LLM_PROMPTS["ENTER_TEXT_AND_CLICK_PROMPT"])(enter_text_and_click)
# self.agent.register_for_llm(description=LLM_PROMPTS["ENTER_TEXT_AND_CLICK_PROMPT"])(enter_text_and_click)
# Register enter_text_and_click skill for execution by user_proxy_agent
self.browser_nav_executor.register_for_execution()(enter_text_and_click)
# self.browser_nav_executor.register_for_execution()(enter_text_and_click)

# Register get_dom_with_content_type skill for LLM by assistant agent
self.agent.register_for_llm(description=LLM_PROMPTS["GET_DOM_WITH_CONTENT_TYPE_PROMPT"])(get_dom_with_content_type)
Expand Down
29 changes: 18 additions & 11 deletions ae/core/autogen_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any

import autogen # type: ignore
import nest_asyncio # type: ignore # type: ignore
import nest_asyncio # type: ignore
import openai

#from autogen import Cache
Expand All @@ -17,12 +17,13 @@
from ae.config import SOURCE_LOG_FOLDER_PATH
from ae.core.agents.browser_nav_agent import BrowserNavAgent
from ae.core.agents.high_level_planner_agent import PlannerAgent
from ae.core.post_process_responses import final_reply_callback_planner_agent as print_message_from_planner # type: ignore
from ae.core.post_process_responses import final_reply_callback_planner_agent as notify_planner_messages # type: ignore
from ae.core.prompts import LLM_PROMPTS
from ae.core.skills.get_url import geturl
from ae.utils.autogen_sequential_function_call import UserProxyAgent_SequentialFunctionExecution
from ae.utils.logger import logger
from ae.utils.response_parser import parse_response
from ae.utils.ui_messagetype import MessageType

nest_asyncio.apply() # type: ignore

Expand Down Expand Up @@ -102,14 +103,13 @@ def trigger_nested_chat(manager: autogen.ConversableAgent):
next_step = content_json.get('next_step', None)
plan = content_json.get('plan', None)
if plan is not None:
print_message_from_planner("Plan: "+ plan)
print(f"Next Step: {next_step}")
notify_planner_messages(plan, message_type=MessageType.PLAN)

if next_step is None:
print_message_from_planner("Received no response, terminating..") # type: ignore
print("Trigger nested chat returned False")
notify_planner_messages("Received no response, terminating..", message_type=MessageType.INFO) # type: ignore
return False
else:
print_message_from_planner(next_step) # type: ignore
notify_planner_messages(next_step, message_type=MessageType.STEP) # type: ignore
return True

def get_url() -> str:
Expand All @@ -124,14 +124,15 @@ def my_custom_summary_method(sender: autogen.ConversableAgent,recipient: autogen
elif "##TERMINATE TASK##" in last_message:
last_message=last_message.replace("##TERMINATE TASK##", "") # type: ignore
last_message=last_message+" "+ get_url() # type: ignore
print_message_from_planner("Response: "+ last_message) # type: ignore
notify_planner_messages(last_message, message_type=MessageType.ACTION) # type: ignore
return last_message # type: ignore
return recipient.last_message(sender)["content"] # type: ignore

def reflection_message(recipient, messages, sender, config): # type: ignore
last_message=messages[-1]["content"] # type: ignore
content_json = parse_response(last_message) # type: ignore
next_step = content_json.get('next_step', None)

if next_step is None:
print ("Message to nested chat returned None")
return None
Expand Down Expand Up @@ -227,6 +228,10 @@ async def __create_user_delegate_agent(self) -> autogen.ConversableAgent:
"""
def is_planner_termination_message(x: dict[str, str])->bool: # type: ignore
should_terminate = False
function: Any = x.get("function", None)
if function is not None:
return False

content:Any = x.get("content", "")
if content is None:
content = ""
Expand All @@ -235,15 +240,18 @@ def is_planner_termination_message(x: dict[str, str])->bool: # type: ignore
try:
content_json = parse_response(content)
_terminate = content_json.get('terminate', "no")
final_response = content_json.get('final_response', None)
if(_terminate == "yes"):
should_terminate = True
if final_response:
notify_planner_messages(final_response, message_type=MessageType.ANSWER)
except json.JSONDecodeError:
print("Error decoding JSON content")
logger.error("Error decoding JSON response:\n{content}.\nTerminating..")
should_terminate = True

return should_terminate # type: ignore

task_delegate_agent = autogen.ConversableAgent(
task_delegate_agent = UserProxyAgent_SequentialFunctionExecution(
name="user",
llm_config=False,
system_message=LLM_PROMPTS["USER_AGENT_PROMPT"],
Expand Down Expand Up @@ -331,7 +339,6 @@ async def process_command(self, command: str, current_url: str | None = None) ->
try:
if self.agents_map is None:
raise ValueError("Agents map is not initialized.")
# print(self.agents_map["browser_nav_executor"].function_map) # type: ignore

result=await self.agents_map["user"].a_initiate_chat( # type: ignore
self.agents_map["planner_agent"], # self.manager # type: ignore
Expand Down
69 changes: 58 additions & 11 deletions ae/core/playwright_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from ae.core.ui_manager import UIManager
from ae.utils.dom_mutation_observer import dom_mutation_change_detected
from ae.utils.dom_mutation_observer import handle_navigation_for_mutation_observer
from ae.utils.js_helper import beautify_plan_message
from ae.utils.js_helper import escape_js_message
from ae.utils.logger import logger
from ae.utils.ui_messagetype import MessageType

# Enusres that playwright does not wait for font loading when taking screenshots. Reference: https://github.com/microsoft/playwright/issues/28995
os.environ["PW_TEST_SCREENSHOT_NO_FONTS_READY"] = "1"
Expand Down Expand Up @@ -251,36 +253,70 @@ async def set_overlay_state_handler(self):
logger.debug("Setting overlay state handler")
context = await self.get_browser_context()
await context.expose_function('overlay_state_changed', self.overlay_state_handler) # type: ignore
await context.expose_function('show_steps_state_changed',self.show_steps_state_handler) # type: ignore

async def overlay_state_handler(self, is_collapsed: bool):
page = await self.get_current_page()
self.ui_manager.update_overlay_state(is_collapsed)
if not is_collapsed:
await self.ui_manager.update_overlay_chat_history(page)

async def show_steps_state_handler(self, show_details: bool):
page = await self.get_current_page()
await self.ui_manager.update_overlay_show_details(show_details, page)

async def set_user_response_handler(self):
context = await self.get_browser_context()
await context.expose_function('user_response', self.receive_user_response) # type: ignore


async def notify_user(self, message: str):
async def notify_user(self, message: str, message_type: MessageType = MessageType.STEP):
"""
Notify the user with a message.
Args:
message (str): The message to notify the user with.
message_type (enum, optional): Values can be 'PLAN', 'QUESTION', 'ANSWER', 'INFO', 'STEP'. Defaults to 'STEP'.
To Do: Convert to Enum.
"""
logger.debug(f"Notification: \"{message}\" being sent to the user.")

if message.startswith(":"):
message = message[1:]

if message.endswith(","):
message = message[:-1]

if message_type == MessageType.PLAN:
message = beautify_plan_message(message)
message = "Plan:\n" + message
elif message_type == MessageType.STEP:
if "confirm" in message.lower():
message = "Verify: " + message
else:
message = "Next step: " + message
elif message_type == MessageType.QUESTION:
message = "Question: " + message
elif message_type == MessageType.ANSWER:
message = "Response: " + message

safe_message = escape_js_message(message)
self.ui_manager.new_system_message(safe_message)
self.ui_manager.new_system_message(safe_message, message_type)

if self.ui_manager.overlay_show_details == False: # noqa: E712
if message_type not in (MessageType.PLAN, MessageType.QUESTION, MessageType.ANSWER, MessageType.INFO):
return

if self.ui_manager.overlay_show_details == True: # noqa: E712
if message_type not in (MessageType.PLAN, MessageType.QUESTION , MessageType.ANSWER, MessageType.INFO, MessageType.STEP):
return

safe_message_type = escape_js_message(message_type.value)
try:
js_code = f"addSystemMessage({safe_message}, false);"
js_code = f"addSystemMessage({safe_message}, is_awaiting_user_response=false, message_type={safe_message_type});"
page = await self.get_current_page()
await page.evaluate(js_code)
logger.debug("User notification completed")
except Exception as e:
logger.debug(f"Failed to notify user with message \"{message}\". However, most likey this will work itself out after the page loads: {e}")
logger.error(f"Failed to notify user with message \"{message}\". However, most likey this will work itself out after the page loads: {e}")

async def highlight_element(self, selector: str, add_highlight: bool):
try:
Expand Down Expand Up @@ -325,10 +361,11 @@ async def prompt_user(self, message: str) -> str:
page = await self.get_current_page()

await self.ui_manager.show_overlay(page)
self.log_system_message(message) # add the message to history after the overlay is opened to avoid double adding it. add_system_message below will add it
self.log_system_message(message, MessageType.QUESTION) # add the message to history after the overlay is opened to avoid double adding it. add_system_message below will add it

safe_message = escape_js_message(message)
js_code = f"addSystemMessage({safe_message}, is_awaiting_user_response=true);"

js_code = f"addSystemMessage({safe_message}, is_awaiting_user_response=true, message_type='question');"
await page.evaluate(js_code)

await self.user_response_event.wait()
Expand Down Expand Up @@ -367,7 +404,7 @@ async def take_screenshots(self, name: str, page: Page|None, full_page: bool = T
try:
await page.wait_for_load_state(state=load_state, timeout=take_snapshot_timeout) # type: ignore
await page.screenshot(path=screenshot_path, full_page=full_page, timeout=take_snapshot_timeout, caret="initial", scale="device")
print(f"Screen shot saved to: {screenshot_path}")
logger.debug(f"Screen shot saved to: {screenshot_path}")
except Exception as e:
logger.error(f"Failed to take screenshot and save to \"{screenshot_path}\". Error: {e}")

Expand All @@ -382,15 +419,25 @@ def log_user_message(self, message: str):
self.ui_manager.new_user_message(message)


def log_system_message(self, message: str):
def log_system_message(self, message: str, type: MessageType = MessageType.STEP):
"""
Log a system message.
Args:
message (str): The system message to log.
"""
self.ui_manager.new_system_message(message)
self.ui_manager.new_system_message(message, type)

async def update_processing_state(self, processing_state: str):
"""
Update the processing state of the overlay.
Args:
is_processing (str): "init", "processing", "done"
"""
page = await self.get_current_page()

await self.ui_manager.update_processing_state(processing_state, page)

async def command_completed(self, command: str, elapsed_time: float | None = None):
"""
Expand Down
42 changes: 6 additions & 36 deletions ae/core/post_process_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ae.core.playwright_manager import PlaywrightManager
from ae.utils.logger import logger
from ae.utils.ui_messagetype import MessageType


def final_reply_callback_user_proxy(recipient: autogen.ConversableAgent, messages: list[dict[str, Any]], sender: autogen.Agent, config: dict[str, Any]):
Expand Down Expand Up @@ -35,39 +36,8 @@ def final_reply_callback_user_proxy(recipient: autogen.ConversableAgent, message

return False, None



async def final_reply_callback_browser_agent(recipient: autogen.ConversableAgent, messages: list[dict[str, Any]], sender: autogen.Agent, config: dict[str, Any]):
"""
Callback function that is called each time the browser agent receives a message.
It picks the last message from the list of messages and checks if it contains the termination signal.
If the termination signal is found, it extracts the final response and outputs it.
Args:
recipient (autogen.ConversableAgent): The recipient of the message.
messages (Optional[list[dict[str, Any]]]): The list of messages received by the agent.
sender (Optional[autogen.Agent]): The sender of the message.
config (Optional[Any]): Additional configuration parameters.
Returns:
Tuple[bool, None]: A tuple indicating whether the processing should stop and the response to be sent.
"""
global last_agent_response
last_message = messages[-1]
if last_message.get('content') and "##TERMINATE##" in last_message['content']:
last_agent_response = last_message['content'].replace("##TERMINATE##", "").strip()
if last_agent_response:
browser_manager = PlaywrightManager(browser_type='chromium', headless=False)
await browser_manager.notify_user(last_agent_response)
logger.debug("*****Final Reply*****")
logger.debug(f"Final Response: {last_agent_response}")
logger.debug("*********************")
return True, None
return False, None


def final_reply_callback_planner_agent(plan:str): # type: ignore
browser_manager = PlaywrightManager(browser_type='chromium', headless=False)
loop = asyncio.get_event_loop()
loop.run_until_complete(browser_manager.notify_user(plan))
return False, None # required to ensure the agent communication flow continues
def final_reply_callback_planner_agent(message:str, message_type:MessageType = MessageType.STEP): # type: ignore
browser_manager = PlaywrightManager(browser_type='chromium', headless=False)
loop = asyncio.get_event_loop()
loop.run_until_complete(browser_manager.notify_user(message, message_type=message_type))
return False, None # required to ensure the agent communication flow continues
Loading

0 comments on commit 9c2c163

Please sign in to comment.