Skip to content

Commit

Permalink
allow await queueOnOtherThread + optimize performance
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonXuDeveloper committed Jun 15, 2023
1 parent 680a256 commit 90fa0e4
Showing 1 changed file with 28 additions and 49 deletions.
77 changes: 28 additions & 49 deletions UnityProject/Assets/Dependencies/JEngine/Core/Manager/ThreadMgr.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using System;
using UnityEngine;
using System.Threading;
using Unity.Collections;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
using ThreadTaskAction = JEngine.Core.ThreadMgr.ThreadTaskAwaiter.ThreadTaskAction;
using UnityEngine;

namespace JEngine.Core
{
Expand All @@ -17,12 +16,6 @@ public struct ThreadTaskAwaiter : INotifyCompletion
{
public int Index;

public struct ThreadTaskAction
{
public void* ActionPtr;
public ulong ActionGCHandleAddress;
}

public void GetResult()
{
}
Expand All @@ -32,9 +25,7 @@ public bool IsCompleted

public void OnCompleted(Action continuation)
{
ThreadTaskAction* action = ItemList + Index;
action->ActionPtr =
UnsafeUtility.PinGCObjectAndGetAddress(continuation, out action->ActionGCHandleAddress);
Actions[Index] = continuation;
}

public ThreadTaskAwaiter GetAwaiter()
Expand Down Expand Up @@ -77,40 +68,22 @@ private static void SetCompleted(int index)
{
if (UsageList[index] == 0 || index < 0 || index >= MaxSize)
return;
ThreadTaskAction* action = ItemList + index;
var actionPtr = action->ActionPtr;
var actionGCHandleAddress = action->ActionGCHandleAddress;
Action act = Actions[index];
try
{
UsageList[index] = 0;
Action act = null;
try
{
act = UnsafeMgr.Instance.FromPtr<Action>(actionPtr);
}
catch
{
//ignore
}

act?.Invoke();
}
catch (Exception e)
{
Debug.LogException(e);
}
finally
{
if (actionGCHandleAddress != 0)
UnsafeUtility.ReleaseGCObject(actionGCHandleAddress);
}
}

/// <summary>
/// 非托管内存
/// 待执行任务
/// </summary>
private static readonly ThreadTaskAction* ItemList =
(ThreadTaskAction*)UnsafeUtility.Malloc(sizeof(ThreadTaskAction) * MaxSize, 4, Allocator.Persistent);
private static readonly Action[] Actions = new Action[MaxSize];

/// <summary>
/// 使用列表
Expand All @@ -137,7 +110,6 @@ public static void Initialize()
_updateTaskId = LifeCycleMgr.Instance.AddUpdateTask(Update, () => _active);
//默认运行
Activate();
GC.AddMemoryPressure(sizeof(ThreadTaskAction) * MaxSize);
GC.AddMemoryPressure(MaxSize);
}

Expand All @@ -152,23 +124,23 @@ public static void Initialize()
private static bool _active;

/// <summary>
/// Activate loom to execute loop
/// Activate threadMgr to execute loop
/// </summary>
public static void Activate()
{
_active = true;
}

/// <summary>
/// Deactivate loom to stop loop
/// Deactivate threadMgr to stop loop
/// </summary>
public static void Deactivate()
{
_active = false;
}

/// <summary>
/// Stop the current loom, requires re-initialize to rerun
/// Stop the current threadMgr, requires re-initialize to rerun
/// </summary>
public static void Stop()
{
Expand Down Expand Up @@ -245,19 +217,26 @@ public static ThreadTaskAwaiter QueueOnMainThread(Action action, float time = 0f
/// <param name="p"></param>
/// <param name="time"></param>
/// <typeparam name="T"></typeparam>
public static void QueueOnOtherThread<T>(Action<T> action, T p, float time = 0f)
{
QueueOnOtherThread(() => action(p), time);
}
public static ThreadTaskAwaiter QueueOnOtherThread<T>(Action<T> action, T p, float time = 0f)
=> QueueOnMainThread(action, p, time);

/// <summary>
/// Queue an action on other thread to run after specific seconds
/// </summary>
/// <param name="action"></param>
/// <param name="time"></param>
public static void QueueOnOtherThread(Action action, float time = 0f)
public static ThreadTaskAwaiter QueueOnOtherThread(Action action, float time = 0f)
{
Delayed.Enqueue(new DelayedQueueItem { Time = _curTime + time, Action = action, MainThread = false });
var ret = new ThreadTaskAwaiter();
int index = GetIndex();
ret.Index = index;
var act = new Action(() =>
{
action();
SetCompleted(index);
});
Delayed.Enqueue(new DelayedQueueItem { Time = _curTime + time, Action = act, MainThread = false });
return ret;
}

/// <summary>
Expand All @@ -275,7 +254,7 @@ public static void QueueOnOtherThread(Action action, float time = 0f)
/// </summary>
static void Update()
{
_curTime = UnityEngine.Time.time;
_curTime = Time.time;
var i = Delayed.Count;
while (i-- > 0)
{
Expand All @@ -302,7 +281,7 @@ static void Update()
}
catch (Exception e)
{
UnityEngine.Debug.LogException(e);
Debug.LogException(e);
}
});
}
Expand All @@ -314,7 +293,7 @@ static void Update()
}
catch (Exception e)
{
UnityEngine.Debug.LogException(e);
Debug.LogException(e);
}
}
}
Expand Down

0 comments on commit 90fa0e4

Please sign in to comment.