Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add agent communication wrapper. #1881

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2d03833
Add agent communication wrapper.
Gamenot Feb 22, 2023
05e1f87
Add docstrings.
Gamenot Feb 22, 2023
9f8d428
Add header.
Gamenot Feb 22, 2023
eacbeaa
Update configuration.
Gamenot Feb 22, 2023
7efa724
Mock up interface for message
Gamenot Feb 22, 2023
b2a4c8d
Add custom block to interface option.
Gamenot Feb 22, 2023
d2081ba
Bring wrapper close to completion.
Gamenot Feb 23, 2023
96a878e
Remove agent interface todo.
Gamenot Feb 23, 2023
b7dc02e
Fix docstring test.
Gamenot Feb 23, 2023
e9a22c0
Add agent communcation example.
Gamenot Feb 23, 2023
cde2091
Add agent communication wrapper.
Gamenot Feb 22, 2023
e021a06
Add docstrings.
Gamenot Feb 22, 2023
1e78270
Add header.
Gamenot Feb 22, 2023
356a922
Update configuration.
Gamenot Feb 22, 2023
4decba0
Mock up interface for message
Gamenot Feb 22, 2023
8b0ed19
Add custom block to interface option.
Gamenot Feb 22, 2023
8960f52
Bring wrapper close to completion.
Gamenot Feb 23, 2023
5268ebd
Remove agent interface todo.
Gamenot Feb 23, 2023
af425d0
Fix docstring test.
Gamenot Feb 23, 2023
f6355a6
Add agent communcation example.
Gamenot Feb 23, 2023
a2f3a1d
Merge branch 'tucker/add_agent_communication' of https://github.com/h…
Gamenot Mar 2, 2023
842f60e
Improve action space.
Gamenot Mar 2, 2023
f98350f
Add vehicle targetting communication wrapper.
Gamenot Mar 3, 2023
931800f
make gen-header
Gamenot Mar 10, 2023
b519785
Fix docstring test
Gamenot Mar 10, 2023
b1a03ef
Fix type test.
Gamenot Mar 10, 2023
85c7582
Fix remaining typing issues.
Gamenot Mar 10, 2023
2cef72e
Remove unused import.
Gamenot Mar 10, 2023
ca0b51e
Merge branch 'master' into tucker/add_agent_communication
Gamenot Mar 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Bring wrapper close to completion.
  • Loading branch information
Gamenot committed Mar 1, 2023
commit 8960f5276347d9959d166aaa72695d25eb90aeb5
235 changes: 201 additions & 34 deletions smarts/env/gymnasium/wrappers/agent_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,92 +19,259 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from typing import Any, Dict, NamedTuple, Optional, Sequence, Tuple
from enum import Enum
import enum
from collections import defaultdict
from enum import Enum, IntFlag, unique
from functools import lru_cache
from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple

import gymnasium as gym
import numpy as np

from smarts.env.gymnasium.hiway_env_v1 import HiWayEnvV1


@unique
class Bands(IntFlag):
L0 = enum.auto()
L1 = enum.auto()
L2 = enum.auto()
L3 = enum.auto()
L4 = enum.auto()
L5 = enum.auto()
L6 = enum.auto()
L7 = enum.auto()
L8 = enum.auto()
L9 = enum.auto()
L10 = enum.auto()
L11 = enum.auto()
ALL = L0 | L1 | L2 | L3 | L4 | L5 | L6 | L7 | L8 | L9 | L10 | L11


@unique
class Sensitivity(Enum):
LOW = 0
STANDARD = 1
HIGH = 2


class Transmitter(NamedTuple):
"""A configuration utility to set up agents to transmit messages."""
class Header(NamedTuple):
"""A header modeled loosely after an email.

# TODO MTA: move this to agent interface.
Args:
channel (str): The channel this message is on.
sender (str): The name of the sender.
sender_type (str): The type of actor the sender is.
cc (Set[str]):
The actors and aliases this should be sent publicly to. All cc recipients see the cc.
bcc (Set[str]):
The actors and aliases this should be sent privately to. The recipient only sees self
in bcc list.
format (str): The format of this message.
"""

channel: str
sender: str
sender_type: str
cc: Set[str]
bcc: Set[str]
format: str


class Message(NamedTuple):
"""The message data.

Args:
content (Any): The content of this message.
"""

content: Any
# size: uint

bands: Sequence[int]

class V2XTransmitter(NamedTuple):
"""A configuration utility to set up agents to transmit messages."""

bands: Bands
range: float
mission: bool
position: bool
heading: bool
breaking: bool
throttle: bool
steering: bool
custom_message_blob_size: int
# available_channels: List[str]


class Receiver(NamedTuple):
class V2XReceiver(NamedTuple):
"""A configuratoin utility to set up agent to receive messages."""

# TODO MTA: move this to agent interface.
bands: Sequence[int]
sensitivity: Sensitivity = Sensitivity.HIGH
bands: Bands
aliases: List[str]
whitelist_channels: Optional[List[str]] = None
blacklist_channels: Set[str] = {}
sensitivity: Sensitivity = Sensitivity.STANDARD


class MessagePasser(gym.Wrapper):
"""This wrapper augments the observations and actions to require passing messages from agents."""

