-
-
Notifications
You must be signed in to change notification settings - Fork 191
Feature/performance test #749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2fc3197
adc2227
5827fba
592f03b
629139a
0dbf8ee
9edc2f1
d55cee4
9b65e95
2c8bac7
6af46bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
from typing import Any, Union | ||
from loguru import logger | ||
import AppKit | ||
import ApplicationServices | ||
|
||
def get_attribute(element, attribute): | ||
result, value = ApplicationServices.AXUIElementCopyAttributeValue(element, attribute, None) | ||
if result == 0: | ||
return value | ||
return None | ||
|
||
def find_element_by_attribute(element, attribute, value): | ||
if get_attribute(element, attribute) == value: | ||
return element | ||
children = get_attribute(element, ApplicationServices.kAXChildrenAttribute) or [] | ||
for child in children: | ||
found = find_element_by_attribute(child, attribute, value) | ||
if found: | ||
return found | ||
return None | ||
|
||
def find_application(app_name: str): | ||
"""Find an application by its name and return its accessibility element. | ||
|
||
Args: | ||
app_name (str): The name of the application to find. | ||
|
||
Returns: | ||
AXUIElement or None: The AXUIElement representing the application, | ||
or None if the application is not running. | ||
""" | ||
workspace = AppKit.NSWorkspace.sharedWorkspace() | ||
running_apps = workspace.runningApplications() | ||
app = next((app for app in running_apps if app.localizedName() == app_name), None) | ||
if app is None: | ||
logger.error(f"{app_name} application is not running.") | ||
return None | ||
|
||
app_element = ApplicationServices.AXUIElementCreateApplication(app.processIdentifier()) | ||
return app_element | ||
|
||
def get_main_window(app_element): | ||
"""Get the main window of an application. | ||
|
||
Args: | ||
app_element: The AXUIElement of the application. | ||
|
||
Returns: | ||
AXUIElement or None: The AXUIElement representing the main window, | ||
or None if no windows are found. | ||
""" | ||
error_code, windows = ApplicationServices.AXUIElementCopyAttributeValue(app_element, ApplicationServices.kAXWindowsAttribute, None) | ||
if error_code or not windows: | ||
return None | ||
|
||
return windows[0] if windows else None | ||
|
||
def get_element_value(element, role: str): | ||
"""Get the value of a specific element by its role. | ||
|
||
Args: | ||
element: The AXUIElement to search within. | ||
role (str): The role of the element to find (e.g., "AXStaticText"). | ||
|
||
Returns: | ||
str: The value of the element, or an error message if not found. | ||
""" | ||
target_element = find_element_by_attribute(element, ApplicationServices.kAXRoleAttribute, role) | ||
if not target_element: | ||
return f"{role} element not found." | ||
|
||
value = get_attribute(target_element, ApplicationServices.kAXValueAttribute) | ||
return value if value else f"No value for {role} element." |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import os | ||
from sqlalchemy import create_engine, inspect | ||
from sqlalchemy.orm import sessionmaker | ||
from openadapt.db.db import Base | ||
from openadapt.config import DATA_DIR_PATH, PARENT_DIR_PATH, RECORDING_DIR_PATH | ||
import openadapt.db.crud as crud | ||
|
||
def get_session(): | ||
db_url = RECORDING_DIR_PATH / "recording.db" | ||
print(f"Database URL: {db_url}") | ||
engine = create_engine(f"sqlite:///{db_url}") | ||
# SessionLocal = sessionmaker(bind=engine) | ||
Base.metadata.create_all(bind=engine) | ||
session = crud.get_new_session(read_only=True) | ||
print("Database connection established.") | ||
seanmcguire12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return session, engine | ||
|
||
def check_tables_exist(engine): | ||
inspector = inspect(engine) | ||
tables = inspector.get_table_names() | ||
expected_tables = [ | ||
'recording', | ||
'action_event', | ||
'screenshot', | ||
'window_event', | ||
'performance_stat', | ||
'memory_stat' | ||
] | ||
for table in expected_tables: | ||
if table in tables: | ||
print(f"Table '{table}' exists.") | ||
else: | ||
print(f"Table '{table}' does NOT exist.") | ||
seanmcguire12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return tables | ||
|
||
def fetch_data(session): | ||
# get the most recent three recordings | ||
recordings = crud.get_recordings(session, max_rows=3) | ||
recording_ids = [recording.id for recording in recordings] | ||
|
||
action_events = [] | ||
screenshots = [] | ||
window_events = [] | ||
performance_stats = [] | ||
memory_stats = [] | ||
|
||
for recording in recordings: | ||
action_events.extend(crud.get_action_events(session, recording)) | ||
screenshots.extend(crud.get_screenshots(session, recording)) | ||
window_events.extend(crud.get_window_events(session, recording)) | ||
performance_stats.extend(crud.get_perf_stats(session, recording)) | ||
memory_stats.extend(crud.get_memory_stats(session, recording)) | ||
|
||
data = { | ||
"recordings": recordings, | ||
"action_events": action_events, | ||
"screenshots": screenshots, | ||
"window_events": window_events, | ||
"performance_stats": performance_stats, | ||
"memory_stats": memory_stats, | ||
} | ||
|
||
# Debug prints to verify data fetching | ||
print(f"Recordings: {len(data['recordings'])} found.") | ||
print(f"Action Events: {len(data['action_events'])} found.") | ||
print(f"Screenshots: {len(data['screenshots'])} found.") | ||
print(f"Window Events: {len(data['window_events'])} found.") | ||
print(f"Performance Stats: {len(data['performance_stats'])} found.") | ||
print(f"Memory Stats: {len(data['memory_stats'])} found.") | ||
|
||
return data | ||
|
||
def format_sql_insert(table_name, rows): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add types. Also, what do you think about something like this:
Please fill out the type for |
||
if not rows: | ||
return "" | ||
|
||
columns = rows[0].__table__.columns.keys() | ||
sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES\n" | ||
values = [] | ||
|
||
for row in rows: | ||
row_values = [getattr(row, col) for col in columns] | ||
row_values = [f"'{value}'" if isinstance(value, str) else str(value) for value in row_values] | ||
values.append(f"({', '.join(row_values)})") | ||
|
||
sql += ",\n".join(values) + ";\n" | ||
return sql | ||
|
||
def dump_to_fixtures(filepath): | ||
session, engine = get_session() | ||
check_tables_exist(engine) | ||
data = fetch_data(session) | ||
|
||
with open(filepath, "a") as file: | ||
if data["recordings"]: | ||
file.write("-- Insert sample recordings\n") | ||
file.write(format_sql_insert("recording", data["recordings"])) | ||
|
||
if data["action_events"]: | ||
file.write("-- Insert sample action_events\n") | ||
file.write(format_sql_insert("action_event", data["action_events"])) | ||
|
||
if data["screenshots"]: | ||
file.write("-- Insert sample screenshots\n") | ||
file.write(format_sql_insert("screenshot", data["screenshots"])) | ||
|
||
if data["window_events"]: | ||
file.write("-- Insert sample window_events\n") | ||
file.write(format_sql_insert("window_event", data["window_events"])) | ||
|
||
if data["performance_stats"]: | ||
file.write("-- Insert sample performance_stats\n") | ||
file.write(format_sql_insert("performance_stat", data["performance_stats"])) | ||
|
||
if data["memory_stats"]: | ||
file.write("-- Insert sample memory_stats\n") | ||
file.write(format_sql_insert("memory_stat", data["memory_stats"])) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about something more DRY, e.g.:
|
||
|
||
print(f"Data appended to {filepath}") | ||
|
||
if __name__ == "__main__": | ||
fixtures_path = PARENT_DIR_PATH / "tests/assets/fixtures.sql" | ||
dump_to_fixtures(fixtures_path) |
Uh oh!
There was an error while loading. Please reload this page.