Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 141 additions & 3 deletions Runtime/Scripts/Internal/YieldInstruction.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using UnityEngine;

namespace LiveKit
Expand All @@ -13,10 +15,79 @@ public class YieldInstruction : CustomYieldInstruction
private volatile bool _isDone;
private volatile bool _isError;

public bool IsDone { get => _isDone; protected set => _isDone = value; }
// Sentinel published once completion has fired so any continuation registered
// afterwards runs inline instead of being silently dropped.
private static readonly Action s_completedSentinel = () => { };
private Action? _continuation;

public bool IsDone
{
get => _isDone;
protected set
{
_isDone = value;
if (value) InvokeContinuation();
}
}
public bool IsError { get => _isError; protected set => _isError = value; }

public override bool keepWaiting => !_isDone;

/// <summary>
/// Returns an awaiter so callers can <c>await</c> this instruction directly.
/// </summary>
/// <remarks>
/// The awaiter completes when <see cref="IsDone"/> becomes true. As with the
/// coroutine path, success vs. failure is inspected on the instruction itself
/// (<see cref="IsError"/> and any subclass-specific result fields); <c>GetResult</c>
/// does not throw.
/// </remarks>
public YieldInstructionAwaiter GetAwaiter() => new YieldInstructionAwaiter(this);

internal void RegisterContinuation(Action continuation)
{
// Race between completion-side (FFI thread writes sentinel) and await-side
// (registers continuation): CompareExchange decides who wrote first.
// null -> we won, completion will invoke our continuation later
// sentinel -> completion already fired; invoke inline
// other -> a second awaiter beat us here, which we don't support
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev == null) return;
if (ReferenceEquals(prev, s_completedSentinel))
{
continuation();
return;
}
throw new InvalidOperationException(
"YieldInstruction does not support multiple awaiters; await it only once.");
}

private void InvokeContinuation()
{
var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel);
if (prev != null && !ReferenceEquals(prev, s_completedSentinel))
{
prev();
}
}
}

public readonly struct YieldInstructionAwaiter : INotifyCompletion
{
private readonly YieldInstruction _instruction;

internal YieldInstructionAwaiter(YieldInstruction instruction)
{
_instruction = instruction;
}

public bool IsCompleted => _instruction.IsDone;

public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

// Intentionally a no-op. Parity with the coroutine path: callers inspect IsError
// and subclass-specific result fields on the instruction itself.
public void GetResult() { }
}

public class StreamYieldInstruction : CustomYieldInstruction
Expand All @@ -28,12 +99,31 @@ public class StreamYieldInstruction : CustomYieldInstruction
private volatile bool _isEos;
private volatile bool _isCurrentReadDone;

private static readonly Action s_completedSentinel = () => { };
private Action? _continuation;

/// <summary>
/// True if the stream has reached the end.
/// </summary>
public bool IsEos { get => _isEos; protected set => _isEos = value; }
public bool IsEos
{
get => _isEos;
protected set
{
_isEos = value;
if (value) InvokeContinuation();
}
}

internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; }
internal bool IsCurrentReadDone
{
get => _isCurrentReadDone;
set
{
_isCurrentReadDone = value;
if (value) InvokeContinuation();
}
}

public override bool keepWaiting => !_isCurrentReadDone && !_isEos;

Expand All @@ -50,6 +140,54 @@ public override void Reset()
throw new InvalidOperationException("Cannot reset after end of stream");
}
_isCurrentReadDone = false;
// Drop the sentinel published by the previous completion so the next awaiter
// can install a fresh continuation. Safe because Reset is only called after the
// previous read's await has already resumed.
Volatile.Write(ref _continuation, null);
}

/// <summary>
/// Returns an awaiter that completes when the next chunk is ready or the stream ends.
/// Call <see cref="Reset"/> between iterations to await the following chunk.
/// </summary>
public StreamYieldInstructionAwaiter GetAwaiter() => new StreamYieldInstructionAwaiter(this);

internal void RegisterContinuation(Action continuation)
{
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev == null) return;
if (ReferenceEquals(prev, s_completedSentinel))
{
continuation();
return;
}
throw new InvalidOperationException(
"StreamYieldInstruction does not support multiple concurrent awaiters; await it once per chunk.");
}

private void InvokeContinuation()
{
var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel);
if (prev != null && !ReferenceEquals(prev, s_completedSentinel))
{
prev();
}
}
}

public readonly struct StreamYieldInstructionAwaiter : INotifyCompletion
{
private readonly StreamYieldInstruction _instruction;

internal StreamYieldInstructionAwaiter(StreamYieldInstruction instruction)
{
_instruction = instruction;
}

public bool IsCompleted => _instruction.IsCurrentReadDone || _instruction.IsEos;

public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

public void GetResult() { }
}
}
30 changes: 30 additions & 0 deletions Tests/PlayMode/RoomTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections;
using System.Threading.Tasks;
using NUnit.Framework;
using UnityEngine;
using UnityEngine.TestTools;
using LiveKit.Proto;
using LiveKit.PlayModeTests.Utils;
Expand All @@ -26,6 +28,34 @@ public IEnumerator Connect_FailsWithInvalidUrl()
Assert.IsNotNull(context.ConnectionError, "Expected connection to fail");
}

// Parity check for the awaitable surface added in Stage 1 of the UniTask migration:
// awaiting a ConnectInstruction must observe the same IsError signal that
// yield return does. The outer driver stays IEnumerator because Unity's PlayMode
// runner does not accept [Test] async Task — the await itself is what we're
// validating, wrapped in a Task that the coroutine polls.
[UnityTest, Category("E2E")]
public IEnumerator Connect_FailsWithInvalidUrl_Awaitable()
{
LogAssert.ignoreFailingMessages = true;

using var room = new Room();
var connect = room.Connect("invalid-url", "token", new RoomOptions());
var awaitTask = AwaitInstruction(connect);

yield return new WaitUntil(() => awaitTask.IsCompleted);

LogAssert.ignoreFailingMessages = false;

Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString());
Assert.IsTrue(connect.IsDone, "Awaiter should not resume before IsDone");
Assert.IsTrue(connect.IsError, "Expected connection to fail");
}

private static async Task AwaitInstruction(YieldInstruction instruction)
{
await instruction;
}

[UnityTest, Category("E2E")]
public IEnumerator RoomName_MatchesProvided()
{
Expand Down
Loading