Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend Group Filter Infrastructure #141

Merged
merged 49 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
fd5e79e
fixed error caused by stdout in pipeline and removed previous fix wit…
aykutaaykut Jul 31, 2023
88d3b40
extended Filter base class with group_filter attr.
aykutaaykut Aug 1, 2023
82de62e
commented out a debug log
aykutaaykut Aug 1, 2023
70c1496
added GROUP_FILTER as a message type
aykutaaykut Aug 1, 2023
8e3f2b5
implemented initial BoW filter template
aykutaaykut Aug 1, 2023
f29d621
added inital GROUP_FILTER event handler template
aykutaaykut Aug 1, 2023
458cc87
extended connection api to handle GROUP_FILTER messages
aykutaaykut Aug 1, 2023
0e2f3f3
added BoW filter button
aykutaaykut Aug 1, 2023
89b8ecb
Merge branch 'main' into backend-aa-sync-score
aykutaaykut Aug 1, 2023
39c0d64
updated requirements with pyts for bow algorithm
aykutaaykut Aug 2, 2023
60bb1cf
added group filter communication mechanism between participants and e…
aykutaaykut Aug 2, 2023
9004495
wip: implemented bow algorithm with derivatives
aykutaaykut Aug 2, 2023
82fad38
wip: updated preprocessing to be modular
aykutaaykut Aug 7, 2023
6ee8df3
wip: implemented oasis algorithm and changed data keys from frame tim…
aykutaaykut Aug 10, 2023
7b9203d
wip: implemented alignment and oasis algorithm execution
aykutaaykut Aug 10, 2023
fdf7fd0
removed filter api endpoint for group filters
aykutaaykut Sep 5, 2023
2aebb18
updated npm packages
aykutaaykut Sep 5, 2023
96ef5be
removed group filter related attributes and functions
aykutaaykut Sep 16, 2023
f2b6af3
implemented group filter base class
aykutaaykut Sep 16, 2023
0fde82f
implemented group filter aggregator
aykutaaykut Sep 16, 2023
418cd54
implemented factory and utility functions for group filter and aggreg…
aykutaaykut Sep 16, 2023
a3de6f3
introduced a new message type for group filters
aykutaaykut Sep 16, 2023
d176353
removed old group filter implementation
aykutaaykut Sep 16, 2023
587ad81
added group filter related data and updated validation functions
aykutaaykut Sep 16, 2023
601a5c9
implemented set group filter and aggregator functions
aykutaaykut Sep 16, 2023
1e07005
implemented group filter extension for connection and track handler
aykutaaykut Sep 16, 2023
ce0ee15
updated user factory functions to support group handler passing to th…
aykutaaykut Sep 16, 2023
eb5d916
added SET_GROUP_FILTERS message to remove group filters in None button
aykutaaykut Sep 16, 2023
459a097
Merge branch 'main' into backend-aa-sync-score
aykutaaykut Sep 16, 2023
8b9fe6c
solved flake8 errors
aykutaaykut Sep 16, 2023
878bcf9
implemented a template group filter as an example
aykutaaykut Sep 16, 2023
64938bc
updated gf aggregator to enabla all participants joining aggregation …
aykutaaykut Sep 21, 2023
455a5f9
updated __init__ file in group_filters package to automatically impor…
aykutaaykut Sep 21, 2023
2d60a37
manually add gfs in the __init__ file
aykutaaykut Sep 21, 2023
dd0e39f
variable names are updated to make them more self explanatory
aykutaaykut Sep 21, 2023
ec9293b
removed unnecessary variables
aykutaaykut Sep 22, 2023
30dde5e
deleted sync_score filter/folder under filters directory
aykutaaykut Oct 4, 2023
de6e442
removed unused attribute
aykutaaykut Oct 4, 2023
567af97
removed sync_score from filters's init
aykutaaykut Oct 4, 2023
218b761
bug fix related to experimenter disconnects after the group filters a…
aykutaaykut Oct 4, 2023
9ec1f61
updated template group filter with a static align_data method and add…
aykutaaykut Oct 4, 2023
a5bd55d
added more comments
aykutaaykut Oct 4, 2023
76f479f
Merge branch 'main' into backend-aa-sync-score
aykutaaykut Oct 4, 2023
d0e44e0
bug fix for experimenter disconnecting after group filters are disabl…
aykutaaykut Oct 4, 2023
31b0cb4
solved some flake8 related formatting issues
aykutaaykut Oct 4, 2023
f1754d8
fixed a typo
aykutaaykut Oct 4, 2023
e0d3477
implemented OpenFace ROI optimization related changes
aykutaaykut Oct 12, 2023
bbce82a
added environment variables for better performance
aykutaaykut Oct 14, 2023
bc9277a
updated flake8 configuration for black formatting compatibility
aykutaaykut Oct 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implemented set group filter and aggregator functions
  • Loading branch information
