15 changes: 8 additions & 7 deletions examples/rl/README.md
@@ -1,16 +1,17 @@
# Reinforcement Learning (RL) Examples

This folder contains scenarios that employ reinforcement learning. MARO's RL toolkit makes it possible to use a common workflow on different scenarios, so long as the necessary scenario-related components are provided. The workflow consists of Python scripts for running the necessary components in single-threaded and distributed modes under ``workflows``. General scenario-independent settings can be found in ``config.yml``. The scenario can be chosen by setting the ``scenario`` field in this file.
This folder contains scenarios that employ reinforcement learning. MARO's RL toolkit provides scenario-agnostic workflows to run a variety of scenarios in single-thread, multi-process or distributed modes.

## How to Run

Scripts to run the common workflow in docker containers are in ``scripts/docker``. Start by choosing "single", "sync" or "async" for the ``mode`` field in ``config.yml`` to run a scenario in single-threaded, synchronous and asynchronous modes, respectively. Go to this folder and execute ``bash run.sh`` to launch the program and Docker Compose will take care of starting the necessary containers. Note that the script will build the docker image first if it has not already been built by running ``bash build.sh``. When the program is finished, be sure to run ``bash kill.sh`` to clean up the containers and remove the network.
The ``main.py`` script can be used to run the scenarios under ``examples/rl`` or any user-defined scenario that provides the necessary components (see the section below for details). To choose a scenario, edit ``SCENARIO_PATH`` in the script to point to the desired scenario folder. You may also edit the rest of the config variables to your preference. Note that this script runs in single-thread mode only.
To run a scenario in multi-process mode on a local machine, you will need to use the CLI tool (which requires installing MARO from source). Start by creating a configuration file (.yml) that follows the template ``maro/maro/rl/workflows/config/template.yml`` to specify the scenario-independent settings. Then use the command ``maro local run [-c] path/to/your/config`` to run in containerized (with ``-c``) or non-containerized (without ``-c``) environments.

## Create Your Own Scenarios

The workflow scripts make it easy to create your own scenarios by only supplying the necessary ingredients without worrying about putting them together. It is necessary to create an ``__init__.py`` under your scenario folder (so that it can be treated as a package) and expose all ingredients in it. The ingredients include:
You can create your own scenarios by supplying the necessary ingredients without worrying about putting them together in a workflow. It is necessary to create an ``__init__.py`` under your scenario folder (so that it can be treated as a package) and expose all ingredients in it. The ingredients include:
* Definitions of policies and agent-to-policy mappings. These definitions should be provided as a dictionary named ``policy_creator`` that maps a name to a function that takes the name and returns a policy instance with that name. The agent-to-policy mapping should be provided as a dictionary named ``agent2policy``.
* Definitions of training algorithms. These definitions should be provided as a dictionary named ``trainer_creator`` that maps a name to a function that takes the name and returns a trainer instance with that name.
* Definitions of state, action and reward shaping logic pertinent to your simulator and policies.
These definitions should be encapsulated in ``get_env_sampler``, which is a function that takes no parameters and returns an environment sampler;
* Definitions of policies and agent-to-policy mappings. These definitions should be provided as a dictionary named ``policy_func_index`` that maps the name of each policy to a function that creates a policy instance with that name (the policy name should be the function's only parameter). The agent-to-policy mapping should be provided as a dictionary named ``agent2policy``.

It is possible to have customized routines invoked at the end of a roll-out episode or episode segment. These routines usually involve processing or rendering information collected during roll-out. To do this, first implement the ``post_step`` method in your environment sampler class and populate the ``tracker`` member with whatever information you wish to track during roll-out. Then create two functions, ``post_collect`` and ``post_evaluate``, to process the information contained in each ``tracker`` and expose them in the scenario folder's ``__init__.py``. These functions are used as callbacks in the main learning loop and executed at the end of each training or evaluation episode. See ``cim/callbacks.py`` for a simple example of how to create these functions.
These definitions should be encapsulated in ``env_sampler_creator``, which is a function that takes ``policy_creator`` and returns an environment sampler;
It is possible to have customized routines invoked at the end of a roll-out episode or episode segment. These routines usually involve processing and/or rendering information collected during roll-out. To do this, first implement the ``post_step`` method in your environment sampler class to record whatever information you wish to keep track of during roll-out. Then create functions named ``post_collect`` and ``post_evaluate`` to process the information and expose them in the scenario folder's ``__init__.py``. These functions are used as callbacks in the main learning loop and executed at the end of each training or evaluation episode. See ``cim/callbacks.py`` for a simple example of how to create these functions.
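To make the required ingredients concrete, here is a minimal sketch of what a scenario's ``__init__.py`` might expose. The class names (``MyPolicy``, ``MyTrainer``, ``MyEnvSampler``) are placeholders for illustration only, not MARO APIs; a real scenario would subclass the corresponding MARO RL classes and may also expose ``post_collect`` / ``post_evaluate``.

```python
# Hypothetical scenario __init__.py -- placeholder classes, not MARO APIs.

class MyPolicy:
    """Stand-in for a MARO policy class."""
    def __init__(self, name: str) -> None:
        self.name = name

class MyTrainer:
    """Stand-in for a MARO trainer class."""
    def __init__(self, name: str) -> None:
        self.name = name

class MyEnvSampler:
    """Stand-in for a MARO environment sampler."""
    def __init__(self, policy_creator: dict) -> None:
        self.policy_creator = policy_creator

# Agent-to-policy mapping.
agent2policy = {"AGENT": "my_policy"}

# Policy name -> function that takes the name and returns a policy instance with that name.
policy_creator = {"my_policy": lambda name: MyPolicy(name)}

# Trainer name -> function that takes the name and returns a trainer instance with that name.
trainer_creator = {"my_trainer": lambda name: MyTrainer(name)}

# Takes policy_creator and returns an environment sampler.
def env_sampler_creator(policy_creator: dict) -> MyEnvSampler:
    return MyEnvSampler(policy_creator)
```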
7 changes: 4 additions & 3 deletions examples/rl/cim/README.md
@@ -1,9 +1,10 @@
# Container Inventory Management

This example demonstrates the use of MARO's RL toolkit to optimize container inventory management. The scenario consists of a set of ports, each acting as a learning agent, and vessels that transfer empty containers among them. Each port must decide 1) whether to load or discharge containers when a vessel arrives and 2) how many containers to load or discharge. The objective is to minimize the overall container shortage over a certain period of time. In this folder you can find:
* ``config.py``, which contains environment and policy configurations for the scenario;
* ``config.py``, which contains general configurations for the scenario;
* ``algorithms``, which contains configurations for the Actor-Critic, DQN and discrete-MADDPG algorithms, including network configurations;
* ``env_sampler.py``, which defines state, action and reward shaping in the ``CIMEnvSampler`` class;
* ``policies.py``, which defines the Q-net for DQN and the network components for Actor-Critic;
* ``policy_trainer.py``, which contains a registry for the policies and algorithms defined in ``algorithms``;
* ``callbacks.py``, which defines routines to be invoked at the end of training or evaluation episodes.

The scripts for running the learning workflows can be found under ``examples/rl/workflows``. See ``README`` under ``examples/rl`` for details about the general applicability of these scripts. We recommend that you follow this example to write your own scenarios.
See ``README.md`` under ``examples/rl`` for details about running the single-threaded learning workflow. We recommend that you follow this example to write your own scenarios.
25 changes: 13 additions & 12 deletions examples/rl/cim/callbacks.py
@@ -2,25 +2,26 @@
# Licensed under the MIT license.


def post_collect(trackers: list, ep: int, segment: int) -> None:
def post_collect(info_list: list, ep: int, segment: int) -> None:
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (episode {ep}, segment {segment}): {tracker['env_metric']}")
for info in info_list:
print(info)
print(f"env summary (episode {ep}, segment {segment}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(info["env_metric"][key] for info in info_list) / num_envs for key in metric_keys}
print(f"average env summary (episode {ep}, segment {segment}): {avg_metric}")


def post_evaluate(trackers: list, ep: int) -> None:
def post_evaluate(info_list: list, ep: int) -> None:
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (episode {ep}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (episode {ep}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(info["env_metric"][key] for info in info_list) / num_envs for key in metric_keys}
print(f"average env summary (episode {ep}): {avg_metric}")
2 changes: 1 addition & 1 deletion examples/rl/cim/env_sampler.py
@@ -75,7 +75,7 @@ def _get_reward(self, env_action_dict: Dict[Any, object], event: DecisionEvent,
return {agent_id: reward for agent_id, reward in zip(ports, rewards)}

def _post_step(self, cache_element: CacheElement, reward: Dict[Any, float]) -> None:
self._tracker["env_metric"] = self._env.metrics
self._info["env_metric"] = self._env.metrics


agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in Env(**env_conf).agent_idx_list}
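For reference, the ``agent2policy`` comprehension at the end of this diff maps each port agent to a per-agent policy name. A small sketch under assumed values (``algorithm = "dqn"`` and three port agents; the real values come from the scenario's config and the CIM environment) expands as follows:

```python
# Assumed values for illustration only.
algorithm = "dqn"
agent_idx_list = [0, 1, 2]

agent2policy = {agent: f"{algorithm}_{agent}.policy" for agent in agent_idx_list}
# -> {0: "dqn_0.policy", 1: "dqn_1.policy", 2: "dqn_2.policy"}
```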
4 changes: 2 additions & 2 deletions examples/rl/main.py
@@ -7,10 +7,10 @@
from maro.rl.workflows.scenario import Scenario
from maro.utils import Logger


# config variables
SCENARIO_PATH = "cim"
NUM_EPISODES = 50
NUM_STEPS = -1
NUM_STEPS = None
CHECKPOINT_PATH = os.path.join(os.getcwd(), "checkpoints")
CHECKPOINT_INTERVAL = 5
EVAL_SCHEDULE = [10, 20, 30, 40, 50]
9 changes: 5 additions & 4 deletions examples/rl/vm_scheduling/README.md
@@ -2,12 +2,13 @@

A virtual machine (VM) scheduler is a cloud computing service component responsible for providing compute resources to satisfy user demands. A good resource allocation policy should aim to optimize several metrics at the same time, such as user wait time, profit, energy consumption and physical machine (PM) overload. Many commercial cloud providers use rule-based policies. Alternatively, the policy can also be optimized using reinforcement learning (RL) techniques, which involves simulating with historical data. This example demonstrates how DQN and Actor-Critic algorithms can be applied to this scenario. In this folder, you can find:

* ``config.py``, which contains environment and policy configurations.
* ``config.py``, which contains general configurations for the scenario;
* ``algorithms``, which contains configurations for the Actor-Critic and DQN algorithms, including network configurations;
* ``env_sampler.py``, which defines state, action and reward shaping in the ``VMEnvSampler`` class;
* ``policies.py``, which defines the Q-net for DQN and the network components for Actor-Critic.
* ``callbacks.py``, which contains routines to be invoked at the end of a training or evaluation episode.
* ``policy_trainer.py``, which contains a registry for the policies and algorithms defined in ``algorithms``;
* ``callbacks.py``, which defines routines to be invoked at the end of training or evaluation episodes.

The scripts to run the learning workflows can be found under ``examples/rl/workflows``. See ``README`` under ``examples/rl`` for details about the general applicability of these scripts. We recommend that you follow this example to write your own scenarios.
See ``README.md`` under ``examples/rl`` for details about running the single-threaded learning workflow. We recommend that you follow this example to write your own scenarios.


# Some Comments About the Results
30 changes: 15 additions & 15 deletions examples/rl/vm_scheduling/callbacks.py
@@ -14,32 +14,32 @@
makedirs(plt_path, exist_ok=True)


def post_collect(trackers, ep, segment):
def post_collect(info_list, ep, segment):
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (episode {ep}, segment {segment}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (episode {ep}, segment {segment}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(tr["env_metric"][key] for tr in info_list) / num_envs for key in metric_keys}
print(f"average env metric (episode {ep}, segment {segment}): {avg_metric}")


def post_evaluate(trackers, ep):
def post_evaluate(info_list, ep):
# print the env metric from each rollout worker
for tracker in trackers:
print(f"env summary (evaluation episode {ep}): {tracker['env_metric']}")
for info in info_list:
print(f"env summary (evaluation episode {ep}): {info['env_metric']}")

# print the average env metric
if len(trackers) > 1:
metric_keys, num_trackers = trackers[0]["env_metric"].keys(), len(trackers)
avg_metric = {key: sum(tr["env_metric"][key] for tr in trackers) / num_trackers for key in metric_keys}
if len(info_list) > 1:
metric_keys, num_envs = info_list[0]["env_metric"].keys(), len(info_list)
avg_metric = {key: sum(tr["env_metric"][key] for tr in info_list) / num_envs for key in metric_keys}
print(f"average env metric (evaluation episode {ep}): {avg_metric}")

for tracker in trackers:
core_requirement = tracker["actions_by_core_requirement"]
action_sequence = tracker["action_sequence"]
for info in info_list:
core_requirement = info["actions_by_core_requirement"]
action_sequence = info["action_sequence"]
# plot action sequence
fig = plt.figure(figsize=(40, 32))
ax = fig.add_subplot(1, 1, 1)
18 changes: 9 additions & 9 deletions examples/rl/vm_scheduling/env_sampler.py
@@ -109,19 +109,19 @@ def _get_allocation_reward(self, event: DecisionPayload, alpha: float, beta: flo
return (alpha + beta * vm_unit_price * min(self._durations - event.frame_index, event.remaining_buffer_time))

def _post_step(self, cache_element: CacheElement, reward: Dict[Any, float]):
self._tracker["env_metric"] = {k: v for k, v in self._env.metrics.items() if k != "total_latency"}
self._tracker["env_metric"]["latency_due_to_agent"] = self._env.metrics["total_latency"].due_to_agent
self._tracker["env_metric"]["latency_due_to_resource"] = self._env.metrics["total_latency"].due_to_resource
if "actions_by_core_requirement" not in self._tracker:
self._tracker["actions_by_core_requirement"] = defaultdict(list)
if "action_sequence" not in self._tracker:
self._tracker["action_sequence"] = []
self._info["env_metric"] = {k: v for k, v in self._env.metrics.items() if k != "total_latency"}
self._info["env_metric"]["latency_due_to_agent"] = self._env.metrics["total_latency"].due_to_agent
self._info["env_metric"]["latency_due_to_resource"] = self._env.metrics["total_latency"].due_to_resource
if "actions_by_core_requirement" not in self._info:
self._info["actions_by_core_requirement"] = defaultdict(list)
if "action_sequence" not in self._info:
self._info["action_sequence"] = []

action = cache_element.action_dict["AGENT"]
if cache_element.state:
mask = cache_element.state[num_features:]
self._tracker["actions_by_core_requirement"][cache_element.event.vm_cpu_cores_requirement].append([action, mask])
self._tracker["action_sequence"].append(action)
self._info["actions_by_core_requirement"][cache_element.event.vm_cpu_cores_requirement].append([action, mask])
self._info["action_sequence"].append(action)


agent2policy = {"AGENT": f"{algorithm}.policy"}
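Putting the two vm_scheduling diffs together, the ``info`` dictionary that ``_post_step`` populates and ``post_evaluate`` consumes looks roughly like the sketch below. The keys match the code above; the values are made-up examples.

```python
# Illustrative contents of one info dict after a roll-out (values are invented).
info = {
    "env_metric": {
        # all simulator metrics except "total_latency", plus the two derived latency entries
        "latency_due_to_agent": 12,
        "latency_due_to_resource": 3,
    },
    # vm_cpu_cores_requirement -> list of [action, action_mask] pairs
    "actions_by_core_requirement": {
        2: [[1, [0, 1, 1]], [0, [1, 1, 0]]],
    },
    # the action taken at each decision event, in order
    "action_sequence": [1, 0, 1],
}
```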