
fix(pu): fix reward type bug in 2048, os import bug in cartpole (#304)
puyuan1996 authored Dec 9, 2024
1 parent 9205e9f commit b9dce4e
Showing 5 changed files with 116 additions and 90 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,6 +1,6 @@
 DI-engine>=0.4.7
-gymnasium[atari]
-numpy>=1.22.4
+gymnasium[atari]==0.28.0
+numpy==1.24.1
 pympler
 minigrid
 moviepy
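
The change replaces open-ended version ranges with exact pins. A minimal standalone sketch (an assumed check script, not part of the repo) to confirm the pinned versions are what actually got installed:

    import gymnasium
    import numpy

    # The pins above expect exactly these versions at run time.
    assert gymnasium.__version__ == "0.28.0", gymnasium.__version__
    assert numpy.__version__ == "1.24.1", numpy.__version__
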
(changed file: name not captured in this view; a CartPole config, judging by the fields below)
@@ -28,7 +28,7 @@
         manager=dict(shared_memory=False, ),
     ),
     policy=dict(
-        use_wandb=True,
+        use_wandb=False,
         model=dict(
             observation_shape=4,
             action_space_size=2,
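
This hunk flips `use_wandb` to False by default. For context, a hypothetical sketch of how such a flag is typically consumed (the helper name and project are assumptions, not LightZero's actual wiring):

    def maybe_init_wandb(cfg):
        # Hypothetical helper: import and start wandb only when the flag is set,
        # so the default (use_wandb=False) runs without any wandb setup.
        if cfg.policy.use_wandb:
            import wandb
            wandb.init(project="lightzero-cartpole")  # assumed project name
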
8 changes: 5 additions & 3 deletions zoo/classic_control/cartpole/envs/cartpole_lightzero_env.py
@@ -1,17 +1,19 @@
 import copy
+import os
 import random
 from datetime import datetime
-from typing import Union, Optional, Dict
+from typing import Union, Dict
 
 import gymnasium as gym
+import matplotlib.pyplot as plt
 import numpy as np
 from ding.envs import BaseEnv, BaseEnvTimestep
 from ding.envs import ObsPlusPrevActRewWrapper
 from ding.torch_utils import to_ndarray
 from ding.utils import ENV_REGISTRY
 from easydict import EasyDict
-import matplotlib.pyplot as plt
 from matplotlib import animation
+from matplotlib.animation import PillowWriter
 
 
 @ENV_REGISTRY.register('cartpole_lightzero')
@@ -180,7 +182,7 @@ def animate(i):
             patch.set_data(frames[i])
 
         anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
-        anim.save(path, writer='imagemagick', fps=20)
+        anim.save(path, writer=PillowWriter(fps=20))
 
     def close(self) -> None:
         """
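
Switching the writer from 'imagemagick' to PillowWriter removes the dependency on an external ImageMagick install; Pillow is pulled in as an ordinary Python package. A minimal standalone sketch of the same save path (dummy frames, not the env's actual render output):

    import numpy as np
    import matplotlib.pyplot as plt
    from matplotlib import animation
    from matplotlib.animation import PillowWriter

    frames = [np.random.rand(64, 64) for _ in range(10)]  # dummy frames
    patch = plt.imshow(frames[0])
    plt.axis('off')

    def animate(i):
        patch.set_data(frames[i])

    anim = animation.FuncAnimation(plt.gcf(), animate, frames=len(frames), interval=50)
    anim.save('replay.gif', writer=PillowWriter(fps=20))
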
4 changes: 2 additions & 2 deletions zoo/game_2048/envs/game_2048_env.py
@@ -341,9 +341,9 @@ def step(self, action):

         # Convert the reward to ndarray
         if self.reward_type == 'merged_tiles_plus_log_max_tile_num':
-            reward = to_ndarray([reward_merged_tiles_plus_log_max_tile_num]).astype(np.float32)
+            reward = to_ndarray(reward_merged_tiles_plus_log_max_tile_num).astype(np.float32)
         elif self.reward_type == 'raw':
-            reward = to_ndarray([reward]).astype(np.float32)
+            reward = to_ndarray(reward).astype(np.float32)
 
         # Prepare information to return
         info = {"raw_reward": raw_reward, "current_max_tile_num": self.highest()}
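
The fix drops the list wrapper around the scalar reward. The difference is the resulting array shape, illustrated here with np.array directly (assuming to_ndarray behaves like np.array for scalars):

    import numpy as np

    print(np.array([3.0]).astype(np.float32).shape)  # (1,) - old: scalar wrapped in a list
    print(np.array(3.0).astype(np.float32).shape)    # ()   - new: 0-d scalar array
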
188 changes: 106 additions & 82 deletions zoo/game_2048/envs/test_game_2048_env.py
@@ -6,57 +6,55 @@


 @pytest.mark.unittest
-class TestGame2048():
-    def setup(self) -> None:
+class TestGame2048:
+    def setup_method(self, method) -> None:
         # Configuration for the Game2048 environment
-        cfg = EasyDict(dict(
-            env_id="game_2048",
-            # (str) The render mode. Options are 'None', 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. If None, then the game will not be rendered.
-            render_mode=None,
-            replay_format='gif',
-            replay_name_suffix='eval',
-            replay_path=None,
-            act_scale=True,
-            channel_last=False,
-            # (str) The type of observation to use. Options are 'raw_board', 'raw_encoded_board', and 'dict_encoded_board'.
-            obs_type='raw_encoded_board',
-            reward_type='raw',  # options=['raw', 'merged_tiles_plus_log_max_tile_num']
-            reward_normalize=False,
-            reward_norm_scale=100,
-            max_tile=int(2 ** 16),  # 2**11=2048, 2**16=65536
-            delay_reward_step=0,
-            prob_random_agent=0.,
-            max_episode_steps=int(1e6),
-            is_collect=True,
-            ignore_legal_actions=True,
-            need_flatten=False,
-            num_of_possible_chance_tile=2,
-            possible_tiles=np.array([2, 4]),
-            tile_probabilities=np.array([0.9, 0.1]),
-        ))
+        cfg = EasyDict({
+            'env_id': "game_2048",
+            'render_mode': None,  # Options: 'None', 'state_realtime_mode', 'image_realtime_mode', 'image_savefile_mode'
+            'replay_format': 'gif',
+            'replay_name_suffix': 'eval',
+            'replay_path': None,
+            'act_scale': True,
+            'channel_last': False,
+            'obs_type': 'raw_encoded_board',  # Options: 'raw_board', 'raw_encoded_board', 'dict_encoded_board'
+            'reward_type': 'raw',  # Options: ['raw', 'merged_tiles_plus_log_max_tile_num']
+            'reward_normalize': False,
+            'reward_norm_scale': 100,
+            'max_tile': int(2 ** 16),  # 2**11=2048, 2**16=65536
+            'delay_reward_step': 0,
+            'prob_random_agent': 0.,
+            'max_episode_steps': int(1e6),
+            'is_collect': True,
+            'ignore_legal_actions': True,
+            'need_flatten': False,
+            'num_of_possible_chance_tile': 2,
+            'possible_tiles': np.array([2, 4]),
+            'tile_probabilities': np.array([0.9, 0.1]),
+        })
         # Create a Game2048 environment that will be used in the following tests.
         self.env = Game2048Env(cfg)
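
The rename from `setup` to `setup_method` matches pytest's documented per-test hook; the nose-style `setup` name is deprecated in recent pytest releases. A minimal standalone sketch (hypothetical class) of how the hook behaves:

    class TestExample:
        def setup_method(self, method):
            # Runs before each test method; `method` is the test about to run.
            self.value = 41

        def test_increment(self):
            assert self.value + 1 == 42
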

     # Test the initialization of the Game2048 environment.
     def test_initialization(self):
-        assert isinstance(self.env, Game2048Env)
+        assert isinstance(self.env, Game2048Env), "Environment is not an instance of Game2048Env"
 
     # Test the reset method of the Game2048 environment.
     # Ensure that the shape of the observation is as expected.
     def test_reset(self):
         obs = self.env.reset()
-        assert obs.shape == (4, 4, 16)
+        assert obs.shape == (16, 4, 4), f"Expected observation shape (16, 4, 4), got {obs.shape}"
 
     # Test the step method of the Game2048 environment.
     # Ensure that the shape of the observation, the type of the reward,
     # the type of the done flag and the type of the info are as expected.
     def test_step_shape(self):
         self.env.reset()
         obs, reward, done, info = self.env.step(1)
-        assert obs.shape == (4, 4, 16)
-        assert isinstance(reward, np.ndarray)
-        assert isinstance(done, bool)
-        assert isinstance(info, dict)
+        assert obs.shape == (16, 4, 4), f"Expected observation shape (16, 4, 4), got {obs.shape}"
+        assert isinstance(reward, np.ndarray), f"Expected reward type np.ndarray, got {type(reward)}"
+        assert isinstance(done, bool), f"Expected done type bool, got {type(done)}"
+        assert isinstance(info, dict), f"Expected info type dict, got {type(info)}"
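
The expected shape moves from channel-last (4, 4, 16) to channel-first (16, 4, 4), consistent with a one-hot encoding of the 4x4 board using one plane per tile exponent. A hedged sketch of that layout (an illustration of the shape, not LightZero's actual encoder):

    import numpy as np

    board = np.array([[8, 4, 0, 0],
                      [2, 0, 0, 0],
                      [2, 0, 0, 0],
                      [2, 4, 2, 0]])
    exponents = np.zeros_like(board)
    nonzero = board > 0
    exponents[nonzero] = np.log2(board[nonzero]).astype(int)
    # One 4x4 plane per exponent 0..15; plane 0 marks empty cells.
    encoded = (np.arange(16)[:, None, None] == exponents[None]).astype(np.float32)
    print(encoded.shape)  # (16, 4, 4)
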

     # Test the render method of the Game2048 environment.
     # Ensure that the shape of the rendered image is as expected.
@@ -71,88 +69,114 @@ def test_render(self):
     # Ensure that the random seed is set correctly.
     def test_seed(self):
         self.env.seed(0)
-        assert self.env.np_random.randn() != np.random.randn()
+        assert self.env.np_random.choice([0,1,2,3]) != np.random.choice([0,1,2,3])
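
The updated assertion compares the environment's seeded generator against the global numpy state. In gymnasium, `env.np_random` is a `numpy.random.Generator`, which has `choice` but not the legacy `randn`, and is independent of `np.random`. A minimal sketch of that distinction:

    import numpy as np

    rng = np.random.default_rng(0)         # seeded, like env.np_random after env.seed(0)
    print(rng.choice([0, 1, 2, 3]))        # deterministic for a fixed seed
    print(np.random.choice([0, 1, 2, 3]))  # drawn from the unrelated global state
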

     def test_step_action_case1(self):
-        init_board = np.array([[8, 4, 0, 0],
-                               [2, 0, 0, 0],
-                               [2, 0, 0, 0],
-                               [2, 4, 2, 0]])
+        init_board = np.array([
+            [8, 4, 0, 0],
+            [2, 0, 0, 0],
+            [2, 0, 0, 0],
+            [2, 4, 2, 0]
+        ])
 
         # Test action 0 (Assuming it represents 'up' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(0)
-        expected_board_up = np.array([[8, 8, 2, 0],
-                                      [4, 0, 0, 0],
-                                      [2, 0, 0, 0],
-                                      [0, 0, 0, 0]])
-        assert np.array_equal(self.env.board, expected_board_up)
+        expected_board_up = np.array([
+            [8, 8, 2, 0],
+            [4, 0, 0, 0],
+            [2, 0, 0, 0],
+            [0, 0, 0, 0]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_up, "Board state after 'up' action is incorrect")
 
         # Test action 1 (Assuming it represents 'right' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(1)
-        expected_board_right = np.array([[0, 0, 8, 4],
-                                         [0, 0, 0, 2],
-                                         [0, 0, 0, 2],
-                                         [0, 2, 4, 2]])
-        assert np.array_equal(self.env.board, expected_board_right)
+        expected_board_right = np.array([
+            [0, 0, 8, 4],
+            [0, 0, 0, 2],
+            [0, 0, 0, 2],
+            [0, 2, 4, 2]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_right,
+                                      "Board state after 'right' action is incorrect")
 
         # Test action 2 (Assuming it represents 'down' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(2)
-        expected_board_down = np.array([[0, 0, 0, 0],
-                                        [8, 0, 0, 0],
-                                        [2, 0, 0, 0],
-                                        [4, 8, 2, 0]])
-        assert np.array_equal(self.env.board, expected_board_down)
+        expected_board_down = np.array([
+            [0, 0, 0, 0],
+            [8, 0, 0, 0],
+            [2, 0, 0, 0],
+            [4, 8, 2, 0]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_down,
+                                      "Board state after 'down' action is incorrect")
 
         # Test action 3 (Assuming it represents 'left' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(3)
-        expected_board_left = np.array([[8, 4, 0, 0],
-                                        [2, 0, 0, 0],
-                                        [2, 0, 0, 0],
-                                        [2, 4, 2, 0]])
-        assert np.array_equal(self.env.board, expected_board_left)
+        expected_board_left = np.array([
+            [8, 4, 0, 0],
+            [2, 0, 0, 0],
+            [2, 0, 0, 0],
+            [2, 4, 2, 0]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_left,
+                                      "Board state after 'left' action is incorrect")

     def test_step_action_case2(self):
-        init_board = np.array([[8, 4, 2, 0],
-                               [2, 0, 2, 0],
-                               [2, 2, 4, 0],
-                               [2, 4, 2, 0]])
+        init_board = np.array([
+            [8, 4, 2, 0],
+            [2, 0, 2, 0],
+            [2, 2, 4, 0],
+            [2, 4, 2, 0]
+        ])
 
         # Test action 0 (Assuming it represents 'up' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(0)
-        expected_board_up = np.array([[8, 4, 4, 0],
-                                      [4, 2, 4, 0],
-                                      [2, 4, 2, 0],
-                                      [0, 0, 0, 0]])
-        assert np.array_equal(self.env.board, expected_board_up)
+        expected_board_up = np.array([
+            [8, 4, 4, 0],
+            [4, 2, 4, 0],
+            [2, 4, 2, 0],
+            [0, 0, 0, 0]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_up, "Board state after 'up' action is incorrect")
 
         # Test action 1 (Assuming it represents 'right' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(1)
-        expected_board_right = np.array([[0, 8, 4, 2],
-                                         [0, 0, 0, 4],
-                                         [0, 0, 4, 4],
-                                         [0, 2, 4, 2]])
-        assert np.array_equal(self.env.board, expected_board_right)
+        expected_board_right = np.array([
+            [0, 8, 4, 2],
+            [0, 0, 0, 4],
+            [0, 0, 4, 4],
+            [0, 2, 4, 2]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_right,
+                                      "Board state after 'right' action is incorrect")
 
         # Test action 2 (Assuming it represents 'down' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(2)
-        expected_board_down = np.array([[0, 0, 0, 0],
-                                        [8, 4, 4, 0],
-                                        [2, 2, 4, 0],
-                                        [4, 4, 2, 0]])
-        assert np.array_equal(self.env.board, expected_board_down)
+        expected_board_down = np.array([
+            [0, 0, 0, 0],
+            [8, 4, 4, 0],
+            [2, 2, 4, 0],
+            [4, 4, 2, 0]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_down,
+                                      "Board state after 'down' action is incorrect")
 
         # Test action 3 (Assuming it represents 'left' move)
         self.env.reset(init_board=init_board, add_random_tile_flag=False)
         obs, reward, done, info = self.env.step(3)
-        expected_board_left = np.array([[8, 4, 2, 0],
-                                        [4, 0, 0, 0],
-                                        [4, 4, 0, 0],
-                                        [2, 4, 2, 0]])
-        assert np.array_equal(self.env.board, expected_board_left)
+        expected_board_left = np.array([
+            [8, 4, 2, 0],
+            [4, 0, 0, 0],
+            [4, 4, 0, 0],
+            [2, 4, 2, 0]
+        ])
+        np.testing.assert_array_equal(self.env.board, expected_board_left,
+                                      "Board state after 'left' action is incorrect")
