Skip to content

Commit 244e2fc

Browse files
authored
fix av sync example (livekit#338)
1 parent 201b04d commit 244e2fc

File tree

2 files changed: +101 -28 lines changed

2 files changed: +101 -28 lines changed

examples/video-stream/video_play.py

Lines changed: 58 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -56,31 +56,38 @@ def __init__(self, media_file: Union[str, Path]) -> None:
5656
def info(self) -> MediaInfo:
5757
return self._info
5858

59-
async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]:
59+
async def stream_video(self) -> AsyncIterable[tuple[rtc.VideoFrame, float]]:
6060
"""Streams video frames from the media file in an endless loop."""
61-
for av_frame in self._video_container.decode(video=0):
61+
for i, av_frame in enumerate(self._video_container.decode(video=0)):
6262
# Convert video frame to RGBA
6363
frame = av_frame.to_rgb().to_ndarray()
6464
frame_rgba = np.ones((frame.shape[0], frame.shape[1], 4), dtype=np.uint8)
6565
frame_rgba[:, :, :3] = frame
66-
yield rtc.VideoFrame(
67-
width=frame.shape[1],
68-
height=frame.shape[0],
69-
type=rtc.VideoBufferType.RGBA,
70-
data=frame_rgba.tobytes(),
66+
yield (
67+
rtc.VideoFrame(
68+
width=frame.shape[1],
69+
height=frame.shape[0],
70+
type=rtc.VideoBufferType.RGBA,
71+
data=frame_rgba.tobytes(),
72+
),
73+
av_frame.time,
7174
)
7275

73-
async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]:
76+
async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]:
7477
"""Streams audio frames from the media file in an endless loop."""
7578
for av_frame in self._audio_container.decode(audio=0):
7679
# Convert audio frame to raw int16 samples
7780
frame = av_frame.to_ndarray().T # Transpose to (samples, channels)
7881
frame = (frame * 32768).astype(np.int16)
79-
yield rtc.AudioFrame(
80-
data=frame.tobytes(),
81-
sample_rate=self.info.audio_sample_rate,
82-
num_channels=frame.shape[1],
83-
samples_per_channel=frame.shape[0],
82+
duration = len(frame) / self.info.audio_sample_rate
83+
yield (
84+
rtc.AudioFrame(
85+
data=frame.tobytes(),
86+
sample_rate=self.info.audio_sample_rate,
87+
num_channels=frame.shape[1],
88+
samples_per_channel=frame.shape[0],
89+
),
90+
av_frame.time + duration,
8491
)
8592

8693
def reset(self):
@@ -102,6 +109,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
102109
api.VideoGrants(
103110
room_join=True,
104111
room=room_name,
112+
agent=True,
105113
)
106114
)
107115
.to_jwt()
@@ -121,7 +129,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
121129
media_info = streamer.info
122130

123131
# Create video and audio sources/tracks
124-
queue_size_ms = 1000 # 1 second
132+
queue_size_ms = 1000
125133
video_source = rtc.VideoSource(
126134
width=media_info.video_width,
127135
height=media_info.video_height,
@@ -157,26 +165,54 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
157165
)
158166

159167
async def _push_frames(
160-
stream: AsyncIterable[rtc.VideoFrame | rtc.AudioFrame],
168+
stream: AsyncIterable[tuple[rtc.VideoFrame | rtc.AudioFrame, float]],
161169
av_sync: rtc.AVSynchronizer,
162170
):
163-
async for frame in stream:
164-
await av_sync.push(frame)
171+
async for frame, timestamp in stream:
172+
await av_sync.push(frame, timestamp)
165173
await asyncio.sleep(0)
166174

