Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.10.318 #603

Merged
merged 20 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update
  • Loading branch information
FriedrichWeinmann committed Nov 6, 2023
commit 2b95cf732599243ca814be0c023c47b1613820cc
1 change: 1 addition & 0 deletions PSFramework/PSFramework.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
'Unregister-PSFConfig'
'Unregister-PSFMessageColorTransform'
'Wait-PSFMessage'
'Wait-PSFRunspaceWorkflow'
'Write-PSFHostColor'
'Write-PSFMessageProxy'
'Write-PSFRunspaceQueue'
Expand Down
Binary file modified PSFramework/bin/PSFramework.dll
Binary file not shown.
Binary file modified PSFramework/bin/PSFramework.pdb
Binary file not shown.
35 changes: 30 additions & 5 deletions PSFramework/bin/PSFramework.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions PSFramework/functions/runspace/Add-PSFRunspaceWorker.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@
This however requires the worker to cycle often enough to catch that signal.
If this is impossible - for example, if the worker has one long-running piece - and we still need to be able to just kill it, set this flag.
This means that on stop, all worker runspaces will be killed - without doing any cleanup!

.PARAMETER MaxItems
The maximum number of items this worker will process.
All runspaces of this worker will refuse to process more items than this number in total shared among them.

.PARAMETER CloseOutQueue
When this worker stops - for example due to reaching its limit of items produced, or its source queue being closed - it will, after finishing its processing, close the associated out queue.
This allows subsequent workers to know, whether to expect any further input.

.PARAMETER QueuesToClose
When this worker stops, close the queues listed.
This works independent of CloseOutQueue - both con ber used together or separately.

.PARAMETER Throttle
A throttle object, as returned by New-PSFThrottle.
Expand Down Expand Up @@ -129,6 +141,7 @@
$OutQueue,

[Parameter(Mandatory = $true)]
[Alias('Process')]
[ScriptBlock]
$ScriptBlock,

Expand All @@ -144,6 +157,15 @@
[switch]
$KillToStop,

[int]
$MaxItems,

[switch]
$CloseOutQueue,

[string[]]
$QueuesToClose,

[PSFramework.Utility.Throttle]
$Throttle,

Expand Down Expand Up @@ -200,6 +222,9 @@

if ($Begin) { $worker.Begin = $Begin }
if ($End) { $worker.End = $End }
if ($MaxItems) { $worker.MaxItems = $MaxItems }
if ($CloseOutQueue) { $worker.CloseOutQueue = $true }
if ($QueuesToClose) { $worker.QueuesToClose = $QueuesToClose }

if ($SessionState) { $worker.SessionState = $SessionState }
foreach ($module in $Modules) { $worker.Modules.Add($module) }
Expand Down
9 changes: 8 additions & 1 deletion PSFramework/functions/runspace/Start-PSFRunspaceWorkflow.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

.PARAMETER InputObject
Runspace Workflow object to launch.

.PARAMETER PassThru
Return the runspace workflow just started.

.EXAMPLE
PS C:\> Start-PSFRunspaceWorkflow -Name MailboxAnalysis
Expand All @@ -36,13 +39,17 @@

[Parameter(ValueFromPipeline = $true)]
[PSFramework.Runspace.RSWorkflow[]]
$InputObject
$InputObject,

