Release/0.1.10 #771

Merged: 18 commits, Apr 27, 2023
56 changes: 21 additions & 35 deletions docs/setup/mwm-workflow-spec.md
@@ -24,7 +24,7 @@ A workflow is a standard template that contains a list of tasks that can be run

The first task to be run will always be the first task in the list. The next task or tasks to be run must be listed in the [Task Destinations](#task-destinations) of the task. A workflow requires at least one task.

Workflows can be created or updated via the [Workflow API](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/docs/api/rest/workflow.md).

# Contents

@@ -142,7 +142,7 @@ The following is an example of the structure of a workflow.
The following is an example of a complete workflow:
![scenario1](../images/workflow_examples/scenario1.png)

An example of a workflow with two tasks:

1. Argo task
2. Export Task
@@ -240,7 +240,7 @@ It also defines the "PROD_PACS" output destination, meaning that it can be used:
Tasks are the basic building block of a workflow. They are provided as a list - the first Task in the list is executed when the workflow is triggered.
Subsequent tasks are triggered by the `task_destinations` specified by previous tasks.
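As a sketch, the dispatch rule described above (the first task in the list runs first; `task_destinations` name the tasks to trigger next) might look like the following. This is illustrative Python only, not the Workflow Manager's actual implementation; the structure mirrors the JSON examples in this spec.

```python
# Illustrative sketch of task dispatch: the first task in the list always
# runs first, and each completed task's task_destinations name what runs next.
workflow = {
    "tasks": [
        {"id": "argo-task", "task_destinations": [{"name": "export-task-id"}]},
        {"id": "export-task-id", "task_destinations": []},
    ]
}

def next_task_ids(workflow, completed_task_id):
    """Return the ids of the tasks to trigger after the given task completes."""
    task = next(t for t in workflow["tasks"] if t["id"] == completed_task_id)
    return [d["name"] for d in task["task_destinations"]]

first = workflow["tasks"][0]["id"]      # always the first task in the list
print(first)                            # argo-task
print(next_task_ids(workflow, first))   # ['export-task-id']
```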

# Task Object

### Task Types
These tasks are broken down into different types:
@@ -292,8 +292,8 @@ The following are examples of the task json structure including required args fo
]
},
"task_destinations": [
{
"name": "export-task-id"
}
]
}
@@ -364,7 +364,7 @@ Depending on the type of task, the task object may contain additional fields.
Router tasks don't have additional fields. They are used to contain `task_destinations` so that workflow processing can be directed to the desired next step.

#### Export
These are task types that allow for artifacts to be exported based on the input artifacts list. This task type should not have Out artifacts listed.
The task also requires these extra attributes:

| Property | Type | Description |
@@ -403,7 +403,7 @@ Example (output sent to another task if the patient is female, otherwise to PACS
Export destinations define an external location to which the output of the task can be sent. This takes the form of an event published to a pub/sub service, notifying of an available export to a specific destination reference. Most commonly, the export location will be a PACS system and the notification will be picked up by the MONAI Informatics Gateway.

#### Plugin
These tasks are named the same as the installed plugin.
The task also requires these extra attributes:

| Property | Type | Description |
@@ -414,7 +414,7 @@ The task also requires these extra attributes:
The args requirements for argo plugin can be found [here](#argo).

### Task Arguments
Each task plugin requires specific arguments to be provided in the args dictionary. This allows all task types to support as many additional values as necessary without the need to bloat the workflow spec.

#### Argo
The Argo plugin triggers workflows pre-deployed onto an [Argo workflow server](https://argoproj.github.io/argo-events/).
@@ -425,25 +425,11 @@ The Task's "args" object should contain the following fields:

| Property | Type | Required | Description |
|------|------|------|------|
|workflow_template_name|str|Yes|The ID of this workflow as registered on the Argo server.|
|namespace|str|Yes|The namespace of the argo workflow.|
|server_url|url|Yes|The URL of the Argo server.|
|allow_insecure|bool|No|Allow insecure connections to argo from the plug-in.|
|parameters|dictionary|No|Key value pairs, Argo parameters that will be passed on to the Argo workflow.|
|priority_class|string|No|The name of a valid Kubernetes priority class to be assigned to the Argo workflow pods|
|resources|dictionary|No|A resource requests & limits object (see below). These will be applied to the Argo workflow pods|

##### Resource Request Object

Resource request parameters should be included in the task args object dictionary, as a string dictionary. The resources dictionary and all included values below are optional.

| Property | Type | Description |
|------|------|------|
|memory_reservation|str|A valid [Kubernetes memory request value](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory).|
|cpu_reservation|str|A valid [Kubernetes CPU request value](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu).|
|gpu_limit|number|The number of GPUs to be used by this task.|
|memory_limit|str|The maximum amount of memory this task may use.|
|cpu_limit|str|The maximum amount of CPU this task may use.|
|workflow_template_name|string|Yes|The ID of this workflow as registered on the Argo server.|
|priority_class|string|No|The name of a valid Kubernetes priority class to be assigned to the Argo workflow pods.|
|gpu_required|string|No|Whether a GPU is to be used by this task.|
|memory_gb|string|No|The maximum amount of memory in gigabytes this task may use.|
|cpu|string|No|The maximum amount of CPU this task may use.|

For more information about Kubernetes requests & limits, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.
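As a sketch, an Argo task's `args` object using the fields in the table above might look like the following. All values here are illustrative assumptions, not defaults; other required Argo fields (such as the server URL and namespace) may also be needed depending on your deployment.

```json
{
  "workflow_template_name": "example-argo-template",
  "priority_class": "standard",
  "gpu_required": "false",
  "memory_gb": "4",
  "cpu": "1"
}
```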

@@ -501,7 +487,7 @@ As you can see in the example below, input artifacts require a _value_. This is

#### DICOM Input

If payload DICOM inputs are to be used in a given task, the value of the input must be `context.input.dicom`. This will resolve to the `{payloadId}/dcm` folder within MinIO / S3.

Example:
```json
@@ -700,11 +686,11 @@ The following examples both function the same and act as an AND condition.
## Evaluators
Conditional evaluators are logical statement strings that may be used to determine which tasks are executed. They can make use of the execution context _metadata_ and DICOM tags. All conditions must evaluate to true in order for the task to be triggered.

[A detailed breakdown of conditional logic can be found here.](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/guidelines/mwm-conditionals.md)

### Supported Evaluators

Conditional evaluators should support evaluating workflow variables against predefined values with the following operators:

< (Valid for integers)
@@ -765,9 +751,9 @@ Example (status):

#### Result Metadata & Execution Stats - Using Dictionary Values

The Result Metadata and Execution Stats are populated by the plugin and are added to the workflow instance once a task is completed to provide some output of a task. Each plugin will have its own implementation to populate the result metadata.

Because `result` and `execution_stats` are dictionaries, the section after `context.executions.task_id.result` or `context.executions.task_id.execution_stats` is the key to be checked in the result/execution_stats dictionary.

For conditional statements, the key specified is case sensitive and must exactly match the key which has been output by the model and saved in the result/execution_stats dictionary.
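The case-sensitive lookup described above can be sketched as follows. This is illustrative Python, not the Workflow Manager's actual evaluator; the `report` key is a hypothetical model output.

```python
# Illustrative sketch: resolving the key after
# "context.executions.task_id.result." against a result dictionary.
result = {"report": "pass"}  # populated by the task plugin for this execution

def resolve(result_dict, key):
    # Keys are case sensitive: "Report" would NOT match "report".
    return result_dict.get(key)

print(resolve(result, "report"))  # matches the key exactly
print(resolve(result, "Report"))  # case mismatch, resolves to nothing
```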

@@ -807,9 +793,9 @@ The result metadata for an Argo task is populated by a `metadata.json` that is i
}
```

If metadata is to be used in a conditional, the `metadata.json` must be present somewhere in the output directory and must be a valid JSON dictionary. It will automatically be imported if it is in the directory.

An example format of the metadata.json can be found below:

Execution stats are populated automatically from the Argo execution values.

@@ -916,4 +902,4 @@ Name:
Description:
```python
{{context.workflow.description}} == 'This workflow is a valid workflow'
```
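The evaluator examples above can be sketched as a simple context lookup plus comparison. This is illustrative Python only, not the actual conditional engine shipped with the Workflow Manager:

```python
# Illustrative sketch: resolving a dotted workflow variable such as
# "workflow.description" and applying a string-equality conditional.
context = {"workflow": {"description": "This workflow is a valid workflow"}}

def evaluate(path, expected, ctx):
    """Resolve a dotted context path and compare it against the expected value."""
    value = ctx
    for part in path.split("."):
        value = value[part]
    return value == expected

print(evaluate("workflow.description",
               "This workflow is a valid workflow", context))  # True
```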
15 changes: 15 additions & 0 deletions src/Shared/Shared/ValidationConstants.cs
@@ -53,6 +53,21 @@ public static class ValidationConstants
/// </summary>
public static readonly string Notifications = "notifications";

/// <summary>
/// Key for the CPU.
/// </summary>
public static readonly string Cpu = "cpu";

/// <summary>
/// Key for the memory.
/// </summary>
public static readonly string Memory = "memory_gb";

/// <summary>
/// Key for the GPU.
/// </summary>
public static readonly string GpuRequired = "gpu_required";

public enum ModeValues
{
QA,
1 change: 1 addition & 0 deletions src/Shared/Shared/Wrappers/StatsPagedResponse.cs
@@ -21,6 +21,7 @@ public class StatsPagedResponse<T> : PagedResponse<T>
public DateTime PeriodEnd { get; set; }
public long TotalExecutions { get; set; }
public long TotalFailures { get; set; }
public long TotalInprogress { get; set; }
public double AverageTotalExecutionSeconds { get; set; }
public double AverageArgoExecutionSeconds { get; set; }

10 changes: 10 additions & 0 deletions src/TaskManager/API/Models/TaskExecutionStats.cs
@@ -136,5 +136,15 @@ public TaskExecutionStats(TaskUpdateEvent taskUpdateEvent)
TaskId = taskUpdateEvent.TaskId;
Status = taskUpdateEvent.Status.ToString();
}

public TaskExecutionStats(TaskCancellationEvent taskCanceledEvent, string correlationId)
{
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
CorrelationId = correlationId;
WorkflowInstanceId = taskCanceledEvent.WorkflowInstanceId;
ExecutionId = taskCanceledEvent.ExecutionId;
TaskId = taskCanceledEvent.TaskId;
Status = TaskExecutionStatus.Failed.ToString();
}
}
}
27 changes: 21 additions & 6 deletions src/TaskManager/Database/ITaskExecutionStatsRepository.cs
@@ -25,22 +25,29 @@ public interface ITaskExecutionStatsRepository
/// Creates a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEventInfo">A TaskDispatchEventInfo to create.</param>
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
/// <returns></returns>
Task CreateAsync(TaskDispatchEventInfo taskDispatchEventInfo);

/// <summary>
/// Updates user accounts of a task dispatch event in the database.
/// Updates status of a task dispatch event in the database.
/// </summary>
/// <param name="taskUpdateEvent">A TaskUpdateEvent to update.</param>
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
/// <returns></returns>
Task UpdateExecutionStatsAsync(TaskUpdateEvent taskUpdateEvent);

/// <summary>
/// Updates the status of a task now that it has been canceled.
/// </summary>
/// <param name="taskCanceledEvent">A TaskCancellationEvent to update.</param>
/// <returns></returns>
Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId);

/// <summary>
/// Returns paged entries between the two given dates.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>a paged view of entried in range</returns>
/// <returns>A collection of stats.</returns>
Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "");

/// <summary>
@@ -49,14 +56,22 @@ public interface ITaskExecutionStatsRepository
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>The count of all records in range</returns>
Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
//Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");

/// <summary>
/// Return the count of the entries with this status, or all if no status given
/// </summary>
/// <param name="start">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="status">the status to get count of, or string.empty</param>
/// <returns>The count of all records in range</returns>
Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "");
/// <summary>
/// Returns all stats in Failed or PartialFail status.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>All stats NOT of that status</returns>
/// <returns>All stats that failed or partially failed</returns>
Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");

/// <summary>
Expand Down
49 changes: 34 additions & 15 deletions src/TaskManager/Database/TaskExecutionStatsRepository.cs
@@ -117,6 +117,29 @@ await _taskExecutionStatsCollection.UpdateOneAsync(o =>
}
}

public async Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId)
{
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");

try
{
var updateMe = new TaskExecutionStats(taskCanceledEvent, correlationId);
var duration = updateMe.CompletedAtUTC == default ? 0 : (updateMe.CompletedAtUTC - updateMe.StartedUTC).TotalMilliseconds / 1000;
await _taskExecutionStatsCollection.UpdateOneAsync(o =>
o.ExecutionId == updateMe.ExecutionId,
Builders<TaskExecutionStats>.Update
.Set(w => w.Status, updateMe.Status)
.Set(w => w.LastUpdatedUTC, DateTime.UtcNow)
.Set(w => w.CompletedAtUTC, updateMe.CompletedAtUTC)
.Set(w => w.DurationSeconds, duration),
new UpdateOptions { IsUpsert = true }).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.DatabaseException(nameof(UpdateExecutionStatsAsync), e);
}
}
public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "")
{
startTime = startTime.ToUniversalTime();
@@ -128,12 +151,13 @@ public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startT
T.StartedUTC >= startTime &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
(taskIdNull || T.TaskId == taskId) &&
(
T.Status == TaskExecutionStatus.Succeeded.ToString()
|| T.Status == TaskExecutionStatus.Failed.ToString()
|| T.Status == TaskExecutionStatus.PartialFail.ToString()
)
(taskIdNull || T.TaskId == taskId)
//&&
//(
// T.Status == TaskExecutionStatus.Succeeded.ToString()
// || T.Status == TaskExecutionStatus.Failed.ToString()
// || T.Status == TaskExecutionStatus.PartialFail.ToString()
// )
)
.Limit(PageSize)
.Skip((PageNumber - 1) * PageSize)
@@ -173,24 +197,19 @@ private static TaskExecutionStats ExposeExecutionStats(TaskExecutionStats taskEx
}
return taskExecutionStats;
}

public async Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "")
public async Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "")
{
var statusNull = string.IsNullOrWhiteSpace(status);
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);
var taskIdNull = string.IsNullOrWhiteSpace(taskId);

return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
T.StartedUTC >= startTime.ToUniversalTime() &&
T.StartedUTC >= start.ToUniversalTime() &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
(taskIdNull || T.TaskId == taskId) &&
(
T.Status == TaskExecutionStatus.Succeeded.ToString() ||
T.Status == TaskExecutionStatus.Failed.ToString() ||
T.Status == TaskExecutionStatus.PartialFail.ToString())
);
(statusNull || T.Status == status));
}

public async Task<long> GetStatsStatusFailedCountAsync(DateTime start, DateTime endTime, string workflowInstanceId = "", string taskId = "")
{
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);
8 changes: 6 additions & 2 deletions src/TaskManager/Plug-ins/Argo/ArgoClient.cs
@@ -143,13 +143,17 @@ public async Task<WorkflowTemplate> Argo_GetWorkflowTemplateAsync(string argoNam
public virtual async Task<WorkflowTemplate> Argo_CreateWorkflowTemplateAsync(string argoNamespace, WorkflowTemplateCreateRequest body, CancellationToken cancellationToken)
{
Guard.Against.NullOrWhiteSpace(argoNamespace);
Guard.Against.Null(body);
Guard.Against.Null(body.Template);

var urlBuilder = new StringBuilder();
urlBuilder.Append(CultureInfo.InvariantCulture, $"{FormattedBaseUrl}/api/v1/workflow-templates/{argoNamespace}");

var method = "POST";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
var stringBody = Newtonsoft.Json.JsonConvert.SerializeObject(body);
var content = new StringContent(stringBody);

var _logger = NLog.LogManager.GetCurrentClassLogger();
_logger.Debug($"Sending content to Argo :{stringBody}");
return await SendRequest<WorkflowTemplate>(content, urlBuilder, method, cancellationToken).ConfigureAwait(false);
}
