plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
@@ -1,3 +1,4 @@
import datetime

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify Python version compatibility for datetime.UTC.

The datetime import is added but datetime.UTC is used later in the file (lines 355, 358, 373, 376), which requires Python 3.11+. If the project supports older Python versions, this will cause an AttributeError.


🏁 Script executed:

#!/bin/bash
# Check Python version requirements in the project

# Check pyproject.toml, setup.py, or requirements files for Python version constraints
fd -t f 'pyproject.toml|setup.py|setup.cfg|requirements.*txt' --exec cat {} \; | rg -i 'python.*version|requires-python|python_requires'

# Check for other usages of datetime.UTC in the codebase
rg -n 'datetime\.UTC' --type py

Length of output: 1520


Replace datetime.UTC with timezone.utc for Python 3.10 compatibility
datetime.UTC is only available in Python 3.11+ (used at lines 355, 358, 373, 376); replace with datetime.timezone.utc to maintain support for requires-python >=3.10.

🤖 Prompt for AI Agents
In plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
around lines 355, 358, 373 and 376, replace usages of datetime.UTC (which is
only in Python 3.11+) with datetime.timezone.utc to maintain Python 3.10
compatibility; ensure the module import remains "import datetime" (so
datetime.timezone is available) and update those four occurrences to
datetime.timezone.utc.
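
For illustration, a minimal sketch of the substitution the comment suggests (a hypothetical snippet, not the exact diff; it assumes the module keeps the plain "import datetime" form shown above):

# Hypothetical sketch of the suggested change, not the PR's actual code.
import datetime

# Python 3.11+ only -- datetime.UTC is an alias for datetime.timezone.utc:
# created_at = datetime.datetime.now(datetime.UTC)

# Works on Python 3.10 as well, which matches requires-python >= 3.10:
created_at = datetime.datetime.now(datetime.timezone.utc)
updated_at = datetime.datetime.now(datetime.timezone.utc)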

import logging
import asyncio
import os
@@ -8,12 +9,15 @@
import aiortc
from getstream import AsyncStream
from getstream.chat.async_client import ChatClient
from getstream.models import ChannelInput
from getstream.models import ChannelInput, ChannelMember
from getstream.video import rtc
from getstream.chat.async_channel import Channel
from getstream.video.async_call import Call
from getstream.video.rtc import ConnectionManager, audio_track
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant, TrackType
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import (
Participant,
TrackType,
)
from getstream.video.rtc.track_util import PcmData
from getstream.video.rtc.tracks import SubscriptionConfig, TrackSubscriptionConfig

@@ -37,11 +41,13 @@ def __init__(self, connection: ConnectionManager):
async def close(self):
await self._connection.leave()


class StreamEdge(EdgeTransport):
"""
StreamEdge uses getstream.io's edge network. To support multiple vendors, this means we expose

"""

client: AsyncStream

def __init__(self, **kwargs):
@@ -71,9 +77,15 @@ def __init__(self, **kwargs):
def _get_webrtc_kind(self, track_type_int: int) -> str:
"""Get the expected WebRTC kind (audio/video) for a SFU track type."""
# Map SFU track types to WebRTC kinds
if track_type_int in (TrackType.TRACK_TYPE_AUDIO, TrackType.TRACK_TYPE_SCREEN_SHARE_AUDIO):
if track_type_int in (
TrackType.TRACK_TYPE_AUDIO,
TrackType.TRACK_TYPE_SCREEN_SHARE_AUDIO,
):
return "audio"
elif track_type_int in (TrackType.TRACK_TYPE_VIDEO, TrackType.TRACK_TYPE_SCREEN_SHARE):
elif track_type_int in (
TrackType.TRACK_TYPE_VIDEO,
TrackType.TRACK_TYPE_SCREEN_SHARE,
):
return "video"
else:
# Default to video for unknown types
@@ -101,7 +113,9 @@ async def _on_track_published(self, event: sfu_events.TrackPublishedEvent):
# First check if track already exists in map (e.g., from previous unpublish/republish)
if track_key in self._track_map:
self._track_map[track_key]["published"] = True
self.logger.info(f"Track marked as published (already existed): {track_key}")
self.logger.info(
f"Track marked as published (already existed): {track_key}"
)
return

# Wait for pending track to be populated (with 10 second timeout)
@@ -113,34 +127,42 @@ async def _on_track_published(self, event: sfu_events.TrackPublishedEvent):

while elapsed < timeout:
# Find pending track for this user/session with matching kind
for tid, (pending_user, pending_session, pending_kind) in list(self._pending_tracks.items()):
if (pending_user == user_id and
pending_session == session_id and
pending_kind == expected_kind):
for tid, (pending_user, pending_session, pending_kind) in list(
self._pending_tracks.items()
):
if (
pending_user == user_id
and pending_session == session_id
and pending_kind == expected_kind
):
track_id = tid
del self._pending_tracks[tid]
break

if track_id:
break

# Wait a bit before checking again
await asyncio.sleep(poll_interval)
elapsed += poll_interval

if track_id:
# Store with correct type from SFU
self._track_map[track_key] = {"track_id": track_id, "published": True}
self.logger.info(f"Trackmap published: {track_type_int} from {user_id}, track_id: {track_id} (waited {elapsed:.2f}s)")

self.logger.info(
f"Trackmap published: {track_type_int} from {user_id}, track_id: {track_id} (waited {elapsed:.2f}s)"
)

# NOW spawn TrackAddedEvent with correct type
self.events.send(events.TrackAddedEvent(
plugin_name="getstream",
track_id=track_id,
track_type=track_type_int,
user=event.participant,
user_metadata=event.participant
))
self.events.send(
events.TrackAddedEvent(
plugin_name="getstream",
track_id=track_id,
track_type=track_type_int,
user=event.participant,
user_metadata=event.participant,
)
)
else:
raise TimeoutError(
f"Timeout waiting for pending track: {track_type_int} ({expected_kind}) from user {user_id}, "
Expand All @@ -149,10 +171,12 @@ async def _on_track_published(self, event: sfu_events.TrackPublishedEvent):
f"Key: {track_key}\n"
f"Track map: {self._track_map}\n"
)

async def _on_track_removed(self, event: sfu_events.ParticipantLeftEvent | sfu_events.TrackUnpublishedEvent):

async def _on_track_removed(
self, event: sfu_events.ParticipantLeftEvent | sfu_events.TrackUnpublishedEvent
):
"""Handle track unpublished and participant left events."""
if not event.payload: # NOTE: mypy typecheck
if not event.payload: # NOTE: mypy typecheck
return

participant = event.participant
@@ -164,32 +188,36 @@ async def _on_track_removed(self, event: sfu_events.ParticipantLeftEvent | sfu_e
session_id = event.payload.session_id

# Determine which tracks to remove
if hasattr(event.payload, 'type') and event.payload is not None:
if hasattr(event.payload, "type") and event.payload is not None:
# TrackUnpublishedEvent - single track
tracks_to_remove = [event.payload.type]
event_desc = "Track unpublished"
else:
# ParticipantLeftEvent - all published tracks
tracks_to_remove = (event.participant.published_tracks if event.participant else None) or []
tracks_to_remove = (
event.participant.published_tracks if event.participant else None
) or []
event_desc = "Participant left"

track_names = [TrackType.Name(t) for t in tracks_to_remove]
self.logger.info(f"{event_desc}: {user_id}, tracks: {track_names}")

# Mark each track as unpublished and send TrackRemovedEvent
for track_type_int in tracks_to_remove:
track_key = (user_id, session_id, track_type_int)
track_info = self._track_map.get(track_key)

if track_info:
track_id = track_info["track_id"]
self.events.send(events.TrackRemovedEvent(
plugin_name="getstream",
track_id=track_id,
track_type=track_type_int,
user=participant,
user_metadata=participant
))
self.events.send(
events.TrackRemovedEvent(
plugin_name="getstream",
track_id=track_id,
track_type=track_type_int,
user=participant,
user_metadata=participant,
)
)
# Mark as unpublished instead of removing
self._track_map[track_key]["published"] = False
else:
@@ -240,31 +268,42 @@ async def join(self, agent: "Agent", call: Call) -> StreamConnection:
async def on_track(track_id, track_type, user):
# Store track in pending map - wait for SFU to confirm type before spawning TrackAddedEvent
self._pending_tracks[track_id] = (user.user_id, user.session_id, track_type)
self.logger.info(f"Track received from WebRTC (pending SFU confirmation): {track_id}, type: {track_type}, user: {user.user_id}")
self.logger.info(
f"Track received from WebRTC (pending SFU confirmation): {track_id}, type: {track_type}, user: {user.user_id}"
)

self.events.silent(events.AudioReceivedEvent)

@connection.on("audio")
async def on_audio_received(pcm: PcmData, participant: Participant):
self.events.send(events.AudioReceivedEvent(
plugin_name="getstream",
pcm_data=pcm,
participant=participant,
user_metadata=participant
))

await connection.__aenter__() # TODO: weird API? there should be a manual version
self.events.send(
events.AudioReceivedEvent(
plugin_name="getstream",
pcm_data=pcm,
participant=participant,
user_metadata=participant,
)
)

await (
connection.__aenter__()
) # TODO: weird API? there should be a manual version
self._connection = connection

standardize_connection = StreamConnection(connection)
return standardize_connection

def create_audio_track(self, framerate: int = 48000, stereo: bool = True):
return audio_track.AudioStreamTrack(framerate=framerate, stereo=stereo) # default to webrtc framerate
return audio_track.AudioStreamTrack(
framerate=framerate, stereo=stereo
) # default to webrtc framerate

def create_video_track(self):
return aiortc.VideoStreamTrack()

def add_track_subscriber(self, track_id: str) -> Optional[aiortc.mediastreams.MediaStreamTrack]:
def add_track_subscriber(
self, track_id: str
) -> Optional[aiortc.mediastreams.MediaStreamTrack]:
return self._connection.subscriber_pc.add_track_subscriber(track_id)

async def publish_tracks(self, audio_track, video_track):
@@ -301,6 +340,45 @@ async def open_demo(self, call: Call) -> str:

# Create the user in the GetStream system
await client.create_user(name=name, id=human_id)

        # Ensure both the agent and the user get access to the demo by adding the user as a member and the agent as the channel creator
channel = client.chat.channel(self.channel_type, call.id)
response = await channel.get_or_create(
data=ChannelInput(
created_by_id=self.agent_user_id,
members=[
ChannelMember(
user_id=human_id,
# TODO: get rid of this when codegen for stream-py is fixed, these fields are meaningless
banned=False,
channel_role="",
created_at=datetime.datetime.now(datetime.UTC),
notifications_muted=False,
shadow_banned=False,
updated_at=datetime.datetime.now(datetime.UTC),
custom={},
)
],
)
)

if human_id not in [m.user_id for m in response.data.members]:
await channel.update(
add_members=[
ChannelMember(
user_id=human_id,
# TODO: get rid of this when codegen for stream-py is fixed, these fields are meaningless
banned=False,
channel_role="",
created_at=datetime.datetime.now(datetime.UTC),
notifications_muted=False,
shadow_banned=False,
updated_at=datetime.datetime.now(datetime.UTC),
custom={},
)
]
)

# Create user token for browser access
token = client.create_token(human_id, expiration=3600)

@@ -317,7 +395,7 @@ async def open_demo(self, call: Call) -> str:
"bitrate": 12000000,
"w": 1920,
"h": 1080,
# TODO: FPS..., aim at 60fps
"channel_type": self.channel_type,
}

url = f"{base_url}{call.id}?{urlencode(params)}"
@@ -331,26 +409,3 @@ async def open_demo(self, call: Call) -> str:
print(f"Please manually open this URL: {url}")

return url

def open_pronto(self, api_key: str, token: str, call_id: str):
"""Open browser with the video call URL."""
# Use the same URL pattern as the working workout assistant example
base_url = (
f"{os.getenv('EXAMPLE_BASE_URL', 'https://pronto-staging.getstream.io')}/join/"
)
params = {
"api_key": api_key,
"token": token,
"skip_lobby": "true",
"video_encoder": "vp8",
}

url = f"{base_url}{call_id}?{urlencode(params)}"
self.logger.info(f"🌐 Opening browser: {url}")

try:
webbrowser.open(url)
self.logger.info("✅ Browser opened successfully!")
except Exception as e:
self.logger.error(f"❌ Failed to open browser: {e}")
self.logger.info(f"Please manually open this URL: {url}")