aykutaaykut committed Sep 16, 2023
commit 601a5c90fea7b78add622ec4c58d133152ef43b8
103 changes: 103 additions & 0 deletions backend/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from hub.exceptions import ErrorDictException
from session.data.session import SessionData

from group_filters.group_filter_aggregator import GroupFilterAggregator
from filters.filter_dict import FilterDict
from group_filters import group_filter_aggregator_factory
import asyncio

if TYPE_CHECKING:
from users import Experimenter, Participant

Expand All @@ -30,6 +35,8 @@ class Experiment(AsyncIOEventEmitter):
session: SessionData
_experimenters: list[Experimenter]
_participants: dict[str, Participant]
_audio_group_filter_aggregators: dict[str, GroupFilterAggregator]
_video_group_filter_aggregators: dict[str, GroupFilterAggregator]

def __init__(self, session: SessionData):
"""Start a new Experiment.
Expand All @@ -48,6 +55,8 @@ def __init__(self, session: SessionData):
self._participants = {}
self._logger.info(f"Experiment created: {self}")
self.session.creation_time = timestamp()
self._audio_group_filter_aggregators = {}
self._video_group_filter_aggregators = {}

def __str__(self) -> str:
"""Get string representation of this Experiment."""
Expand Down Expand Up @@ -422,3 +431,97 @@ def _set_state(self, state: ExperimentState) -> None:

self._state = state
self.emit("state", self._state)

async def set_video_group_filter_aggregators(
self, group_filter_configs: list[FilterDict], ports: list[int]
) -> None:
old_group_filter_aggregators = self._video_group_filter_aggregators

self._video_group_filter_aggregators = {}
coroutines = []
for config, port in zip(group_filter_configs, ports):
filter_id = config["id"]
# Reuse existing filter for matching id and type.
if (
filter_id in old_group_filter_aggregators
and old_group_filter_aggregators[filter_id]._group_filter.config["type"]
== config["type"]
):
self._video_group_filter_aggregators[
filter_id
] = old_group_filter_aggregators[filter_id]

self._video_group_filter_aggregators[
filter_id
]._group_filter.set_config(config)
self._video_group_filter_aggregators[filter_id].delete_data()
else:
# Create a new filter for configs with empty id.
self._video_group_filter_aggregators[
filter_id
] = group_filter_aggregator_factory.create_group_filter_aggregator(
"video", config, port
)

# Cleanup old group filter aggregators
for (
filter_id,
old_group_filter_aggregator,
) in old_group_filter_aggregators.items():
if filter_id not in self._video_group_filter_aggregators:
coroutines.append(old_group_filter_aggregator.cleanup())

# Run new group filter aggregators
for (
new_group_filter_aggregator
) in self._video_group_filter_aggregators.values():
coroutines.append(new_group_filter_aggregator.run())

await asyncio.gather(*coroutines)

async def set_audio_group_filter_aggregators(
self, group_filter_configs: list[FilterDict], ports: list[int]
) -> None:
old_group_filter_aggregators = self._audio_group_filter_aggregators

self._audio_group_filter_aggregators = {}
coroutines = []
for config, port in zip(group_filter_configs, ports):
filter_id = config["id"]
# Reuse existing filter for matching id and type.
if (
filter_id in old_group_filter_aggregators
and old_group_filter_aggregators[filter_id]._group_filter.config["type"]
== config["type"]
):
self._audio_group_filter_aggregators[
filter_id
] = old_group_filter_aggregators[filter_id]

self._audio_group_filter_aggregators[
filter_id
]._group_filter.set_config(config)
self._audio_group_filter_aggregators[filter_id].delete_data()
else:
# Create a new filter for configs with empty id.
self._audio_group_filter_aggregators[
filter_id
] = group_filter_aggregator_factory.create_group_filter_aggregator(
"audio", config, port
)

# Cleanup old group filter aggregators
for (
filter_id,
old_group_filter_aggregator,
) in old_group_filter_aggregators.items():
if filter_id not in self._audio_group_filter_aggregators:
coroutines.append(old_group_filter_aggregator.cleanup())

# Run new group filter aggregators
for (
new_group_filter_aggregator
) in self._audio_group_filter_aggregators.values():
coroutines.append(new_group_filter_aggregator.run())

await asyncio.gather(*coroutines)
98 changes: 64 additions & 34 deletions backend/users/experimenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Any, Coroutine

from filters import filter_utils
from group_filters import group_filter_utils
from session.data.session import is_valid_session
from custom_types.chat_message import is_valid_chatmessage
from custom_types.kick import is_valid_kickrequest
Expand Down Expand Up @@ -79,7 +80,7 @@ def __init__(self, experimenter_id: str, hub: _h.Hub) -> None:
self.on_message("BAN_PARTICIPANT", self._handle_ban)
self.on_message("MUTE", self._handle_mute)
self.on_message("SET_FILTERS", self._handle_set_filters)
self.on_message("GROUP_FILTER", self._handle_group_filters)
self.on_message("SET_GROUP_FILTERS", self._handle_set_group_filters)

def __str__(self) -> str:
"""Get string representation of this experimenter.
Expand Down Expand Up @@ -745,36 +746,65 @@ async def _handle_set_filters(self, data: Any) -> MessageDict:
)
return MessageDict(type="SUCCESS", data=success)

