Skip to content

Commit 8e2664b

Browse files
authored
Merge pull request #771 from Project-MONAI/release/0.1.10
Release/0.1.10
2 parents 5362cee + 62a3dc2 commit 8e2664b

31 files changed

+464
-227
lines changed

docs/setup/mwm-workflow-spec.md

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ A workflow is a standard template that contains a list of tasks that can be ran
2424

2525
The first task to be ran, will always be the first task in the list. The next task/tasks to be ran must be listed in the [Task Destinations](#task-destinations) of the task. A workflow requires at least one task.
2626

27-
Workflows can be created or updated via the [Workflow API](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/docs/api/rest/workflow.md).
27+
Workflows can be created or updated via the [Workflow API](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/docs/api/rest/workflow.md).
2828

2929
# Contents
3030

@@ -142,7 +142,7 @@ The following is an example of the structure of a workflow.
142142
The following is an example of a complete workflow:
143143
![scenario1](../images/workflow_examples/scenario1.png)
144144

145-
An example of a workflow with two tasks:
145+
An example of a workflow with two tasks:
146146

147147
1. Argo task
148148
2. Export Task
@@ -240,7 +240,7 @@ It also defines the "PROD_PACS" output destination, meaning that it can be used:
240240
Tasks are the basic building block of a workflow. They are provided as a list - the first Task in the list is executed when the workflow is triggered.
241241
Subsequent tasks are triggered by the `task_destinations` specified by previous tasks.
242242

243-
# Task Object
243+
# Task Object
244244

245245
### Task Types
246246
These tasks are borken down into different types:
@@ -292,8 +292,8 @@ The following are examples of the task json structure including required args fo
292292
]
293293
},
294294
"task_destinations": [
295-
{
296-
"name": "export-task-id"
295+
{
296+
"name": "export-task-id"
297297
}
298298
]
299299
}
@@ -364,7 +364,7 @@ Depending of the type of task, the task object may contain additional fields.
364364
Router tasks don't have additional fields. They are used to contain `task_destinations` so that workflow processing can be directed to the desired next step.
365365

366366
#### Export
367-
These are task types that allow for artifacts to be exported based on the input artifacts list. This task type should not have Out artifacts listed.
367+
These are task types that allow for artifacts to be exported based on the input artifacts list. This task type should not have Out artifacts listed.
368368
The task also requires these extra attributes:-
369369

