Skip to content

Commit

Permalink
Merge pull request EmergenceAI#45 from EmergenceAI/screenshot_capabil…
Browse files Browse the repository at this point in the history
…ity_for_benchmarks

Screenshot capability for benchmarks
  • Loading branch information
deepak-akkil authored Jun 7, 2024
2 parents 52f4fec + 054ccb2 commit f49076c
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 29 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ This will take time to run. Alternatively, to run a particular example(s), modif
- `--max_task_index`: Maximum task index to end tests with, non-inclusive
- `--test_results_id`: A unique identifier for the test results. If not provided, a timestamp is used
- `--test_config_file`: Path to the test configuration file. Default is "test/tasks/test.json" in the project root.
- `wait_time_non_headless`: The amount of time to wait between headless tests
- `--wait_time_non_headless`: The amount of time (in seconds) to wait between test tasks when running in non-headless mode
- `--take_screenshots`: Takes a screenshot after every operation performed. Example: `--take_screenshots true`. Defaults to `false`.
For example: `python -m test.run_tests --min_task_index 0 --max_task_index 28 --test_results_id first_28_tests` _(add `-u` for Mac)_


Expand Down
1 change: 0 additions & 1 deletion ae/core/autogen_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ async def process_command(self, command: str, current_url: str | None = None) ->
)
# reset usage summary for all agents after each command
for agent in self.agents_map.values():
print(">>> agent:", agent)
if hasattr(agent, "client") and agent.client is not None:
agent.client.clear_usage_summary() # type: ignore
return result
Expand Down
41 changes: 38 additions & 3 deletions ae/core/playwright_manager.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import os
import tempfile
import time

from playwright.async_api import async_playwright as playwright
from playwright.async_api import BrowserContext
from playwright.async_api import Page
from playwright.async_api import Playwright

from ae.core.ui_manager import UIManager
from ae.utils.dom_mutation_observer import dom_mutation_change_detected
from ae.utils.dom_mutation_observer import handle_navigation_for_mutation_observer
from ae.utils.js_helper import escape_js_message
from ae.utils.logger import logger
from ae.utils.dom_mutation_observer import handle_navigation_for_mutation_observer
from ae.utils.dom_mutation_observer import dom_mutation_change_detected


class PlaywrightManager:
Expand All @@ -29,6 +30,8 @@ class PlaywrightManager:
_playwright = None # type: ignore
_browser_context = None
__async_initialize_done = False
_take_screenshots = False
_screenshots_dir = None

