Skip to content

Commit

Permalink
Add threadpool and user stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jinyu-loopmind committed Sep 15, 2023
1 parent c6f7615 commit c644833
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 10 deletions.
6 changes: 4 additions & 2 deletions loopquest/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import getpass
import webbrowser
from .ui import choose_instance
from .crud import get_cloud_user_id
from .crud import get_cloud_user_id, send_instance_choice_stats
from .utils import is_docker_installed
from .private_api import (
is_local_instance_initialized,
Expand All @@ -18,10 +18,12 @@
)


def init(local=None):
def init(local: bool | None = None):
if local is None:
local = choose_instance() == "local"

send_instance_choice_stats(CLOUD_BACKEND_URL, local)

# TODO: add an api to record the choice of the instance.
if local:
if is_docker_installed():
Expand Down
8 changes: 7 additions & 1 deletion loopquest/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ def make_request(method: str, url: str, **kwargs):
headers["Authorization"] = f"Bearer {os.getenv(TOKEN_ENV_VAR_NAME)}"
kwargs["headers"] = headers

# print(headers)
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response


def send_instance_choice_stats(backend_url: str, is_local: bool):
res = requests.post(
f"{backend_url}/user_stats/instance_choice", params={"is_local": is_local}
)
res.raise_for_status()


def construct_environment_info(env: gymnasium.Env, user_id: str):
def get_size_of_gym_space(space):
return max(sum(space.shape), 1)
Expand Down
44 changes: 39 additions & 5 deletions loopquest/gym_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from .utils import jsonize_dict, cast_to_list, flatten_and_cast_to_list
from .api import get_frontend_url, get_backend_url, get_user_id
from concurrent.futures import ThreadPoolExecutor