async def _handle_group_filters(self, data: Any) -> None:
self._logger.debug(f"Data received: {data}")
participant = data["participant"]

self.participants[participant] = data

if len(self.participants) > 1:
for p1 in self.participants.keys():
for p2 in self.participants.keys():
if p1 != p2:
p1_frame = int(list(self.participants[p1].keys())[0])
p2_frame = int(list(self.participants[p2].keys())[0])

if p1_frame > p2_frame:
p1_data_aligned, p2_data_aligned = self.align(
list(self.participants[p1].values())[0],
list(self.participants[p2].values())[0],
)
else:
p2_data_aligned, p1_data_aligned = self.align(
list(self.participants[p2].values())[0],
list(self.participants[p1].values())[0],
)

self.apply_oasis()

def align(self, longer_data, shorter_data):
for l, s in zip(longer_data, shorter_data):
if len(l) != len(s):
for _ in range(len(l) - len(s)):
l.pop(0)

return longer_data, shorter_data
async def _handle_set_group_filters(self, data: Any) -> MessageDict:
if not group_filter_utils.is_valid_set_group_filters_request(data):
raise ErrorDictException(
code=400,
type="INVALID_DATATYPE",
description="Message data is not a valid SetGroupFiltersRequest.",
)

video_group_filters = data["video_group_filters"]
video_group_filter_ports = []
for _ in video_group_filters:
video_group_filter_ports.append(group_filter_utils.find_an_available_port())

audio_group_filters = data["audio_group_filters"]
audio_group_filter_ports = []
for _ in audio_group_filters:
audio_group_filter_ports.append(group_filter_utils.find_an_available_port())

experiment = self.get_experiment_or_raise("Failed to set filters.")
coroutines = []

# Update experiment with group filter aggregator
coroutines.append(
experiment.set_video_group_filter_aggregators(
video_group_filters, video_group_filter_ports
)
)
coroutines.append(
experiment.set_audio_group_filter_aggregators(
audio_group_filters, audio_group_filter_ports
)
)

# Update participant data
for p_data in experiment.session.participants.values():
p_data.video_group_filters = video_group_filters
p_data.audio_group_filters = audio_group_filters

# Update connected Participants
for p in experiment.participants.values():
if p.connection is not None:
coroutines.append(
p.set_video_group_filters(
video_group_filters, video_group_filter_ports
)
)
coroutines.append(
p.set_audio_group_filters(
audio_group_filters, audio_group_filter_ports
)
)
await asyncio.gather(*coroutines)

# Notify Experimenters connected to the hub about the data change
message = MessageDict(type="SESSION_CHANGE", data=experiment.session.asdict())
await self._hub.send_to_experimenters(message)

# Respond with success message
success = SuccessDict(
type="SET_GROUP_FILTERS", description="Successfully changed group filters."
)
return MessageDict(type="SUCCESS", data=success)
38 changes: 36 additions & 2 deletions backend/users/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ async def set_video_filters(self, filters: list[FilterDict]) -> None:
async def _set_video_filters_later(_):
if self._connection is None:
self._logger.error(
"__set_video_filters_later callback failed, _connection is "
"_set_video_filters_later callback failed, _connection is "
"None."
)
return
Expand All @@ -513,12 +513,46 @@ async def set_audio_filters(self, filters: list[FilterDict]) -> None:
async def _set_audio_filters_later(_):
if self._connection is None:
self._logger.error(
"__set_audio_filters_later callback failed, _connection is "
"_set_audio_filters_later callback failed, _connection is "
"None."
)
return
await self._connection.set_audio_filters(filters)

async def set_video_group_filters(
self, group_filters: list[FilterDict], ports: list[int]
) -> None:
if self._connection is not None:
await self._connection.set_video_group_filters(group_filters, ports)
else:

@self.once("connection_set")
async def _set_video_group_filters_later(_):
if self._connection is None:
self._logger.error(
"_set_video_group_filters_later callback failed, _connection is "
"None."
)
return
await self._connection.set_video_group_filters(group_filters, ports)

async def set_audio_group_filters(
self, group_filters: list[FilterDict], ports: list[int]
) -> None:
if self._connection is not None:
await self._connection.set_audio_group_filters(group_filters, ports)
else:

@self.once("connection_set")
async def _set_audio_group_filters_later(_):
if self._connection is None:
self._logger.error(
"_set_audio_group_filters_later callback failed, _connection is "
"None."
)
return
await self._connection.set_audio_group_filters(group_filters, ports)

async def start_recording(self) -> None:
"""Start recording for this user."""
if self._connection is not None:
Expand Down