370370
| Property | Type | Description |
@@ -403,7 +403,7 @@ Example (output sent to another task if the patient is female, otherwise to PACS
403403
Export destinations define an external location to which the output of the task can be sent. This will take the form of an event published to a pub/sub service notifying of an available export to a specific destination reference. Most commonly, the export location will be a PACs system and the notification will be picked up by the Monai Informatics Gateway.
404404

405405
#### Plugin
406-
These are tasks are Named the same as the installed Pluging.
406+
These are tasks are Named the same as the installed Pluging.
407407
The task also requires these extra attributes:-
408408

409409
| Property | Type | Description |
@@ -414,7 +414,7 @@ The task also requires these extra attributes:-
414414
The args requirements for argo plugin can be found [here](#argo).
415415

416416
### Task Arguments
417-
Each task plugin requires specific arguments to be provided in the args dictionary. This allows all task types to support as many additional values as necessary without the need to bloat the workflow spec.
417+
Each task plugin requires specific arguments to be provided in the args dictionary. This allows all task types to support as many additional values as necessary without the need to bloat the workflow spec.
418418

419419
#### Argo
420420
The Argo plugin triggers workflows pre-deployed onto an [Argo workflow server](https://argoproj.github.io/argo-events/).
@@ -425,25 +425,11 @@ The Task's "args" object should contain the following fields:
425425

426426
| Property | Type | Required | Description |
427427
|------|------|------|------|
428-
|workflow_template_name|str|Yes|The ID of this workflow as registered on the Argo server.|
429-
|namespace|str|Yes|The namespace of the argo workflow.|
430-
|server_url|url|Yes|The URL of the Argo server.|
431-
|allow_insecure|bool|No|Allow insecure connections to argo from the plug-in.|
432-
|parameters|dictionary|No|Key value pairs, Argo parameters that will be passed on to the Argo workflow.|
433-
|priority_class|string|No|The name of a valid Kubernetes priority class to be assigned to the Argo workflow pods|
434-
|resources|dictionary|No|A resource requests & limits object (see below). These will be applied to the Argo workflow pods|
435-
436-
##### Resource Request Object
437-
438-
Resource request parameters should be included in the task args object dictionary, as a string dictionary. The resources dictionary and all included values below are optional.
439-
440-
| Property | Type | Description |
441-
|------|------|------|
442-
|memory_reservation|str|A valid [Kubernetes memory request value](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory).|
443-
|cpu_reservation|url|A valid [Kubernetes CPU request value](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu).|
444-
|gpu_limit|dictionary|The number of GPUs to be used by this task.|
445-
|memory_limit|string|The maximum amount of memory this task may use|
446-
|cpu_limit|object|The maximum amount of CPU this task may use. See |
428+
|workflow_template_name|string|Yes|The ID of this workflow as registered on the Argo server.|
429+
|priority_class|string|No|The name of a valid Kubernetes priority class to be assigned to the Argo workflow pods.|
430+
|gpu_required|string|No|Whether a GPU is to be used by this task.|
431+
|memory_gb|string|No|The maximum amount of memory in gigabytes this task may use.|
432+
|cpu|string|No|The maximum amount of CPU this task may use.|
447433

448434
For more information about Kubernetes requests & limits, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.
449435

@@ -501,7 +487,7 @@ As you can see in the example below, input artifacts require a _value_. This is
501487

502488
#### DICOM Input
503489

504-
If payload DICOM inputs are to be used in a given task, the value of the input must be `context.input.dicom`. This will to resolve to the `{payloadId}/dcm` folder within Minio / S3.
490+
If payload DICOM inputs are to be used in a given task, the value of the input must be `context.input.dicom`. This will to resolve to the `{payloadId}/dcm` folder within Minio / S3.
505491

506492
Example:
507493
```json
@@ -700,11 +686,11 @@ The following examples both function the same and act as an AND condition.
700686
## Evaluators
701687
Conditional evaluators are logical statement strings that may be used to determine which tasks are executed. They can make use of the execution context _metadata_ and dicom tags. All conditions must evaluate to true in order for the task to be triggered.
702688

703-
[A detailed breakdown of conditional logic can be found here.](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/guidelines/mwm-conditionals.md)
689+
[A detailed breakdown of conditional logic can be found here.](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/guidelines/mwm-conditionals.md)
704690

705691
### Supported Evaulators
706692

707-
693+
708694
Conditional evaluators should support evaluating workflow variables against predefined values with the following operators:
709695

710696
< (Valid for integers)
@@ -765,9 +751,9 @@ Example (status):
765751

766752
#### Result Metadata & Execution Stats - Using Dictionary Values
767753

768-
The Result Metadata and Execution Stats are populated by the plugin and are added to the workflow instance once a task is completed to provide some output of a task. Each plugin will have its own implementation to populate the result metadata.
754+
The Result Metadata and Execution Stats are populated by the plugin and are added to the workflow instance once a task is completed to provide some output of a task. Each plugin will have its own implementation to populate the result metadata.
769755

770-
Because `result` and `execution_stats` are a dictionary, the section after `context.executions.task_id.result` or `context.executions.task_id.execution_stats` is the key to be checked in the result/execution_stats dictionary.
756+
Because `result` and `execution_stats` are a dictionary, the section after `context.executions.task_id.result` or `context.executions.task_id.execution_stats` is the key to be checked in the result/execution_stats dictionary.
771757

772758
For conditional statements, the key specified is case sensitive and must match exactly to the key which has been output by the model and saved in the result/execution_stats dictionary.
773759

@@ -807,9 +793,9 @@ The result metadata for an Argo task is populated by a `metadata.json` that is i
807793
}
808794
```
809795

810-
If metadata is to be used in a conditional the `metadata.json` must be present somewhere in the output directory and a valid JSON dictionary. It will automatically be imported if it is in the directory.
796+
If metadata is to be used in a conditional the `metadata.json` must be present somewhere in the output directory and a valid JSON dictionary. It will automatically be imported if it is in the directory.
811797

812-
An example format of the metadata.json can be found below:
798+
An example format of the metadata.json can be found below:
813799

814800
execution stats are populated from the argo execution values returned automatically.
815801

@@ -916,4 +902,4 @@ Name:
916902
Description:
917903
```python
918904
{{context.workflow.description}} == 'This workflow is a valid workflow'
919-
```
905+
```

src/Shared/Shared/ValidationConstants.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ public static class ValidationConstants
5353
/// </summary>
5454
public static readonly string Notifications = "notifications";
5555

56+
/// <summary>
57+
/// Key for the CPU.
58+
/// </summary>
59+
public static readonly string Cpu = "cpu";
60+
61+
/// <summary>
62+
/// Key for the memory.
63+
/// </summary>
64+
public static readonly string Memory = "memory_gb";
65+
66+
/// <summary>
67+
/// Key for the GPU.
68+
/// </summary>
69+
public static readonly string GpuRequired = "gpu_required";
70+
5671
public enum ModeValues
5772
{
5873
QA,

src/Shared/Shared/Wrappers/StatsPagedResponse.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class StatsPagedResponse<T> : PagedResponse<T>
2121
public DateTime PeriodEnd { get; set; }
2222
public long TotalExecutions { get; set; }
2323
public long TotalFailures { get; set; }
24+
public long TotalInprogress { get; set; }
2425
public double AverageTotalExecutionSeconds { get; set; }
2526
public double AverageArgoExecutionSeconds { get; set; }
2627

src/TaskManager/API/Models/TaskExecutionStats.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,5 +136,15 @@ public TaskExecutionStats(TaskUpdateEvent taskUpdateEvent)
136136
TaskId = taskUpdateEvent.TaskId;
137137
Status = taskUpdateEvent.Status.ToString();
138138
}
139+
140+
public TaskExecutionStats(TaskCancellationEvent taskCanceledEvent, string correlationId)
141+
{
142+
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
143+
CorrelationId = correlationId;
144+
WorkflowInstanceId = taskCanceledEvent.WorkflowInstanceId;
145+
ExecutionId = taskCanceledEvent.ExecutionId;
146+
TaskId = taskCanceledEvent.TaskId;
147+
Status = TaskExecutionStatus.Failed.ToString();
148+
}
139149
}
140150
}

src/TaskManager/Database/ITaskExecutionStatsRepository.cs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,29 @@ public interface ITaskExecutionStatsRepository
2525
/// Creates a task dispatch event in the database.
2626
/// </summary>
2727
/// <param name="taskDispatchEvent">A TaskDispatchEvent to create.</param>
28-
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
28+
/// <returns></returns>
2929
Task CreateAsync(TaskDispatchEventInfo taskDispatchEventInfo);
3030

3131
/// <summary>
32-
/// Updates user accounts of a task dispatch event in the database.
32+
/// Updates status of a task dispatch event in the database.
3333
/// </summary>
3434
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
35-
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
35+
/// <returns></returns>
3636
Task UpdateExecutionStatsAsync(TaskUpdateEvent taskUpdateEvent);
3737

38+
/// <summary>
39+
/// Updates status of a task now its been canceled.
40+
/// </summary>
41+
/// <param name="TaskCanceledException">A TaskCanceledException to update.</param>
42+
/// <returns></returns
43+
Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId);
44+
3845
/// <summary>
3946
/// Returns paged entries between the two given dates.
4047
/// </summary>
4148
/// <param name="startTime">start of the range.</param>
4249
/// <param name="endTime">end of the range.</param>
43-
/// <returns>a paged view of entried in range</returns>
50+
/// <returns>a collections of stats</returns>
4451
Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "");
4552

4653
/// <summary>
@@ -49,14 +56,22 @@ public interface ITaskExecutionStatsRepository
4956
/// <param name="startTime">start of the range.</param>
5057
/// <param name="endTime">end of the range.</param>
5158
/// <returns>The count of all records in range</returns>
52-
Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
59+
//Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
5360

61+
/// <summary>
62+
/// Return the count of the entries with this status, or all if no status given
63+
/// </summary>
64+
/// <param name="start">start of the range.</param>
65+
/// <param name="endTime">end of the range.</param>
66+
/// <param name="status">the status to get count of, or string.empty</param>
67+
/// <returns>The count of all records in range</returns>
68+
Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "");
5469
/// <summary>
5570
/// Returns all stats in Failed or PartialFail status.
5671
/// </summary>
5772
/// <param name="startTime">start of the range.</param>
5873
/// <param name="endTime">end of the range.</param>
59-
/// <returns>All stats NOT of that status</returns>
74+
/// <returns>All stats that failed or partially failed</returns>
6075
Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "");
6176

6277
/// <summary>

src/TaskManager/Database/TaskExecutionStatsRepository.cs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,29 @@ await _taskExecutionStatsCollection.UpdateOneAsync(o =>
117117
}
118118
}
119119

120+
public async Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string correlationId)
121+
{
122+
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
123+
124+
try
125+
{
126+
var updateMe = new TaskExecutionStats(taskCanceledEvent, correlationId);
127+
var duration = updateMe.CompletedAtUTC == default ? 0 : (updateMe.CompletedAtUTC - updateMe.StartedUTC).TotalMilliseconds / 1000;
128+
await _taskExecutionStatsCollection.UpdateOneAsync(o =>
129+
o.ExecutionId == updateMe.ExecutionId,
130+
Builders<TaskExecutionStats>.Update
131+
.Set(w => w.Status, updateMe.Status)
132+
.Set(w => w.LastUpdatedUTC, DateTime.UtcNow)
133+
.Set(w => w.CompletedAtUTC, updateMe.CompletedAtUTC)
134+
.Set(w => w.DurationSeconds, duration)
135+
136+
, new UpdateOptions { IsUpsert = true }).ConfigureAwait(false);
137+
}
138+
catch (Exception e)
139+
{
140+
_logger.DatabaseException(nameof(CreateAsync), e);
141+
}
142+
}
120143
public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowInstanceId = "", string taskId = "")
121144
{
122145
startTime = startTime.ToUniversalTime();
@@ -128,12 +151,13 @@ public async Task<IEnumerable<TaskExecutionStats>> GetStatsAsync(DateTime startT
128151
T.StartedUTC >= startTime &&
129152
T.StartedUTC <= endTime.ToUniversalTime() &&
130153
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
131-
(taskIdNull || T.TaskId == taskId) &&
132-
(
133-
T.Status == TaskExecutionStatus.Succeeded.ToString()
134-
|| T.Status == TaskExecutionStatus.Failed.ToString()
135-
|| T.Status == TaskExecutionStatus.PartialFail.ToString()
136-
)
154+
(taskIdNull || T.TaskId == taskId)
155+
//&&
156+
//(
157+
// T.Status == TaskExecutionStatus.Succeeded.ToString()
158+
// || T.Status == TaskExecutionStatus.Failed.ToString()
159+
// || T.Status == TaskExecutionStatus.PartialFail.ToString()
160+
// )
137161
)
138162
.Limit(PageSize)
139163
.Skip((PageNumber - 1) * PageSize)
@@ -173,24 +197,19 @@ private static TaskExecutionStats ExposeExecutionStats(TaskExecutionStats taskEx
173197
}
174198
return taskExecutionStats;
175199
}
176-
177-
public async Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowInstanceId = "", string taskId = "")
200+
public async Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowInstanceId = "", string taskId = "")
178201
{
202+
var statusNull = string.IsNullOrWhiteSpace(status);
179203
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);
180204
var taskIdNull = string.IsNullOrWhiteSpace(taskId);
181205

182206
return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
183-
T.StartedUTC >= startTime.ToUniversalTime() &&
207+
T.StartedUTC >= start.ToUniversalTime() &&
184208
T.StartedUTC <= endTime.ToUniversalTime() &&
185209
(workflowinstanceNull || T.WorkflowInstanceId == workflowInstanceId) &&
186210
(taskIdNull || T.TaskId == taskId) &&
187-
(
188-
T.Status == TaskExecutionStatus.Succeeded.ToString() ||
189-
T.Status == TaskExecutionStatus.Failed.ToString() ||
190-
T.Status == TaskExecutionStatus.PartialFail.ToString())
191-
);
211+
(statusNull || T.Status == status));
192212
}
193-
194213
public async Task<long> GetStatsStatusFailedCountAsync(DateTime start, DateTime endTime, string workflowInstanceId = "", string taskId = "")
195214
{
196215
var workflowinstanceNull = string.IsNullOrWhiteSpace(workflowInstanceId);

src/TaskManager/Plug-ins/Argo/ArgoClient.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,17 @@ public async Task<WorkflowTemplate> Argo_GetWorkflowTemplateAsync(string argoNam
143143
public virtual async Task<WorkflowTemplate> Argo_CreateWorkflowTemplateAsync(string argoNamespace, WorkflowTemplateCreateRequest body, CancellationToken cancellationToken)
144144
{
145145
Guard.Against.NullOrWhiteSpace(argoNamespace);
146-
Guard.Against.Null(body);
146+
Guard.Against.Null(body.Template);
147147

148148
var urlBuilder = new StringBuilder();
149149
urlBuilder.Append(CultureInfo.InvariantCulture, $"{FormattedBaseUrl}/api/v1/workflow-templates/{argoNamespace}");
150150

151151
var method = "POST";
152-
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
152+
var stringBody = Newtonsoft.Json.JsonConvert.SerializeObject(body);
153+
var content = new StringContent(stringBody);
154+
155+
var _logger = NLog.LogManager.GetCurrentClassLogger();
156+
_logger.Debug($"Sending content to Argo :{stringBody}");
153157
return await SendRequest<WorkflowTemplate>(content, urlBuilder, method, cancellationToken).ConfigureAwait(false);
154158
}
155159

0 commit comments

Comments
 (0)