175+
async def _log_fps(av_sync: rtc.AVSynchronizer):
176+
start_time = asyncio.get_running_loop().time()
177+
while True:
178+
await asyncio.sleep(2)
179+
wall_time = asyncio.get_running_loop().time() - start_time
180+
diff = av_sync.last_video_time - av_sync.last_audio_time
181+
logger.info(
182+
f"fps: {av_sync.actual_fps:.2f}, wall_time: {wall_time:.3f}s, "
183+
f"video_time: {av_sync.last_video_time:.3f}s, "
184+
f"audio_time: {av_sync.last_audio_time:.3f}s, diff: {diff:.3f}s"
185+
)
186+
167187
try:
168188
while True:
169189
streamer.reset()
170-
video_task = asyncio.create_task(
171-
_push_frames(streamer.stream_video(), av_sync)
172-
)
173-
audio_task = asyncio.create_task(
174-
_push_frames(streamer.stream_audio(), av_sync)
190+
191+
video_stream = streamer.stream_video()
192+
audio_stream = streamer.stream_audio()
193+
194+
# read the head frames and push them at the same time
195+
first_video_frame, video_timestamp = await video_stream.__anext__()
196+
first_audio_frame, audio_timestamp = await audio_stream.__anext__()
197+
logger.info(
198+
f"first video duration: {1/media_info.video_fps:.3f}s, "
199+
f"first audio duration: {first_audio_frame.duration:.3f}s"
175200
)
201+
await av_sync.push(first_video_frame, video_timestamp)
202+
await av_sync.push(first_audio_frame, audio_timestamp)
203+
204+
video_task = asyncio.create_task(_push_frames(video_stream, av_sync))
205+
audio_task = asyncio.create_task(_push_frames(audio_stream, av_sync))
206+
207+
log_fps_task = asyncio.create_task(_log_fps(av_sync))
176208

177209
# wait for both tasks to complete
178210
await asyncio.gather(video_task, audio_task)
179211
await av_sync.wait_for_playout()
212+
213+
# clean up
214+
av_sync.reset()
215+
log_fps_task.cancel()
180216
logger.info("playout finished")
181217
finally:
182218
await streamer.aclose()

livekit-rtc/livekit/rtc/synchronizer.py

Lines changed: 43 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99
from .audio_source import AudioSource
1010
from .video_source import VideoSource
1111

12+
1213
logger = logging.getLogger(__name__)
1314

1415

@@ -43,6 +44,9 @@ def __init__(
4344
self._max_delay_tolerance_ms = _max_delay_tolerance_ms
4445

4546
self._stopped = False
47+
# the time of the last video/audio frame captured
48+
self._last_video_time: float = 0
49+
self._last_audio_time: float = 0
4650

4751
self._video_queue_max_size = int(
4852
self._video_fps * self._video_queue_size_ms / 1000
@@ -51,7 +55,7 @@ def __init__(
5155
# ensure queue is bounded if queue size is specified
5256
self._video_queue_max_size = max(1, self._video_queue_max_size)
5357

54-
self._video_queue = asyncio.Queue[VideoFrame](
58+
self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]](
5559
maxsize=self._video_queue_max_size
5660
)
5761
self._fps_controller = _FPSController(
@@ -60,28 +64,47 @@ def __init__(
6064
)
6165
self._capture_video_task = asyncio.create_task(self._capture_video())
6266

63-
async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
67+
async def push(
68+
self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
69+
) -> None:
70+
"""Push a frame to the synchronizer
71+
72+
Args:
73+
frame: The video or audio frame to push.
74+
timestamp: (optional) The timestamp of the frame, for logging purposes for now.
75+
For AudioFrame, it should be the end time of the frame.
76+
"""
6477
if isinstance(frame, AudioFrame):
6578
await self._audio_source.capture_frame(frame)
79+
if timestamp is not None:
80+
self._last_audio_time = timestamp
6681
return
6782

68-
await self._video_queue.put(frame)
83+
await self._video_queue.put((frame, timestamp))
6984

7085
async def clear_queue(self) -> None:
7186
self._audio_source.clear_queue()
7287
while not self._video_queue.empty():
7388
await self._video_queue.get()
89+
self._video_queue.task_done()
7490

7591
async def wait_for_playout(self) -> None:
7692
"""Wait until all video and audio frames are played out."""
77-
await self._audio_source.wait_for_playout()
78-
await self._video_queue.join()
93+
await asyncio.gather(
94+
self._audio_source.wait_for_playout(),
95+
self._video_queue.join(),
96+
)
97+
98+
def reset(self) -> None:
99+
self._fps_controller.reset()
79100

80101
async def _capture_video(self) -> None:
81102
while not self._stopped:
82-
frame = await self._video_queue.get()
103+
frame, timestamp = await self._video_queue.get()
83104
async with self._fps_controller:
84105
self._video_source.capture_frame(frame)
106+
if timestamp is not None:
107+
self._last_video_time = timestamp
85108
self._video_queue.task_done()
86109

87110
async def aclose(self) -> None:
@@ -93,6 +116,16 @@ async def aclose(self) -> None:
93116
def actual_fps(self) -> float:
94117
return self._fps_controller.actual_fps
95118

119+
@property
120+
def last_video_time(self) -> float:
121+
"""The time of the last video frame captured"""
122+
return self._last_video_time
123+
124+
@property
125+
def last_audio_time(self) -> float:
126+
"""The time of the last audio frame played out"""
127+
return self._last_audio_time - self._audio_source.queued_duration
128+
96129

97130
class _FPSController:
98131
def __init__(
@@ -123,6 +156,10 @@ async def __aenter__(self) -> None:
123156
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
124157
self.after_process()
125158

159+
def reset(self) -> None:
160+
self._next_frame_time = None
161+
self._send_timestamps.clear()
162+
126163
async def wait_next_process(self) -> None:
127164
"""Wait until it's time for the next frame.
128165

0 commit comments

Comments
 (0)