Skip to content

moving execution repo #779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/Shared/Shared/ApiControllerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
using Monai.Deploy.WorkflowManager.Shared.Filter;
using Monai.Deploy.WorkflowManager.Shared.Wrappers;
using Monai.Deploy.WorkflowManager.Shared.Services;
using Microsoft.AspNetCore.Routing;

namespace Monai.Deploy.WorkflowManager.ControllersShared
{
Expand Down Expand Up @@ -85,7 +84,7 @@ public PagedResponse<IEnumerable<T>> CreatePagedReponse<T>(IEnumerable<T> pagedD

public StatsPagedResponse<IEnumerable<T>> CreateStatsPagedReponse<T>(IEnumerable<T> pagedData, PaginationFilter validFilter, long totalRecords, IUriService uriService, string route)
{
var response = new StatsPagedResponse<IEnumerable<T>>(pagedData, validFilter.PageNumber, validFilter.PageSize.Value);
var response = new StatsPagedResponse<IEnumerable<T>>(pagedData, validFilter.PageNumber, validFilter.PageSize ?? 10);
response.SetUp(validFilter, totalRecords, uriService, route);
return response;
}
Expand Down
25 changes: 25 additions & 0 deletions src/Shared/Shared/Filter/TimeFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Monai.Deploy.WorkflowManager.Shared.Filter
{
public class TimeFilter : PaginationFilter
{
public DateTime StartTime { get; set; }

public DateTime EndTime { get; set; }
}
}
4 changes: 2 additions & 2 deletions src/Shared/Shared/MonaiServiceLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Dictionary<string, ServiceStatus> GetServiceStatus()
return _runningServices.ToDictionary(k => k.ServiceName, v => v.Status);
}

private IMonaiService GetService(Type type)
private IMonaiService? GetService(Type type)
{
Guard.Against.Null(type, nameof(type));

Expand All @@ -58,7 +58,7 @@ private static List<Type> LocateTypes()
serviceType.IsAssignableFrom(p) &&
p != serviceType &&
!p.IsAbstract &&
p.FullName.StartsWith("Monai", StringComparison.InvariantCulture));
p.FullName is not null && p.FullName.StartsWith("Monai", StringComparison.InvariantCulture));
return services.Distinct().ToList();
}

Expand Down
10 changes: 5 additions & 5 deletions src/Shared/Shared/Wrappers/PagedResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public PagedResponse(T data, int pageNumber, int pageSize)
/// <summary>
/// Gets or sets FirstPage.
/// </summary>
public string FirstPage { get; set; }
public string? FirstPage { get; set; }

/// <summary>
/// Gets or sets LastPage.
/// </summary>
public string LastPage { get; set; }
public string? LastPage { get; set; }

/// <summary>
/// Gets or sets TotalPages.
Expand All @@ -74,12 +74,12 @@ public PagedResponse(T data, int pageNumber, int pageSize)
/// <summary>
/// Gets or sets NextPage.
/// </summary>
public string NextPage { get; set; }
public string? NextPage { get; set; }

/// <summary>
/// Gets or sets previousPage.
/// </summary>
public string PreviousPage { get; set; }
public string? PreviousPage { get; set; }