[switch]
$PassThru
)
process {
$resolvedWorkflows = Resolve-PsfRunspaceWorkflow -Name $Name -InputObject $InputObject -Cmdlet $PSCmdlet

foreach ($resolvedWorkflow in $resolvedWorkflows) {
$resolvedWorkflow.Start()
if ($PassThru) { $resolvedWorkflow }
}
}
}
170 changes: 170 additions & 0 deletions PSFramework/functions/runspace/Wait-PSFRunspaceWorkflow.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
function Wait-PSFRunspaceWorkflow {
<#
.SYNOPSIS
Wait for a Runspace Workflow to complete.

.DESCRIPTION
Wait for a Runspace Workflow to complete.

.PARAMETER Queue
The name of the queue to measure completion by.
Usually the last output queue in the chain of steps.

.PARAMETER WorkerName
The name of the worker to measure completion by.
Usually the last step in the chain of steps.

.PARAMETER Closed
The workflow is considered completed, when the queue or worker selected is closed.

.PARAMETER Count
The workflow is considered completed, when the queue selected has received the specified number of results.
This looks at the total amount ever provided, not current number queued.

.PARAMETER ReferenceQueue
The workflow is considered completed, when the queue selected has received the same number of items as the reference queue.

.PARAMETER ReferenceMultiplier
When comparing the result queue with a reference queue, multiply the number of items in the reference queue by this value.
Use when the number of output items, based from the original input, scales by a constant multiplier.
Defaults to 1.

.PARAMETER PassThru
Pass through the workflow object waiting for.
Useful to stop it once waiting has completed.

.PARAMETER Timeout
Maximum wait time. Throws an error if exceeded.
Defaults to 1 day.

.PARAMETER Name
Name of the workflow to wait for.

.PARAMETER InputObject
A runspace workflow object to wait for.

.EXAMPLE
PS C:\> $workflow | Wait-PSFRunspaceWorkflow -Queue Done -Count 1000

Wait until 1000 items have been queued to "Done" in total.

.EXAMPLE
PS C:\> $workflow | Wait-PSFRunspaceWorkflow -Queue Done -PassThru | Stop-PSFRunspaceWorkflow

Wait until the "Done" queue has been closed, then stop the workflow.

.EXAMPLE
PS C:\> $workflow | Wait-PSFRunspaceWorkflow -Queue Done -ReferenceQueue Input

Wait until the "Done" queue has processed as many items as there were written to the "Input" queue.
#>
[CmdletBinding(DefaultParameterSetName = 'Closed')]
param (
[Parameter(Mandatory = $true, ParameterSetName = 'Closed')]
[Parameter(Mandatory = $true, ParameterSetName = 'Count')]
[Parameter(Mandatory = $true, ParameterSetName = 'Reference')]
[string]
$Queue,

[Parameter(Mandatory = $true, ParameterSetName = 'WorkerClosed')]
[Parameter(Mandatory = $true, ParameterSetName = 'WorkerCount')]
[Parameter(Mandatory = $true, ParameterSetName = 'WorkerReference')]
[string]
$WorkerName,

[Parameter(ParameterSetName = 'Closed')]
[Parameter(Mandatory = $true, ParameterSetName = 'WorkerClosed')]
[switch]
$Closed,

[Parameter(Mandatory = $true, ParameterSetName = 'WorkerCount')]
[Parameter(Mandatory = $true, ParameterSetName = 'Count')]
[int]
$Count,

[Parameter(Mandatory = $true, ParameterSetName = 'WorkerReference')]
[Parameter(Mandatory = $true, ParameterSetName = 'Reference')]
[string]
$ReferenceQueue,

[Parameter(ParameterSetName = 'WorkerReference')]
[Parameter(ParameterSetName = 'Reference')]
[int]
$ReferenceMultiplier = 1,

[switch]
$PassThru,

[PSFramework.Parameter.TimeSpanParameter]
$Timeout,

[Parameter(ValueFromPipeline = $true, ValueFromPipelineByPropertyName = $true)]
[string[]]
$Name,

[Parameter(ValueFromPipeline = $true)]
[PSFramework.Runspace.RSWorkflow[]]
$InputObject
)
process {
$resolvedWorkflows = Resolve-PsfRunspaceWorkflow -Name $Name -InputObject $InputObject -Cmdlet $PSCmdlet
$limit = (Get-Date).AddDays(1)
if ($Timeout) { $limit = (Get-Date).Add($Timeout) }

foreach ($resolvedWorkflow in $resolvedWorkflows) {
switch ($PSCmdlet.ParameterSetName) {
'Closed' {
while (-not $resolvedWorkflow.Queues.$Queue.Closed) {
if ($limit -lt (Get-Date)) {
Stop-PSFFunction -String 'Wait-PSFRunspaceWorkflow.Error.Timeout' -StringValues $limit, $resolvedWorkflow.Name -Target $resolvedWorkflow -EnableException $true -Cmdlet $PSCmdlet -Category OperationTimeout
}
Start-Sleep -Milliseconds 200
}
}
'WorkerClosed' {
$queueObject = $resolvedWorkflow.Queues[$resolvedWorkflow.Workers.$WorkerName.OutQueue]
while (-not $queueObject.Closed) {
if ($limit -lt (Get-Date)) {
Stop-PSFFunction -String 'Wait-PSFRunspaceWorkflow.Error.Timeout' -StringValues $limit, $resolvedWorkflow.Name -Target $resolvedWorkflow -EnableException $true -Cmdlet $PSCmdlet -Category OperationTimeout
}
Start-Sleep -Milliseconds 200
}
}
'Count' {
while ($resolvedWorkflow.Queues.$Queue.TotalItemCount -lt $Count) {
if ($limit -lt (Get-Date)) {
Stop-PSFFunction -String 'Wait-PSFRunspaceWorkflow.Error.Timeout' -StringValues $limit, $resolvedWorkflow.Name -Target $resolvedWorkflow -EnableException $true -Cmdlet $PSCmdlet -Category OperationTimeout
}
Start-Sleep -Milliseconds 200
}
}
'WorkerCount' {
while ($resolvedWorkflow.Workers.$WorkerName.CountInputCompleted -lt $Count) {
if ($limit -lt (Get-Date)) {
Stop-PSFFunction -String 'Wait-PSFRunspaceWorkflow.Error.Timeout' -StringValues $limit, $resolvedWorkflow.Name -Target $resolvedWorkflow -EnableException $true -Cmdlet $PSCmdlet -Category OperationTimeout
}
Start-Sleep -Milliseconds 200
}
}
'Reference' {
while ($resolvedWorkflow.Queues.$Queue.TotalItemCount -lt ($resolvedWorkflow.Queues.$ReferenceQueue.TotalItemCount * $ReferenceMultiplier)) {
if ($limit -lt (Get-Date)) {
Stop-PSFFunction -String 'Wait-PSFRunspaceWorkflow.Error.Timeout' -StringValues $limit, $resolvedWorkflow.Name -Target $resolvedWorkflow -EnableException $true -Cmdlet $PSCmdlet -Category OperationTimeout
}
Start-Sleep -Milliseconds 200
}
}
'WorkerReference' {
while ($resolvedWorkflow.Workers.$WorkerName.CountInputCompleted -lt ($resolvedWorkflow.Queues.$ReferenceQueue.TotalItemCount * $ReferenceMultiplier)) {
if ($limit -lt (Get-Date)) {
Stop-PSFFunction -String 'Wait-PSFRunspaceWorkflow.Error.Timeout' -StringValues $limit, $resolvedWorkflow.Name -Target $resolvedWorkflow -EnableException $true -Cmdlet $PSCmdlet -Category OperationTimeout
}
Start-Sleep -Milliseconds 200
}
}
}

if ($PassThru) { $resolvedWorkflow }
}
}
}
2 changes: 1 addition & 1 deletion PSFramework/functions/runspace/Write-PSFRunspaceQueue.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
[string]
$Name,

