Skip to content

Commit

Permalink
Merge pull request #4 from Laurensvanrun/3-eoserror-too-many-posts-we…
Browse files Browse the repository at this point in the history
…re-made-to-a-semaphore

Fix to prevent posting to often to the semaphore
  • Loading branch information
Laurensvanrun authored Nov 8, 2024
2 parents 2db4240 + 86be226 commit df25ad8
Showing 1 changed file with 44 additions and 34 deletions.
78 changes: 44 additions & 34 deletions Core/Types/Next.Core.Promises.pas
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,16 @@ TPromiseThread = class(TThread)
end;
private
FController: ITask;
FSignalController: TSemaphore;
FSignalController: TEvent;
FSignalControllerRevision: Int64;
FCancel: TEvent;

FSignal: TSemaphore;
FThreads: TList<TPromiseThread>;
FIdleThreads: Integer;

FExceptionLogger: IPromiseSchedulerExceptionLogger;
FThreadPoolIsMaxSize: Boolean;

procedure AddThread;

Expand All @@ -277,6 +279,7 @@ TPromiseThread = class(TThread)

procedure SetLogger(ALogger: IPromiseSchedulerExceptionLogger);
procedure LogFatalException(APromiseClassname: String; AException: Exception);
procedure SignalControllerIf;
protected
FPromises: TList<IPromiseAccess>;

Expand All @@ -292,7 +295,6 @@ TPromiseThread = class(TThread)
procedure Start;
procedure Schedule(APromise: IPromiseAccess);
procedure Signal;
function ThreadCount: Integer;
end;
{$ENDREGION}

Expand Down Expand Up @@ -868,12 +870,10 @@ procedure TPromiseScheduler.Schedule(APromise: IPromiseAccess);
System.TMonitor.Exit(FPromises);
end;
Signal();

FSignalController.Release;
SignalControllerIf();
end;

procedure TPromiseScheduler.SetLogger(
ALogger: IPromiseSchedulerExceptionLogger);
procedure TPromiseScheduler.SetLogger(ALogger: IPromiseSchedulerExceptionLogger);
begin
System.TMonitor.Enter(Self);
try
Expand All @@ -887,43 +887,42 @@ procedure TPromiseScheduler.AddThread;
begin
FThreads.Add(TPromiseThread.Create(Self));
FThreads.Last.NameThreadForDebugging('Promise worker - #' + FThreads.Count.ToString(), FThreads.Last.ThreadID);
end;

function TPromiseScheduler.ThreadCount: Integer;
begin
Result := FThreads.Count;
FThreadPoolIsMaxSize := (FThreads.Count >= MAX_POOL_SIZE);
end;

procedure TPromiseScheduler.ControlPool;
var
LEvents: Array[0..1] of THandle;
LWaitResult: Cardinal;
LCancel: Boolean;
LThread: TPromiseThread;
i: Integer;
var LEvents: Array[0..1] of THandle;
begin
LEvents[0] := FCancel.Handle;
LEvents[1] := FSignalController.Handle;
LCancel := False;
var LCancel := False;

for i := 0 to MIN_POOL_SIZE - 1 do
for var i := 0 to MIN_POOL_SIZE - 1 do
AddThread();

while (not LCancel) do begin
LWaitResult := WaitForMultipleObjectsEx(2, @LEvents, False, INFINITE, False);

if LWaitResult = WAIT_OBJECT_0 then
LCancel := True;
if LWaitResult = (WAIT_OBJECT_0 + 1) then begin
if GrowPool() then begin
//Take is easy, only grow/shrink every 10 seconds
if FCancel.WaitFor(100) = wrSignaled then
LCancel := True;
const LWaitResult = WaitForMultipleObjectsEx(2, @LEvents, False, INFINITE, False);
case LWaitResult of
WAIT_OBJECT_0: LCancel := True;

WAIT_OBJECT_0 + 1: begin
FSignalController.ResetEvent;
const LRevisionBefore = TInterlocked.Read(FSignalControllerRevision);

if GrowPool() then begin
//Take it easy, only grow/shrink every 100ms
if FCancel.WaitFor(100) = wrSignaled then
LCancel := True;
end;

const LRevisionAfter = TInterlocked.Read(FSignalControllerRevision);
if LRevisionBefore <> LRevisionAfter then
SignalControllerIf();
end;
end;
end;

for LThread in FThreads do begin
for var LThread in FThreads do begin
LThread.Cancel;
LThread.WaitFor;
LThread.Free;
Expand All @@ -936,7 +935,9 @@ constructor TPromiseScheduler.Create;
FThreads := TList<TPromiseThread>.Create;

FSignal := TSemaphore.Create(nil, 0, 9999, '');
FSignalController := TSemaphore.Create(nil, 0, 9999, '');

FSignalController := TEvent.Create;
FSignalControllerRevision := 0;

FCancel := TEvent.Create();
FIdleThreads := 0;
Expand Down Expand Up @@ -1004,11 +1005,12 @@ function TPromiseScheduler.GrowPool: Boolean;
if (LPromiseCount > FIdleThreads) then begin
if (FThreads.Count < MAX_POOL_SIZE) then begin
AddThread();

//could be that GrowPool was triggered by multiple promises, so we might need to grow more
SignalControllerIf();
Result := True;
end else begin
//Reclaim to try again later
FSignalController.Release;
end;
end
//else: nothing to do, pool is already at max size, and shrinking the pool isn't supported yet
end;
end;

Expand All @@ -1034,6 +1036,14 @@ procedure TPromiseScheduler.Signal;
FSignal.Release;
end;

procedure TPromiseScheduler.SignalControllerIf;
begin
if not FThreadPoolIsMaxSize then begin
TInterlocked.Increment(FSignalControllerRevision); //will rollover to MinInt when the max is reached
FSignalController.SetEvent;
end;
end;

function TPromiseScheduler.SignalToken: TSemaphore;
begin
Result := FSignal;
Expand Down

0 comments on commit df25ad8

Please sign in to comment.