public void SetUp(PaginationFilter validFilter, long totalRecords, IUriService uriService, string route)
{
Expand All @@ -94,7 +94,7 @@ public void SetUp(PaginationFilter validFilter, long totalRecords, IUriService u
PreviousPage =
validFilter.PageNumber - 1 >= 1 && validFilter.PageNumber <= roundedTotalPages
? uriService.GetPageUriString(new PaginationFilter(validFilter.PageNumber - 1, PageSize), route)
: null;
: null;

FirstPage = uriService.GetPageUriString(new PaginationFilter(1, PageSize), route);
LastPage = uriService.GetPageUriString(new PaginationFilter(roundedTotalPages, PageSize), route);
Expand Down
4 changes: 2 additions & 2 deletions src/Shared/Shared/Wrappers/Response.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public Response(T data)
/// <summary>
/// Gets or sets errors.
/// </summary>
public string[] Errors { get; set; }
public string?[] Errors { get; set; }

/// <summary>
/// Gets or sets message.
/// </summary>
public string Message { get; set; }
public string? Message { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/TaskManager/API/Migrations/DocumentVersionConvert.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class DocumentVersionConvert : JsonConverter
var minor = 0;
var revision = 0;

var res = (reader.Value as string)?.Split(new char[] { ',' });
var res = (reader.Value as string)?.Split(new char[] { ',', '.' });

if (res?.Length == 3)
{
Expand Down
3 changes: 1 addition & 2 deletions src/TaskManager/Plug-ins/Argo/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

using Argo;
using Microsoft.Extensions.Logging;

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo.Logging
Expand Down Expand Up @@ -78,7 +77,7 @@ public static partial class Log
[LoggerMessage(EventId = 1018, Level = LogLevel.Error, Message = "Error deserializing WorkflowTemplateCreateRequest. {message}")]
public static partial void ErrorDeserializingWorkflowTemplateCreateRequest(this ILogger logger, string message, Exception ex);

[LoggerMessage(EventId = 1017, Level = LogLevel.Error, Message = "Error deleting Template in Argo.")]
[LoggerMessage(EventId = 1019, Level = LogLevel.Error, Message = "Error deleting Template in Argo.")]
public static partial void ErrorDeletingWorkflowTemplate(this ILogger logger, Exception ex);

}
Expand Down
6 changes: 0 additions & 6 deletions src/TaskManager/TaskManager/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,5 @@ public static partial class Log

[LoggerMessage(EventId = 120, Level = LogLevel.Error, Message = "Recovering connection to storage service: {reason}.")]
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);

[LoggerMessage(EventId = 121, Level = LogLevel.Error, Message = "Unexpected error occurred in GET tasks/statsoverview API.")]
public static partial void GetStatsOverviewAsyncError(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 122, Level = LogLevel.Error, Message = "Unexpected error occurred in GET tasks/stats API.")]
public static partial void GetStatsAsyncError(this ILogger logger, Exception ex);
}
}
2 changes: 0 additions & 2 deletions src/TaskManager/TaskManager/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,8 @@ private static void ConfigureServices(HostBuilderContext hostContext, IServiceCo

// Mongo DB (Workflow Manager)
services.Configure<TaskManagerDatabaseSettings>(hostContext.Configuration.GetSection("WorkloadManagerDatabase"));
services.Configure<TaskExecutionDatabaseSettings>(hostContext.Configuration.GetSection("WorkloadManagerDatabase"));
services.AddSingleton<IMongoClient, MongoClient>(s => new MongoClient(hostContext.Configuration["WorkloadManagerDatabase:ConnectionString"]));
services.AddTransient<ITaskDispatchEventRepository, TaskDispatchEventRepository>();
services.AddTransient<ITaskExecutionStatsRepository, TaskExecutionStatsRepository>();
services.AddTransient<IFileSystem, FileSystem>();
services.AddMigration(new MongoMigrationSettings
{
Expand Down
6 changes: 0 additions & 6 deletions src/TaskManager/TaskManager/TaskManager.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
using Monai.Deploy.WorkflowManager.TaskManager.API;
using Monai.Deploy.WorkflowManager.TaskManager.API.Extensions;
using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
using Monai.Deploy.WorkflowManager.TaskManager.Database;
using Monai.Deploy.WorkflowManager.TaskManager.Logging;

namespace Monai.Deploy.WorkflowManager.TaskManager
Expand All @@ -47,7 +46,6 @@ public class TaskManager : IHostedService, IDisposable, IMonaiService
private readonly IServiceScope _scope;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly IStorageService _storageService;
private readonly ITaskExecutionStatsRepository _taskExecutionStatsRepository;
private readonly ITaskDispatchEventService _taskDispatchEventService;
private CancellationToken _cancellationToken;
private IMessageBrokerPublisherService? _messageBrokerPublisherService;
Expand Down Expand Up @@ -76,7 +74,6 @@ public TaskManager(
_storageAdminService = _scope.ServiceProvider.GetService<IStorageAdminService>() ?? throw new ServiceNotFoundException(nameof(IStorageAdminService));
_storageService = _scope.ServiceProvider.GetService<IStorageService>() ?? throw new ServiceNotFoundException(nameof(IStorageService));
_taskDispatchEventService = _scope.ServiceProvider.GetService<ITaskDispatchEventService>() ?? throw new ServiceNotFoundException(nameof(ITaskDispatchEventService));
_taskExecutionStatsRepository = _scope.ServiceProvider.GetService<ITaskExecutionStatsRepository>() ?? throw new ServiceNotFoundException(nameof(ITaskExecutionStatsRepository));
_messageBrokerPublisherService = null;
_messageBrokerSubscriberService = null;
_activeJobs = 0;
Expand Down Expand Up @@ -230,7 +227,6 @@ private async Task HandleCancellationTask(JsonMessage<TaskCancellationEvent> mes
var taskRunner = typeof(ITaskPlugin).CreateInstance<ITaskPlugin>(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent);
await taskRunner.HandleTimeout(message.Body.Identity);

await _taskExecutionStatsRepository.UpdateExecutionStatsAsync(message.Body, message.CorrelationId);
AcknowledgeMessage(message);
}
catch (Exception ex)
Expand Down Expand Up @@ -390,7 +386,6 @@ private async Task HandleDispatchTask(JsonMessage<TaskDispatchEvent> message)
await _taskDispatchEventService.CreateAsync(eventInfo).ConfigureAwait(false);
message.Body.Validate();
pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[message.Body.TaskPluginType];
await _taskExecutionStatsRepository.CreateAsync(eventInfo);
}
catch (MessageValidationException ex)
{
Expand Down Expand Up @@ -553,7 +548,6 @@ private async Task SendUpdateEvent(JsonMessage<TaskUpdateEvent> message)

try
{
await _taskExecutionStatsRepository.UpdateExecutionStatsAsync(message.Body);
_logger.SendingTaskUpdateMessage(_options.Value.Messaging.Topics.TaskUpdateRequest, message.Body.Reason);
await _messageBrokerPublisherService!.Publish(_options.Value.Messaging.Topics.TaskUpdateRequest, message.ToMessage()).ConfigureAwait(false);
_logger.TaskUpdateMessageSent(_options.Value.Messaging.Topics.TaskUpdateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DocumentVersionConvert : JsonConverter
var minor = 0;
var revision = 0;

var res = (reader.Value as string)?.Split(new char[] { ',' });
var res = (reader.Value as string)?.Split(new char[] { ',', '.' });

if (res?.Length == 3)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* limitations under the License.
*/

using Monai.Deploy.WorkflowManager.TaskManager.API.Models;
using System;
using Monai.Deploy.WorkflowManager.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;

namespace Monai.Deploy.WorkflowManager.TaskManager.API.Migrations
namespace Monai.Deploy.WorkflowManager.Contracts.Migrations
{
public class M001_TaskExecutionStats_addVersion : DocumentMigration<TaskExecutionStats>
public class M001_TaskExecutionStats_addVersion : DocumentMigration<ExecutionStats>
{
public M001_TaskExecutionStats_addVersion() : base("1.0.0") { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
* limitations under the License.
*/

namespace Monai.Deploy.WorkflowManager.TaskManager.API.Models
using System;

namespace Monai.Deploy.WorkflowManager.Contracts.Models
{

public class ExecutionStatDTO
{
public ExecutionStatDTO(TaskExecutionStats stats)
public ExecutionStatDTO(ExecutionStats stats)
{
ExecutionId = stats.ExecutionId;
StartedAt = stats.StartedUTC;
Expand All @@ -32,7 +34,7 @@ public ExecutionStatDTO(TaskExecutionStats stats)
public DateTime StartedAt { get; set; }
public DateTime FinishedAt { get; set; }
public double ExecutionDurationSeconds { get; set; }
public string Status { get; set; } = TaskStatus.Created.ToString();
public string Status { get; set; } = "Created";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,20 @@
* limitations under the License.
*/

using System;
using System.ComponentModel.DataAnnotations;
using Monai.Deploy.WorkflowManager.Contracts.Migrations;
using Ardalis.GuardClauses;
using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.TaskManager.Migrations;
using Mongo.Migration.Documents;
using Mongo.Migration.Documents.Attributes;
using MongoDB.Bson.Serialization.Attributes;
using Newtonsoft.Json;

namespace Monai.Deploy.WorkflowManager.TaskManager.API.Models
namespace Monai.Deploy.WorkflowManager.Contracts.Models
{
[CollectionLocation("ExecutionStats"), RuntimeVersion("1.0.0")]
public class TaskExecutionStats : IDocument
public class ExecutionStats : IDocument
{
/// <summary>
/// Gets or sets the ID of the object.
Expand All @@ -46,28 +47,28 @@ public class TaskExecutionStats : IDocument
/// </summary>
[JsonProperty(PropertyName = "correlation_id")]
[Required]
public string CorrelationId { get; set; }
public string CorrelationId { get; set; } = "";

/// <summary>
/// the workflow Instance that triggered the event
/// </summary>
[JsonProperty(PropertyName = "workflow_instance_id")]
[Required]
public string WorkflowInstanceId { get; set; }
public string WorkflowInstanceId { get; set; } = "";

/// <summary>
/// This execution ID
/// </summary>
[JsonProperty(PropertyName = "execution_id")]
[Required]
public string ExecutionId { get; set; }
public string ExecutionId { get; set; } = "";

/// <summary>
/// The event Task ID
/// </summary>
[Required]
[JsonProperty(PropertyName = "task_id")]
public string TaskId { get; set; }
public string TaskId { get; set; } = "";

/// <summary>
/// Gets or sets the date time that the task started with the plug-in.
Expand Down Expand Up @@ -111,23 +112,23 @@ public double DurationSeconds
get; set;
}

public TaskExecutionStats()
public ExecutionStats()
{

}

public TaskExecutionStats(TaskDispatchEventInfo dispatchInfo)
public ExecutionStats(TaskExecution execution, string correlationId)
{
Guard.Against.Null(dispatchInfo, "dispatchInfo");
CorrelationId = dispatchInfo.Event.CorrelationId;
WorkflowInstanceId = dispatchInfo.Event.WorkflowInstanceId;
ExecutionId = dispatchInfo.Event.ExecutionId;
TaskId = dispatchInfo.Event.TaskId;
StartedUTC = dispatchInfo.Started.ToUniversalTime();
Status = dispatchInfo.Event.Status.ToString();
Guard.Against.Null(execution, "dispatchInfo");
CorrelationId = correlationId;
WorkflowInstanceId = execution.WorkflowInstanceId;
ExecutionId = execution.ExecutionId;
TaskId = execution.TaskId;
StartedUTC = execution.TaskStartTime.ToUniversalTime();
Status = execution.Status.ToString();
}

public TaskExecutionStats(TaskUpdateEvent taskUpdateEvent)
public ExecutionStats(TaskUpdateEvent taskUpdateEvent)
{
Guard.Against.Null(taskUpdateEvent, "taskUpdateEvent");
CorrelationId = taskUpdateEvent.CorrelationId;
Expand All @@ -137,7 +138,7 @@ public TaskExecutionStats(TaskUpdateEvent taskUpdateEvent)
Status = taskUpdateEvent.Status.ToString();
}

public TaskExecutionStats(TaskCancellationEvent taskCanceledEvent, string correlationId)
public ExecutionStats(TaskCancellationEvent taskCanceledEvent, string correlationId)
{
Guard.Against.Null(taskCanceledEvent, "taskCanceledEvent");
CorrelationId = correlationId;
Expand Down
Loading