diff --git a/loopquest/api.py b/loopquest/api.py index c88401b..ab9d693 100644 --- a/loopquest/api.py +++ b/loopquest/api.py @@ -2,7 +2,7 @@ import getpass import webbrowser from .ui import choose_instance -from .crud import get_cloud_user_id +from .crud import get_cloud_user_id, send_instance_choice_stats from .utils import is_docker_installed from .private_api import ( is_local_instance_initialized, @@ -18,10 +18,12 @@ ) -def init(local=None): +def init(local: bool | None = None): if local is None: local = choose_instance() == "local" + send_instance_choice_stats(CLOUD_BACKEND_URL, local) + # TODO: add an api to record the choice of the instance. if local: if is_docker_installed(): diff --git a/loopquest/crud.py b/loopquest/crud.py index bdccec0..eb3904a 100644 --- a/loopquest/crud.py +++ b/loopquest/crud.py @@ -31,12 +31,18 @@ def make_request(method: str, url: str, **kwargs): headers["Authorization"] = f"Bearer {os.getenv(TOKEN_ENV_VAR_NAME)}" kwargs["headers"] = headers - # print(headers) response = requests.request(method, url, **kwargs) response.raise_for_status() return response +def send_instance_choice_stats(backend_url: str, is_local: bool): + res = requests.post( + f"{backend_url}/user_stats/instance_choice", params={"is_local": is_local} + ) + res.raise_for_status() + + def construct_environment_info(env: gymnasium.Env, user_id: str): def get_size_of_gym_space(space): return max(sum(space.shape), 1) diff --git a/loopquest/gym_wrappers.py b/loopquest/gym_wrappers.py index 070e1f3..c469e39 100644 --- a/loopquest/gym_wrappers.py +++ b/loopquest/gym_wrappers.py @@ -15,6 +15,7 @@ ) from .utils import jsonize_dict, cast_to_list, flatten_and_cast_to_list from .api import get_frontend_url, get_backend_url, get_user_id +from concurrent.futures import ThreadPoolExecutor class LoopquestGymWrapper(gymnasium.Wrapper): @@ -23,6 +24,7 @@ def __init__( env: gymnasium.Env, experiment_name: str = "default", experiment_description: str = "", + max_workers: int = 10, ): 
super().__init__(env) self.current_step = 0 @@ -49,6 +51,8 @@ def __init__( print( f"Check the experiment progress at: {self.frontend_url}/experiment/{self.exp_id}" ) + # TODO: evaluate if message queue is better than thread pool. + self.executor = ThreadPoolExecutor(max_workers=max_workers) @property def exp_id(self): @@ -66,7 +70,9 @@ def step(self, action): self.current_step += 1 observation, reward, terminated, truncated, info = self.env.step(action) info_json_str = jsonize_dict(info) - create_step( + # TODO: we can do batch upload here instead of uploading one by one. + self.executor.submit( + create_step, self.backend_url, StepCreate( id=self.step_id, @@ -86,6 +92,26 @@ def step(self, action): info=info_json_str, ), ) + # create_step( + # self.backend_url, + # StepCreate( + # id=self.step_id, + # experiment_id=self.exp_id, + # episode=self.current_episode, + # step=self.current_step, + # observation=flatten_and_cast_to_list( + # self.observation_space, observation + # ), + # action=cast_to_list(action), + # reward=reward, + # prev_observation=flatten_and_cast_to_list( + # self.observation_space, self.prev_observation + # ), + # termnated=terminated, + # truncated=truncated, + # info=info_json_str, + # ), + # ) self.prev_observation = observation return observation, reward, terminated, truncated, info @@ -104,7 +130,8 @@ def reset(self): # self.current_episode += 1 info_json_str = jsonize_dict(info) - create_step( + self.executor.submit( + create_step, self.backend_url, StepCreate( id=self.step_id, @@ -127,6 +154,7 @@ def close(self): ExperimentUpdate(status=ExperimentStatus.FINISHED), ) self.env.close() + self.executor.shutdown() def render(self): # TODO: implement this. 
@@ -140,13 +168,19 @@ def render(self): ) elif self.render_mode == "rgb_array": rgb_array = self.env.render() - step = upload_rgb_as_image(self.backend_url, self.step_id, rgb_array) + self.executor.submit( + upload_rgb_as_image, self.backend_url, self.step_id, rgb_array + ) return rgb_array elif self.render_mode == "rgb_array_list": rgb_array_list = self.env.render() for i, rgb_array in enumerate(rgb_array_list): - upload_rgb_as_image( - self.backend_url, self.step_id, rgb_array, image_id=i + self.executor.submit( + upload_rgb_as_image, + self.backend_url, + self.step_id, + rgb_array, + image_id=i, ) return rgb_array_list diff --git a/loopquest/private_api.py b/loopquest/private_api.py index 9f3883d..45e8052 100644 --- a/loopquest/private_api.py +++ b/loopquest/private_api.py @@ -8,6 +8,9 @@ LOCAL_BACKEND_URL = "http://localhost:8000" CLOUD_FRONTEND_URL = "https://open.loopquest.ai" CLOUD_BACKEND_URL = "https://open.loopquest.ai/api" +# For local development +# CLOUD_FRONTEND_URL = "http://localhost:3000" +# CLOUD_BACKEND_URL = "http://localhost:3000/api" TOKEN_ENV_VAR_NAME = "LOOPQUEST_USER_TOKEN" @@ -83,7 +86,9 @@ def save_token_to_env(token): def add_env_to_gitignore(): gitignore_path = os.path.join(os.getcwd(), ".gitignore") - if os.path.exists(gitignore_path) and not is_dotenv_in_gitignore(gitignore_path): + if os.path.exists(gitignore_path): + if is_dotenv_in_gitignore(gitignore_path): + return with open(gitignore_path, "a") as f: f.write("\n# Added by LoopQuest\n") f.write(".env\n") diff --git a/loopquest/schema.py b/loopquest/schema.py index a2fd533..6f29439 100644 --- a/loopquest/schema.py +++ b/loopquest/schema.py @@ -1,4 +1,3 @@ -from pydantic import BaseModel from typing import List, Optional, Union import datetime from pydantic import BaseModel diff --git a/tests/test_performance.py b/tests/test_performance.py new file mode 100644 index 0000000..f1c6a37 --- /dev/null +++ b/tests/test_performance.py @@ -0,0 +1,133 @@ +import gymnasium +import loopquest 
+from unittest.mock import patch
+import time
+
+STEP = 100
+
+
+def test_step_local():
+    """Benchmark only: print mean step/render times with vs. without LoopQuest; asserts nothing."""
+    name = "MountainCarContinuous-v0"
+    with patch("builtins.input", return_value="2"):  # "2" presumably selects the local instance - confirm
+        env = loopquest.make_env(gymnasium.make(name, render_mode="rgb_array"))
+    obs, info = env.reset()
+    step_times = []
+    render_times = []
+    for _ in range(STEP):
+        action = env.action_space.sample()
+        start_time = time.perf_counter()  # monotonic, high-resolution clock for short intervals
+        obs, reward, terminated, truncated, info = env.step(action)
+        elapsed_time = time.perf_counter() - start_time
+        step_times.append(elapsed_time)
+
+        start_time = time.perf_counter()
+        env.render()
+        elapsed_time = time.perf_counter() - start_time
+        render_times.append(elapsed_time)
+
+        if terminated or truncated:
+            break
+    env.close()
+    loopquest.close()
+    with_loopquest_step_time = sum(step_times) / len(step_times)
+    with_loopquest_render_time = sum(render_times) / len(render_times)
+    print(f"With Loopquest Average step time: {with_loopquest_step_time}")
+    print(f"With Loopquest Average render time: {with_loopquest_render_time}")
+
+    env = gymnasium.make(name, render_mode="rgb_array")  # unwrapped baseline, same loop as above
+    obs, info = env.reset()
+    step_times = []
+    render_times = []
+    for _ in range(STEP):
+        action = env.action_space.sample()
+        start_time = time.perf_counter()
+        obs, reward, terminated, truncated, info = env.step(action)
+        elapsed_time = time.perf_counter() - start_time
+        step_times.append(elapsed_time)
+
+        start_time = time.perf_counter()
+        env.render()
+        elapsed_time = time.perf_counter() - start_time
+        render_times.append(elapsed_time)
+
+        if terminated or truncated:
+            break
+    env.close()
+    without_loopquest_step_time = sum(step_times) / len(step_times)
+    without_loopquest_render_time = sum(render_times) / len(render_times)
+    print(f"Without Loopquest Average step time: {without_loopquest_step_time}")
+    print(f"Without Loopquest Average render time: {without_loopquest_render_time}")
+
+    print(
+        f"LoopQuest step overhead: {with_loopquest_step_time - without_loopquest_step_time}"
+    )
+    print(
+        f"LoopQuest render overhead: {with_loopquest_render_time - without_loopquest_render_time}"
+    )
+
+
+# def test_step_cloud():
+#     name = "MountainCarContinuous-v0"
+#     with patch("builtins.input", return_value="1"):
+#         env = loopquest.make_env(gymnasium.make(name, render_mode="rgb_array"))
+#     obs, info = env.reset()
+#     step_times = []
+#     render_times = []
+#     for i in range(STEP):
+#         action = env.action_space.sample()
+#         start_time = time.time()
+#         obs, reward, terminated, truncated, info = env.step(action)
+#         elapsed_time = time.time() - start_time
+#         step_times.append(elapsed_time)
+
+#         start_time = time.time()
+#         env.render()
+#         elapsed_time = time.time() - start_time
+#         render_times.append(elapsed_time)
+
+#         if terminated or truncated:
+#             break
+#     env.close()
+#     loopquest.close()
+#     with_loopquest_step_time = sum(step_times) / len(step_times)
+#     with_loopquest_render_time = sum(render_times) / len(render_times)
+#     print(f"With Loopquest Average step time: {with_loopquest_step_time}")
+#     print(f"With Loopquest Average render time: {with_loopquest_render_time}")
+
+#     env = gymnasium.make(name, render_mode="rgb_array")
+#     obs, info = env.reset()
+#     step_times = []
+#     render_times = []
+#     for i in range(STEP):
+#         action = env.action_space.sample()
+#         start_time = time.time()
+#         obs, reward, terminated, truncated, info = env.step(action)
+#         elapsed_time = time.time() - start_time
+#         step_times.append(elapsed_time)
+
+#         start_time = time.time()
+#         env.render()
+#         elapsed_time = time.time() - start_time
+#         render_times.append(elapsed_time)
+
+#         if terminated or truncated:
+#             break
+#     env.close()
+#     without_loopquest_step_time = sum(step_times) / len(step_times)
+#     without_loopquest_render_time = sum(render_times) / len(render_times)
+#     print(f"Without Loopquest Average step time: {without_loopquest_step_time}")
+#     print(f"Without Loopquest Average render time: {without_loopquest_render_time}")
+
+#     print(
+#         f"LoopQuest step overhead: {with_loopquest_step_time - without_loopquest_step_time}"
+#     )
+#     print(
+#         f"LoopQuest render overhead: {with_loopquest_render_time - without_loopquest_render_time}"
+#     )
+#     assert True
+
+
+if __name__ == "__main__":
+    test_step_local()
+    # test_step_cloud()