def __new__(cls, *args, **kwargs): # type: ignore
"""
Expand All @@ -41,7 +44,7 @@ def __new__(cls, *args, **kwargs): # type: ignore
return cls._instance


def __init__(self, browser_type: str = "chromium", headless: bool = False, gui_input_mode: bool=True):
def __init__(self, browser_type: str = "chromium", headless: bool = False, gui_input_mode: bool = True, screenshots_dir: str = "", take_screenshots: bool = False):
"""
Initializes the PlaywrightManager with the specified browser type and headless mode.
Initialization occurs only once due to the singleton pattern.
Expand All @@ -59,6 +62,9 @@ def __init__(self, browser_type: str = "chromium", headless: bool = False, gui_i
if gui_input_mode:
self.ui_manager: UIManager = UIManager()

self.set_take_screenshots(take_screenshots)
self.set_screenshots_dir(screenshots_dir)


async def async_initialize(self):
"""
Expand Down Expand Up @@ -335,6 +341,35 @@ async def prompt_user(self, message: str) -> str:
self.ui_manager.new_user_message(result)
return result

def set_take_screenshots(self, take_screenshots: bool) -> None:
    """
    Enable or disable screenshot capture.

    Parameters:
    - take_screenshots (bool): The new value of the flag stored on this manager.
    """
    self._take_screenshots = take_screenshots

def get_take_screenshots(self) -> bool:
    """
    Return the flag that controls whether screenshots are captured.
    """
    return self._take_screenshots

def set_screenshots_dir(self, screenshots_dir: str) -> None:
    """
    Set the directory that screenshots are written to.

    Parameters:
    - screenshots_dir (str): Path of the directory to store on this manager.
    """
    self._screenshots_dir = screenshots_dir

def get_screenshots_dir(self) -> str | None:
    """
    Return the directory that screenshots are written to.
    """
    return self._screenshots_dir

async def take_screenshots(self, name: str, page: "Page | None", full_page: bool = True, include_timestamp: bool = True):
    """
    Save a PNG screenshot of a page into the configured screenshots directory.

    Does nothing when the take-screenshots flag on this manager is off.

    Parameters:
    - name (str): Base file name of the screenshot, without extension.
    - page (Page | None): The page to capture. When None, the page returned by get_current_page() is used.
    - full_page (bool): Passed through to page.screenshot(). Defaults to True.
    - include_timestamp (bool): When True, a millisecond timestamp is appended to the file name. Defaults to True.
    """
    if not self._take_screenshots:
        return
    if page is None:
        page = await self.get_current_page()

    screenshot_name = name
    if include_timestamp:
        # Millisecond resolution: with whole seconds, back-to-back captures that
        # share the same name end up with the same file name and overwrite each other.
        screenshot_name += f"_{time.time_ns() // 1_000_000}"
    screenshot_name += ".png"

    # os.path.join with an unset directory ("" or None) yields a path relative to the
    # working directory; plain "/" concatenation would point at the filesystem root.
    screenshot_path = os.path.join(self.get_screenshots_dir() or "", screenshot_name)

    await page.screenshot(path=screenshot_path, full_page=full_page)

    logger.info(f"Screen shot saved to: {screenshot_path}")


def log_user_message(self, message: str):
"""
Log the user's message.
Expand Down
9 changes: 6 additions & 3 deletions ae/core/skills/click_using_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@

from ae.core.playwright_manager import PlaywrightManager
from ae.utils.dom_helper import get_element_outer_html
from ae.utils.dom_mutation_observer import subscribe
from ae.utils.dom_mutation_observer import unsubscribe
from ae.utils.logger import logger
from ae.utils.dom_mutation_observer import subscribe
from ae.utils.dom_mutation_observer import unsubscribe


async def click(selector: Annotated[str, "The properly formed query selector string to identify the element for the click action. When \"mmid\" attribute is present, use it for the query selector."],
wait_before_execution: Annotated[float, "Optional wait time in seconds before executing the click event logic.", float] = 0.0) -> Annotated[str, "A message indicating success or failure of the click."]:
Expand All @@ -34,7 +35,7 @@ async def click(selector: Annotated[str, "The properly formed query selector str
raise ValueError('No active page found. OpenURL command opens a new page.')

await browser_manager.highlight_element(selector, True)

dom_changes_detected=None
def detect_dom_changes(changes:str): # type: ignore
nonlocal dom_changes_detected
Expand All @@ -44,7 +45,9 @@ def detect_dom_changes(changes:str): # type: ignore
result = await do_click(page, selector, wait_before_execution)
await asyncio.sleep(0.1) # sleep for 100ms to allow the mutation observer to detect changes
unsubscribe(detect_dom_changes)
await browser_manager.take_screenshots("click_using_selector", page)
await browser_manager.notify_user(result["summary_message"])

if dom_changes_detected:
return f"{result['detailed_message']}.\n As a consequence of this action, new elements have appeared in view: {dom_changes_detected}. Get all_fields to interact with the elements."
return result["detailed_message"]
Expand Down
10 changes: 7 additions & 3 deletions ae/core/skills/enter_text_and_click.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import asyncio
from typing import Annotated

from ae.core.playwright_manager import PlaywrightManager
from ae.core.skills.click_using_selector import do_click
from ae.core.skills.enter_text_using_selector import do_entertext
from ae.core.skills.press_key_combination import do_press_key_combination
from ae.utils.logger import logger
import asyncio


async def enter_text_and_click(
text_selector: Annotated[str, "The properly formatted DOM selector query, for example [mmid='1234'], where the text will be entered. Use mmid attribute."],
Expand Down Expand Up @@ -54,7 +55,7 @@ async def enter_text_and_click(

#if the text_selector is the same as the click_selector, press the Enter key instead of clicking
if text_selector == click_selector:
do_press_key_combination_result = await do_press_key_combination(page, "Enter")
do_press_key_combination_result = await do_press_key_combination(browser_manager, page, "Enter")
if do_press_key_combination_result:
result["detailed_message"] += f" Instead of click, pressed the Enter key successfully on element: \"{click_selector}\"."
await browser_manager.notify_user(f"Pressed the Enter key successfully on element: \"{click_selector}\".")
Expand All @@ -67,6 +68,9 @@ async def enter_text_and_click(
do_click_result = await do_click(page, click_selector, wait_before_click_execution)
result["detailed_message"] += f' {do_click_result["detailed_message"]}'
await browser_manager.notify_user(do_click_result["summary_message"])

await asyncio.sleep(0.1) # sleep for 100ms to allow the mutation observer to detect changes

await browser_manager.take_screenshots("click_using_selector", page)

return result["detailed_message"]
10 changes: 7 additions & 3 deletions ae/core/skills/enter_text_using_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from typing import List # noqa: UP035

from playwright.async_api import Page
from ae.core.skills.press_key_combination import press_key_combination

from ae.core.playwright_manager import PlaywrightManager
from ae.core.skills.press_key_combination import press_key_combination
from ae.utils.dom_helper import get_element_outer_html
from ae.utils.dom_mutation_observer import subscribe
from ae.utils.dom_mutation_observer import unsubscribe
from ae.utils.logger import logger
from ae.utils.dom_mutation_observer import subscribe
from ae.utils.dom_mutation_observer import unsubscribe


@dataclass
class EnterTextEntry:
Expand Down Expand Up @@ -115,6 +117,8 @@ def detect_dom_changes(changes:str): # type: ignore
await asyncio.sleep(0.1) # sleep for 100ms to allow the mutation observer to detect changes
unsubscribe(detect_dom_changes)

await browser_manager.take_screenshots("click_using_selector", page)

await browser_manager.notify_user(result["summary_message"])
if dom_changes_detected:
return f"{result['detailed_message']}.\n As a consequence of this action, new elements have appeared in view: {dom_changes_detected}. Get all_fields to interact with the elements."
Expand Down
3 changes: 3 additions & 0 deletions ae/core/skills/open_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ async def openurl(url: Annotated[str, "The URL to navigate to. Value must includ
logger.warn(f"Initial navigation to {url} failed: {e}. Will try to continue anyway.") # happens more often than not, but does not seem to be a problem
import traceback
traceback.print_exc()

await browser_manager.take_screenshots("click_using_selector", page)

await browser_manager.notify_user(f"Opened URL: {url}")
return f"Page loaded: {page.url.split('?')[0]}" # type: ignore

Expand Down
7 changes: 5 additions & 2 deletions ae/core/skills/press_key_combination.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ async def press_enter_key(selector: Annotated[str, """The properly formed query
raise ValueError('No active page found. OpenURL command opens a new page.')

await do_click(page, selector, wait_before_execution=0.0)
result = await do_press_key_combination(page, 'Enter')
result = await do_press_key_combination(browser_manager, page, 'Enter')

if result:
return f"Enter key pressed in field with selector: {selector}"
else:
return f"Failed to press Enter key in field with selector: {selector}"


async def do_press_key_combination(page: Page, key_combination: str) -> bool:
async def do_press_key_combination(browser_manager: PlaywrightManager, page: Page, key_combination: str) -> bool:
"""
Presses a key combination on the provided page.
Expand All @@ -79,6 +79,7 @@ async def do_press_key_combination(page: Page, key_combination: str) -> bool:
For example, 'Control+C' to copy or 'Alt+F4' to close a window on Windows.
Parameters:
- browser_manager (PlaywrightManager): The PlaywrightManager instance.
- page (Page): The Playwright page instance.
- key_combination (str): The key combination to press, represented as a string. For combinations, use '+' as a separator.
Expand All @@ -101,6 +102,8 @@ async def do_press_key_combination(page: Page, key_combination: str) -> bool:
# Release the modifier keys
for key in keys[:-1]:
await page.keyboard.up(key)

await browser_manager.take_screenshots("click_using_selector", page)
except Exception as e:
logger.error(f"Error executing press_key_combination \"{key_combination}\": {e}")
return False
Expand Down
5 changes: 4 additions & 1 deletion test/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
def _parse_bool_flag(value: str) -> bool:
    """Convert a command-line string such as 'true' or 'false' into a bool."""
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1'):
        return True
    if normalized in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(f"Expected a boolean value (true/false), got: {value!r}")


parser = argparse.ArgumentParser(description='Run test suite for specified range of test tasks.')

# Add arguments
# type=bool would turn every non-empty string (including "false") into True,
# so the value is parsed explicitly instead.
parser.add_argument('-s', '--take_screenshots', type=_parse_bool_flag, default=False,
                    help='Take screenshots after every operation performed, e.g. "--take_screenshots true" (default: False)')
parser.add_argument('-wait', '--wait_time_non_headless', type=int, default=5,
                    help='Time to wait between test tasks when running in non-headless mode (default: 5 seconds)')
parser.add_argument('-min', '--min_task_index', type=int, default=0,
Expand All @@ -22,4 +24,5 @@
args = parser.parse_args()

# Run the main function with the provided or default arguments, not passing browser_manager or AutoGenWrapper will cause the test processor to create new instances of them
asyncio.run(run_tests(None, None, args.min_task_index, args.max_task_index, test_results_id=args.test_results_id, test_file=args.test_config_file))
asyncio.run(run_tests(None, None, args.min_task_index, args.max_task_index, test_results_id=args.test_results_id, test_file=args.test_config_file,
take_screenshots=args.take_screenshots, wait_time_non_headless=args.wait_time_non_headless))
42 changes: 30 additions & 12 deletions test/tests_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from ae.core.playwright_manager import PlaywrightManager
from ae.utils.logger import logger
from autogen.agentchat.chat import ChatResult # type: ignore
from autogen.agentchat import Agent
from playwright.async_api import Page
from tabulate import tabulate
from termcolor import colored
Expand All @@ -36,9 +35,21 @@ def check_test_folders():
os.makedirs(TEST_RESULTS)
logger.info(f"Created scores folder at: {TEST_RESULTS}")

def create_task_log_folders(task_id: str, test_results_id: str) -> dict[str, str]:
    """
    Create the log folder for a task and its 'snapshots' subfolder if they do not exist yet.

    The log folder is TEST_LOGS/<test_results_id>_<task_id>.

    Parameters:
    - task_id (str): Identifier of the task; used in the folder name and in the log messages.
    - test_results_id (str): Identifier of the test run; used as the folder name prefix.

    Returns:
    - dict[str, str]: The created paths under the keys "task_log_folder" and "task_screenshots_folder".
    """
    task_log_dir = os.path.join(TEST_LOGS, f'{test_results_id}_{task_id}')
    task_screenshots_dir = os.path.join(task_log_dir, 'snapshots')

    # exist_ok=True keeps creation safe if the folder appears between the check and makedirs;
    # the check itself is kept so "Created ..." is only logged for folders that were missing.
    if not os.path.exists(task_log_dir):
        os.makedirs(task_log_dir, exist_ok=True)
        logger.info(f"Created log dir for task {task_id} at: {task_log_dir}")
    if not os.path.exists(task_screenshots_dir):
        os.makedirs(task_screenshots_dir, exist_ok=True)
        logger.info(f"Created screenshots dir for task {task_id} at: {task_screenshots_dir}")

    return {"task_log_folder": task_log_dir, "task_screenshots_folder": task_screenshots_dir}


def dump_log(task_id: str, messages_str_keys: dict[str, str], logs_dir: str):
    """
    Write the messages of a task as JSON to <logs_dir>/execution_logs_<task_id>.json.

    Parameters:
    - task_id (str): Identifier of the task; used in the file name.
    - messages_str_keys (dict[str, str]): The messages to serialize.
    - logs_dir (str): Existing directory the log file is written to.
    """
    file_name = os.path.join(logs_dir, f'execution_logs_{task_id}.json')
    # Serialize before opening the file: opening with 'w' truncates it, so a
    # serialization error would otherwise leave an empty or partial log behind.
    serialized = json.dumps(messages_str_keys, ensure_ascii=False, indent=4)
    with open(file_name, 'w', encoding='utf-8') as f:
        f.write(serialized)

Expand Down Expand Up @@ -121,7 +132,7 @@ def get_command_exec_cost(command_exec_result: ChatResult):
return output


async def execute_single_task(task_config: dict[str, Any], browser_manager: PlaywrightManager, ag: AutogenWrapper, page: Page) -> dict[str, Any]:
async def execute_single_task(task_config: dict[str, Any], browser_manager: PlaywrightManager, ag: AutogenWrapper, page: Page, logs_dir: str) -> dict[str, Any]:
"""
Executes a single test task based on a specified task configuration and evaluates its performance.
Expand Down Expand Up @@ -165,7 +176,7 @@ async def execute_single_task(task_config: dict[str, Any], browser_manager: Play
agent_key = list(messages.keys())[0] # type: ignore
last_agent_response = extract_last_response(messages[agent_key]) # type: ignore

dump_log(str(task_id), messages_str_keys)
dump_log(str(task_id), messages_str_keys, logs_dir)

evaluator = evaluator_router(task_config)
cdp_session = await page.context.new_cdp_session(page)
Expand Down Expand Up @@ -201,7 +212,7 @@ async def execute_single_task(task_config: dict[str, Any], browser_manager: Play


async def run_tests(ag: AutogenWrapper, browser_manager: PlaywrightManager, min_task_index: int, max_task_index: int,
test_file: str="", test_results_id: str = "", wait_time_non_headless: int=5) -> list[dict[str, Any]]:
test_file: str="", test_results_id: str = "", wait_time_non_headless: int=5, take_screenshots: bool = False) -> list[dict[str, Any]]:
"""
Runs a specified range of test tasks using Playwright for browser interactions and AutogenWrapper for task automation.
It initializes necessary components, processes each task, handles exceptions, and compiles test results into a structured list.
Expand All @@ -214,6 +225,7 @@ async def run_tests(ag: AutogenWrapper, browser_manager: PlaywrightManager, min_
- test_file (str): Path to the file containing the test configurations. If not provided, defaults to a predetermined file path.
- test_results_id (str): A unique identifier for the session of test results. Defaults to a timestamp if not provided.
- wait_time_non_headless (int): Time to wait between tasks when running in non-headless mode, useful for live monitoring or debugging.
- take_screenshots (bool): Whether to take screenshots during test execution. Defaults to False.
Returns:
- list[dict[str, Any]]: A list of dictionaries, each containing the results from executing a test task. Results include task ID, intent, score, total command time, etc.
Expand Down Expand Up @@ -248,8 +260,14 @@ async def run_tests(ag: AutogenWrapper, browser_manager: PlaywrightManager, min_
total_tests = max_task_index - min_task_index

for index, task_config in enumerate(test_configurations[min_task_index:max_task_index], start=min_task_index):
task_id = str(task_config.get('task_id'))
log_folders = create_task_log_folders(task_id, test_results_id)
browser_manager.set_take_screenshots(take_screenshots)
if take_screenshots:
browser_manager.set_screenshots_dir(log_folders["task_screenshots_folder"])

print_progress_bar(index - min_task_index, total_tests)
task_result = await execute_single_task(task_config, browser_manager, ag, page)
task_result = await execute_single_task(task_config, browser_manager, ag, page, log_folders["task_log_folder"])
test_results.append(task_result)
save_test_results(test_results, test_results_id)
print_test_result(task_result, index + 1, total_tests)
Expand Down Expand Up @@ -287,20 +305,20 @@ async def run_tests(ag: AutogenWrapper, browser_manager: PlaywrightManager, min_

for result in test_results:
compute_cost = result.get("compute_cost",0) # type: ignore
if compute_cost is not None:
if compute_cost is not None and isinstance(compute_cost, dict):
total_cost += compute_cost.get("cost", 0) # type: ignore
total_tokens += compute_cost.get("total_tokens", 0) # type: ignore

passed_tests = [result for result in test_results if result['score'] == 1]
summary_table = [ # type: ignore
['Total Tests', 'Passed', 'Failed', 'Average Time Taken (s)', 'Total Time Taken (s)', 'Total Tokens', 'Total Cost ($)'],
[total_tests, len(passed_tests), total_tests - len(passed_tests),
round(sum(test['tct'] for test in test_results) / total_tests, 2), # type: ignore
round(sum(test['tct'] for test in test_results), 2), # type: ignore
total_tokens, total_cost]
round(sum(test['tct'] for test in test_results) / total_tests, 2), # type: ignore
round(sum(test['tct'] for test in test_results), 2), # type: ignore
total_tokens, total_cost]
]

print('\nSummary Report:')
print(tabulate(summary_table, headers='firstrow', tablefmt='grid')) # type: ignore

return test_results
return test_results

0 comments on commit f49076c

Please sign in to comment.