def __init__(self, env: gym.Env, message_size=125000):
def __init__(
self,
env: gym.Env,
message_config: Dict[str, Tuple[V2XTransmitter, V2XReceiver]],
max_message_bytes=125000,
):
super().__init__(env)
self._message_config = message_config
# map alias to agent ids (multiple agents can be under the same alias)
self._alias_mapping = defaultdict(list)
for a_id, (_, receiver) in message_config.items():
for alias in receiver.aliases:
self._alias_mapping[alias].append(a_id)

assert isinstance(env, HiWayEnvV1)
o_action_space: gym.spaces.Dict = self.env.action_space
msg_space = (
gym.spaces.Box(low=0, high=256, shape=(message_size,), dtype=np.uint8),
gym.spaces.Box(low=0, high=256, shape=(max_message_bytes,), dtype=np.uint8),
)
self.action_space = gym.spaces.Dict(
{
a_id: gym.spaces.Tuple(
(
action_space,
base_action_space,
msg_space,
)
)
for a_id, action_space in o_action_space.spaces.items()
for a_id, base_action_space in o_action_space.spaces.items()
}
)
o_observation_space: gym.spaces.Dict = self.env.observation_space
self._transmission_space = gym.spaces.Sequence(
gym.spaces.Tuple(
(
gym.spaces.Tuple(
(
gym.spaces.Text(20), # channel
gym.spaces.Text(30), # sender
gym.spaces.Text(10), # sender_type
gym.spaces.Sequence(gym.spaces.Text(30)), # cc
gym.spaces.Sequence(gym.spaces.Text(30)), # bcc
gym.spaces.Text(10), # format
)
),
gym.spaces.Tuple((msg_space,)),
)
)
)
self.observation_space = gym.spaces.Dict(
{
"agents": o_observation_space,
"messages": gym.spaces.Dict(
{a_id: msg_space for a_id in o_action_space}
),
a_id: gym.spaces.Dict(
dict(
**obs,
transmissions=self._transmission_space,
)
)
for a_id, obs in o_observation_space.items()
}
)

@lru_cache
def resolve_alias(self, alias):
return set(self._alias_mapping[alias])

def step(self, action):
std_actions = {}
msgs = {}
for a_id, ma in action.items():
std_actions[a_id] = ma[0]
msgs[a_id] = ma[1]
# step
std_actions = {a_id: act for a_id, (act, _) in action}
observations, rewards, terms, truncs, infos = self.env.step(std_actions)

msgs = defaultdict(list)

# filter recipients for active
cached_active_filter = lru_cache(lambda a: a.intersection(observations.keys()))

# filter recipients by band
## compare transmitter
cached_band_filter = lru_cache(
lambda sender, recipients: (
r
for r in recipients
if self._message_config[sender][0].bands
| self._message_config[r][1].bands
)
)

# filter recipients that do not listen to the channel
accepts_channel = lru_cache(
lambda channel, recipient: (
(not self._message_config[recipient][1].whitelist_channels)
or (channel in self._message_config[recipient][1].whitelist_channels)
)
and channel not in self._message_config[recipient][1].blacklist_channels
)
cached_channel_filter = lru_cache(
lambda channel, recipients: (
r for r in recipients if accepts_channel(channel, r)
)
)

## filter recipients by distance
## Includes all
## TODO ensure this works on all formatting types
# cached_dist_comp = lambda sender, receiver: obs[sender]["position"].dot(obs[receiver]["position"])
# cached_distance_filter = lru_cache(lambda sender, receivers: (
# r for r in receivers if cached_distance_filter
# ))

# compress filters
general_filter = lambda header, initial_recipients: (
cc
for recipients in map(self.resolve_alias, initial_recipients)
for cc in cached_channel_filter(
header.channel,
cached_band_filter(header.sender, cached_active_filter(recipients)),
)
)

# Organise the messages to their recipients
for a_id, (_, msg) in action.items():
msg: List[Tuple[Header, Message]] = msg
for header, message in msg:
header: Header = header
message: Message = message

# expand the recipients
cc_recipients = set(general_filter(header, header.cc))
bcc_recipients = set(general_filter(header, header.bcc))
cc_header = header._replace(cc=cc_recipients)

# associate the messages to the recipients
for recipient in (
cc_recipients - bcc_recipients
): # privacy takes priority
msgs[recipient].append(
(cc_header._replace(bcc=set()), message) # clear bcc
)
for recipient in bcc_recipients:
msgs[recipient].append(
(
cc_header._replace(
bcc={recipient}
), # leave recipient in bcc
message,
)
)

obs, rewards, terms, truncs, infos = self.env.step(std_actions)
obs_with_msgs = {
"agents": obs,
"messages": {a_id: msgs for a_id in obs},
a_id: dict(
**obs,
transmissions=msgs[a_id],
)
for a_id, obs in observations.items()
}
return obs_with_msgs, rewards, terms, truncs, infos

def reset(
self, *, seed: Optional[int] = None, options: Optional[Dict[str, Any]] = None
) -> Tuple[Any, Dict[str, Any]]:
obs, info = super().reset(seed=seed, options=options)
return {"agents": obs, "messages": self.observation_space["messages"].sample()}
observations, info = super().reset(seed=seed, options=options)
obs_with_msgs = {
a_id: dict(**obs, transmissions=self._transmission_space.sample(0))
for a_id, obs in observations.items()
}
return obs_with_msgs, info