class LoopquestGymWrapper(gymnasium.Wrapper):
Expand All @@ -23,6 +24,7 @@ def __init__(
env: gymnasium.Env,
experiment_name: str = "default",
experiment_description: str = "",
max_workers: int = 10,
):
super().__init__(env)
self.current_step = 0
Expand All @@ -49,6 +51,8 @@ def __init__(
print(
f"Check the experiment progress at: {self.frontend_url}/experiment/{self.exp_id}"
)
# TODO: evaluate if message queue is better than thread pool.
self.executor = ThreadPoolExecutor(max_workers=max_workers)

@property
def exp_id(self):
Expand All @@ -66,7 +70,9 @@ def step(self, action):
self.current_step += 1
observation, reward, terminated, truncated, info = self.env.step(action)
info_json_str = jsonize_dict(info)
create_step(
# TODO: we can do batch upload here instead of uploading one by one.
self.executor.submit(
create_step,
self.backend_url,
StepCreate(
id=self.step_id,
Expand All @@ -86,6 +92,26 @@ def step(self, action):
info=info_json_str,
),
)
# create_step(
# self.backend_url,
# StepCreate(
# id=self.step_id,
# experiment_id=self.exp_id,
# episode=self.current_episode,
# step=self.current_step,
# observation=flatten_and_cast_to_list(
# self.observation_space, observation
# ),
# action=cast_to_list(action),
# reward=reward,
# prev_observation=flatten_and_cast_to_list(
# self.observation_space, self.prev_observation
# ),
# termnated=terminated,
# truncated=truncated,
# info=info_json_str,
# ),
# )
self.prev_observation = observation
return observation, reward, terminated, truncated, info

Expand All @@ -104,7 +130,8 @@ def reset(self):
# self.current_episode += 1

info_json_str = jsonize_dict(info)
create_step(
self.executor.submit(
create_step,
self.backend_url,
StepCreate(
id=self.step_id,
Expand All @@ -127,6 +154,7 @@ def close(self):
ExperimentUpdate(status=ExperimentStatus.FINISHED),
)
self.env.close()
self.executor.shutdown()

def render(self):
# TODO: implement this.
Expand All @@ -140,13 +168,19 @@ def render(self):
)
elif self.render_mode == "rgb_array":
rgb_array = self.env.render()
step = upload_rgb_as_image(self.backend_url, self.step_id, rgb_array)
self.executor.submit(
upload_rgb_as_image, self.backend_url, self.step_id, rgb_array
)
return rgb_array
elif self.render_mode == "rgb_array_list":
rgb_array_list = self.env.render()
for i, rgb_array in enumerate(rgb_array_list):
upload_rgb_as_image(
self.backend_url, self.step_id, rgb_array, image_id=i
self.executor.submit(
upload_rgb_as_image,
self.backend_url,
self.step_id,
rgb_array,
image_id=i,
)
return rgb_array_list

Expand Down
7 changes: 6 additions & 1 deletion loopquest/private_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
LOCAL_BACKEND_URL = "http://localhost:8000"
CLOUD_FRONTEND_URL = "https://open.loopquest.ai"
CLOUD_BACKEND_URL = "https://open.loopquest.ai/api"
# For local development
# CLOUD_FRONTEND_URL = "http://localhost:3000"
# CLOUD_BACKEND_URL = "http://localhost:3000/api"
TOKEN_ENV_VAR_NAME = "LOOPQUEST_USER_TOKEN"


Expand Down Expand Up @@ -83,7 +86,9 @@ def save_token_to_env(token):

def add_env_to_gitignore():
gitignore_path = os.path.join(os.getcwd(), ".gitignore")
if os.path.exists(gitignore_path) and not is_dotenv_in_gitignore(gitignore_path):
if os.path.exists(gitignore_path):
if is_dotenv_in_gitignore(gitignore_path):
return
with open(gitignore_path, "a") as f:
f.write("\n# Added by LoopQuest\n")
f.write(".env\n")
Expand Down
1 change: 0 additions & 1 deletion loopquest/schema.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from pydantic import BaseModel
from typing import List, Optional, Union
import datetime
from pydantic import BaseModel
Expand Down
133 changes: 133 additions & 0 deletions tests/test_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import gymnasium
import loopquest
from unittest.mock import patch
import time

STEP = 100


def test_step_local():
name = "MountainCarContinuous-v0"
with patch("builtins.input", return_value="2"):
env = loopquest.make_env(gymnasium.make(name, render_mode="rgb_array"))
obs, info = env.reset()
step_times = []
render_times = []
for i in range(STEP):
action = env.action_space.sample()
start_time = time.time()
obs, reward, terminated, truncated, info = env.step(action)
elapsed_time = time.time() - start_time
step_times.append(elapsed_time)

start_time = time.time()
env.render()
elapsed_time = time.time() - start_time
render_times.append(elapsed_time)

if terminated or truncated:
break
env.close()
loopquest.close()
with_loopquest_step_time = sum(step_times) / len(step_times)
with_loopquest_render_time = sum(render_times) / len(render_times)
print(f"With Loopquest Average step time: {with_loopquest_step_time}")
print(f"Without Loopquest Average render time: {with_loopquest_render_time}")

env = gymnasium.make(name, render_mode="rgb_array")
obs, info = env.reset()
step_times = []
render_times = []
for i in range(STEP):
action = env.action_space.sample()
start_time = time.time()
obs, reward, terminated, truncated, info = env.step(action)
elapsed_time = time.time() - start_time
step_times.append(elapsed_time)

start_time = time.time()
env.render()
elapsed_time = time.time() - start_time
render_times.append(elapsed_time)

if terminated or truncated:
break
env.close()
without_loopquest_step_time = sum(step_times) / len(step_times)
without_loopquest_render_time = sum(render_times) / len(render_times)
print(f"With Loopquest Average step time: {without_loopquest_step_time}")
print(f"Without Loopquest Average render time: {without_loopquest_render_time}")

print(
f"LoopQuest step overhead: {with_loopquest_step_time - without_loopquest_step_time}"
)
print(
f"LoopQuest render overhead: {with_loopquest_render_time - without_loopquest_render_time}"
)
assert True


# def test_step_cloud():
# name = "MountainCarContinuous-v0"
# with patch("builtins.input", return_value="1"):
# env = loopquest.make_env(gymnasium.make(name, render_mode="rgb_array"))
# obs, info = env.reset()
# step_times = []
# render_times = []
# for i in range(STEP):
# action = env.action_space.sample()
# start_time = time.time()
# obs, reward, terminated, truncated, info = env.step(action)
# elapsed_time = time.time() - start_time
# step_times.append(elapsed_time)

# start_time = time.time()
# env.render()
# elapsed_time = time.time() - start_time
# render_times.append(elapsed_time)

# if terminated or truncated:
# break
# env.close()
# loopquest.close()
# with_loopquest_step_time = sum(step_times) / len(step_times)
# with_loopquest_render_time = sum(render_times) / len(render_times)
# print(f"With Loopquest Average step time: {with_loopquest_step_time}")
# print(f"Without Loopquest Average render time: {with_loopquest_render_time}")

# env = gymnasium.make(name, render_mode="rgb_array")
# obs, info = env.reset()
# step_times = []
# render_times = []
# for i in range(STEP):
# action = env.action_space.sample()
# start_time = time.time()
# obs, reward, terminated, truncated, info = env.step(action)
# elapsed_time = time.time() - start_time
# step_times.append(elapsed_time)

# start_time = time.time()
# env.render()
# elapsed_time = time.time() - start_time
# render_times.append(elapsed_time)

# if terminated or truncated:
# break
# env.close()
# without_loopquest_step_time = sum(step_times) / len(step_times)
# without_loopquest_render_time = sum(render_times) / len(render_times)
# print(f"With Loopquest Average step time: {without_loopquest_step_time}")
# print(f"Without Loopquest Average render time: {without_loopquest_render_time}")

# print(
# f"LoopQuest step overhead: {with_loopquest_step_time - without_loopquest_step_time}"
# )
# print(
# f"LoopQuest render overhead: {with_loopquest_render_time - without_loopquest_render_time}"
# )
# assert True


if __name__ == "__main__":
test_step_local()
# test_step_cloud()

0 comments on commit c644833

Please sign in to comment.