[Parameter(Mandatory = $true, ParameterSetName = 'Single')]
[Parameter(Mandatory = $true, ValueFromPipeline = $true, ParameterSetName = 'Single')]
[AllowNull()]
$Value,

Expand Down
1 change: 1 addition & 0 deletions PSFramework/functions/utility/New-PSFThrottle.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@
try { $throttle.AddLimit($pair.Key, $pair.Value) }
catch { Write-PSFMessage -Level Error -String 'New-PSFThrottle.Error.InvalidLimit' -StringValues $pair.Key, $pair.Value -ErrorRecord $_ -EnableException $true -PSCmdlet $PSCmdlet }
}
$throttle
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@
[OutputType([PSFramework.Runspace.RSWorkflow])]
[CmdletBinding()]
param (
[AllowEmptyCollection()]
[AllowNull()]
[string[]]
$Name,

[AllowEmptyCollection()]
[AllowNull()]
[PSFramework.Runspace.RSWorkflow[]]
$InputObject,

Expand Down
7 changes: 3 additions & 4 deletions PSFramework/internal/scripts/runspaceWorkerCode.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,19 @@
$validStates = 'Starting', 'Running'
while ($validStates -contains $__PSF_Worker.State) {
# Inqueue is closed and all items processed?
if ($__PSF_Worker.Done) { break }
if ($__PSF_Worker.MaxItems -and $__PSF_Worker.MaxItems -ge $__PSF_Worker.CountInputCompleted) { break }
if ($__PSF_Worker.IsDone) { break }
if ($__PSF_Worker.MaxItems -and $__PSF_Worker.MaxItems -le $__PSF_Worker.CountInputCompleted) { break }

if ($__PSF_Worker.Throttle) {
$__PSF_Worker.Throttle.GetSlot()
}

$inputData = $null
$success = $__PSF_Workflow.Queues.$($__PSF_Worker.InQueue).TryDequeue([ref]$inputData)
$success = $__PSF_Worker.GetNext([ref]$inputData)
if (-not $success) {
Start-Sleep -Milliseconds 250
continue
}
$__PSF_Worker.IncrementInput()

try {
$results = & $__PSF_ScriptBlock $inputData
Expand Down
Loading