From f0776e2471f3340f163e494220b1cfbdf94147ce Mon Sep 17 00:00:00 2001 From: Robert Johnson Date: Thu, 4 Jan 2024 18:05:04 -0500 Subject: [PATCH] Bulk delete enhancements (#3505) Add audit logging and improve throughput --- docs/rest/BulkDelete.http | 13 +- .../Features/Routing/KnownRoutes.cs | 3 + .../Features/Routing/RouteNames.cs | 2 + .../BulkDeleteOrchestratorJobTests.cs | 14 +- .../BulkDeleteProcessingJobTests.cs | 43 +++- .../CreateBulkDeleteHandlerTests.cs | 8 +- .../BulkDelete/GetBulkDeleteHandlerTests.cs | 214 +++++++++++------ .../bulk-delete-soft-deleted.json | 24 ++ .../Data/OperationDefinition/bulk-delete.json | 2 +- .../Extensions/SearchServiceExtensions.cs | 7 +- .../BulkDelete/BulkDeleteDefinition.cs | 13 +- .../BulkDeleteMediatorExtensions.cs | 4 +- .../BulkDelete/BulkDeleteOrchestratorJob.cs | 57 ++++- .../BulkDelete/BulkDeleteProcessingJob.cs | 40 +++- .../Handlers/CreateBulkDeleteHandler.cs | 10 +- .../Handlers/GetBulkDeleteHandler.cs | 34 ++- .../Messages/CreateBulkDeleteRequest.cs | 6 +- .../Operations/JobRecordProperties.cs | 4 + .../Operations/OperationsConstants.cs | 2 + .../Features/Persistence/IDeletionService.cs | 2 +- .../Features/Search/SearchService.cs | 23 +- .../ConditionalDeleteResourceRequest.cs | 7 +- .../Microsoft.Health.Fhir.Core.csproj | 3 +- .../Controllers/BulkDeleteController.cs | 32 ++- .../OperationDefinitionController.cs | 8 + .../Resources/ProvenanceHeaderBehavior.cs | 1 + .../Resources/ResourceHandlerTests.cs | 18 +- .../ConditionalDeleteResourceHandler.cs | 4 +- .../Resources/Delete/DeletionService.cs | 223 ++++++++++++++---- .../Features/Search/SqlCommandSimplifier.cs | 1 + .../Features/Storage/JobInfoExtensions.cs | 2 +- .../JobInfoExtensions.cs | 4 +- .../Rest/BulkDeleteTests.cs | 18 +- .../Persistence/FhirStorageTestsFixture.cs | 7 +- tools/PowerShell/MonitorAsyncJobs.ps1 | 76 ++++++ .../RegisterAndMonitorConfiguration.cs | 11 + .../RegisterAndMonitorImport.cs | 24 +- .../VerbosityLevel.cs | 17 ++ tools/RegisterAndMonitorImport/readme.md | 3 +- 39 files changed, 782 insertions(+), 202 deletions(-) create mode 100644 src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete-soft-deleted.json create mode 100644 tools/PowerShell/MonitorAsyncJobs.ps1 create mode 100644 tools/RegisterAndMonitorImport/VerbosityLevel.cs diff --git a/docs/rest/BulkDelete.http b/docs/rest/BulkDelete.http index f4c654c061..85b57d1891 100644 --- a/docs/rest/BulkDelete.http +++ b/docs/rest/BulkDelete.http @@ -28,7 +28,7 @@ Authorization: Bearer {{bearer.response.body.access_token}} ### Record Bulk Delete content location @bulkDeleteLocation = {{bulkDelete.response.headers.Content-Location}} -### Bulk Delete with search parameteres +### Bulk Delete with search parameteres. This also sets data up for the soft deleted sample below. # @name bulkDelete DELETE https://{{hostname}}/$bulk-delete?_tag=oldData Prefer: respond-async @@ -46,9 +46,16 @@ DELETE https://{{hostname}}/$bulk-delete?_tag=oldData&_purgeHistory=true Prefer: respond-async Authorization: Bearer {{bearer.response.body.access_token}} -### Bulk Delete Encounters with hard delete +### Bulk Delete Patient with hard delete # @name bulkDelete -DELETE https://{{hostname}}/Encounter/$bulk-delete?_hardDelete=true +DELETE https://{{hostname}}/Patient/$bulk-delete?_tag=oldData&_hardDelete=true +Prefer: respond-async +Authorization: Bearer {{bearer.response.body.access_token}} + +### Bulk Delete soft deleted resources. Run the $bulk-delete sample above to setup data for this test. +# Since this test can't use _tag it will delete all soft deleted data in the database. +# @name bulkDelete +DELETE https://{{hostname}}/$bulk-delete-soft-deleted Prefer: respond-async Authorization: Bearer {{bearer.response.body.access_token}} diff --git a/src/Microsoft.Health.Fhir.Api/Features/Routing/KnownRoutes.cs b/src/Microsoft.Health.Fhir.Api/Features/Routing/KnownRoutes.cs index 1421957e04..a2a234f28a 100644 --- a/src/Microsoft.Health.Fhir.Api/Features/Routing/KnownRoutes.cs +++ b/src/Microsoft.Health.Fhir.Api/Features/Routing/KnownRoutes.cs @@ -87,8 +87,11 @@ internal class KnownRoutes public const string BulkDelete = "$bulk-delete"; public const string BulkDeleteResourceType = ResourceType + "/" + BulkDelete; + public const string BulkDeleteSoftDeleted = "$bulk-delete-soft-deleted"; + public const string BulkDeleteSoftDeletedResourceType = ResourceType + "/" + BulkDeleteSoftDeleted; public const string BulkDeleteJobLocation = OperationsConstants.Operations + "/" + OperationsConstants.BulkDelete + "/" + IdRouteSegment; public const string BulkDeleteOperationDefinition = OperationDefinition + "/" + OperationsConstants.BulkDelete; public const string ResourceTypeBulkDeleteOperationDefinition = OperationDefinition + "/" + OperationsConstants.ResourceTypeBulkDelete; + public const string BulkDeleteSoftDeletedOperationDefinition = OperationDefinition + "/" + OperationsConstants.BulkDeleteSoftDeleted; } } diff --git a/src/Microsoft.Health.Fhir.Api/Features/Routing/RouteNames.cs b/src/Microsoft.Health.Fhir.Api/Features/Routing/RouteNames.cs index 88272dc0d1..6d2e3d56ac 100644 --- a/src/Microsoft.Health.Fhir.Api/Features/Routing/RouteNames.cs +++ b/src/Microsoft.Health.Fhir.Api/Features/Routing/RouteNames.cs @@ -78,5 +78,7 @@ internal static class RouteNames internal const string CancelBulkDelete = nameof(CancelBulkDelete); internal const string BulkDeleteDefinition = nameof(BulkDeleteDefinition); + + internal const string BulkDeleteSoftDeletedDefinition = nameof(BulkDeleteSoftDeletedDefinition); } } diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteOrchestratorJobTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteOrchestratorJobTests.cs index 686242bad8..3ff5a559d0 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteOrchestratorJobTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteOrchestratorJobTests.cs @@ -34,6 +34,11 @@ public BulkDeleteOrchestratorJobTests() { _queueClient = Substitute.For(); _searchService = Substitute.For(); + _searchService.SearchAsync(Arg.Any(), Arg.Any>>(), Arg.Any()).Returns((x) => + { + var result = new SearchResult(2, new List>()); + return Task.FromResult(result); + }); _orchestratorJob = new BulkDeleteOrchestratorJob(_queueClient, _searchService); _progress = new Progress((result) => { }); @@ -62,9 +67,14 @@ public async Task GivenBulkDeleteJob_WhenNoResourceTypeIsGiven_ThenProcessingJob await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync(Arg.Any(), Arg.Any(), Arg.Any(), false, false, Arg.Any()); await _searchService.ReceivedWithAnyArgs(1).GetUsedResourceTypes(Arg.Any()); - // Checks that two processing jobs were queued + // Checks that one processing job was queued var calls = _queueClient.ReceivedCalls(); - Assert.Equal(2, ((string[])calls.First().GetArguments()[1]).Length); + var definitions = (string[])calls.First().GetArguments()[1]; + Assert.Single(definitions); + + // Checks that the processing job lists both resource types + var actualDefinition = JsonConvert.DeserializeObject(definitions[0]); + Assert.Equal(2, actualDefinition.Type.SplitByOrSeparator().Count()); } [Fact] diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs index 182cd2e19e..25e93c3765 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/BulkDeleteProcessingJobTests.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using MediatR; @@ -14,6 +15,7 @@ using Microsoft.Health.Fhir.Core.Features.Operations; using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete; using Microsoft.Health.Fhir.Core.Features.Persistence; +using Microsoft.Health.Fhir.Core.Features.Search; using Microsoft.Health.Fhir.Core.Messages.Delete; using Microsoft.Health.Fhir.Tests.Common; using Microsoft.Health.JobManagement; @@ -31,13 +33,19 @@ public class BulkDeleteProcessingJobTests private IDeletionService _deleter; private IProgress _progress; private BulkDeleteProcessingJob _processingJob; + private ISearchService _searchService; + private IQueueClient _queueClient; public BulkDeleteProcessingJobTests() { + _searchService = Substitute.For(); + _searchService.SearchAsync(Arg.Any(), Arg.Any>>(), Arg.Any(), resourceVersionTypes: Arg.Any()) + .Returns(Task.FromResult(new SearchResult(5, new List>()))); + _queueClient = Substitute.For(); _deleter = Substitute.For(); var deleter = Substitute.For>(); deleter.Value.Returns(_deleter); - _processingJob = new BulkDeleteProcessingJob(() => deleter, Substitute.For>(), Substitute.For()); + _processingJob = new BulkDeleteProcessingJob(() => deleter, Substitute.For>(), Substitute.For(), _searchService, _queueClient); _progress = new Progress((result) => { }); } @@ -55,7 +63,7 @@ public async Task GivenProcessingJob_WhenJobIsRun_ThenResourcesAreDeleted() }; _deleter.DeleteMultipleAsync(Arg.Any(), Arg.Any()) - .Returns(args => new HashSet { "1", "2", "3"}); + .Returns(args => 3); var result = JsonConvert.DeserializeObject(await _processingJob.ExecuteAsync(jobInfo, _progress, CancellationToken.None)); Assert.Single(result.ResourcesDeleted); @@ -63,5 +71,36 @@ public async Task GivenProcessingJob_WhenJobIsRun_ThenResourcesAreDeleted() await _deleter.ReceivedWithAnyArgs(1).DeleteMultipleAsync(Arg.Any(), Arg.Any()); } + + [Fact] + public async Task GivenProcessingJob_WhenJobIsRunWithMultipleResourceTypes_ThenFollowupJobIsCreated() + { + _deleter.ClearReceivedCalls(); + + var definition = new BulkDeleteDefinition(JobType.BulkDeleteProcessing, DeleteOperation.HardDelete, "Patient,Observation,Device", new List>(), "https:\\\\test.com", "https:\\\\test.com", "test"); + var jobInfo = new JobInfo() + { + Id = 1, + Definition = JsonConvert.SerializeObject(definition), + }; + + _deleter.DeleteMultipleAsync(Arg.Any(), Arg.Any()) + .Returns(args => 3); + + var result = JsonConvert.DeserializeObject(await _processingJob.ExecuteAsync(jobInfo, _progress, CancellationToken.None)); + Assert.Single(result.ResourcesDeleted); + Assert.Equal(3, result.ResourcesDeleted["Patient"]); + + await _deleter.ReceivedWithAnyArgs(1).DeleteMultipleAsync(Arg.Any(), Arg.Any()); + + // Checks that one processing job was queued + var calls = _queueClient.ReceivedCalls(); + var definitions = (string[])calls.First().GetArguments()[1]; + Assert.Single(definitions); + + // Checks that the processing job removed the resource type that was processed and lists the remaining two resource types + var actualDefinition = JsonConvert.DeserializeObject(definitions[0]); + Assert.Equal(2, actualDefinition.Type.SplitByOrSeparator().Count()); + } } } diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/CreateBulkDeleteHandlerTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/CreateBulkDeleteHandlerTests.cs index f6826f535b..9f832e9763 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/CreateBulkDeleteHandlerTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/CreateBulkDeleteHandlerTests.cs @@ -86,7 +86,7 @@ public async Task GivenBulkDeleteRequest_WhenJobCreationRequested_ThenJobIsCreat }; }); - var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, KnownResourceTypes.Patient, searchParams); + var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, KnownResourceTypes.Patient, searchParams, false); var response = await _handler.Handle(request, CancellationToken.None); Assert.NotNull(response); @@ -105,7 +105,7 @@ public async Task GivenBulkDeleteRequestWithInvalidSearchParameter_WhenJobCreati _authorizationService.CheckAccess(Arg.Any(), Arg.Any()).Returns(DataActions.HardDelete | DataActions.Delete); _contextAccessor.RequestContext.BundleIssues.Add(new OperationOutcomeIssue(OperationOutcomeConstants.IssueSeverity.Warning, OperationOutcomeConstants.IssueType.Conflict)); - var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, KnownResourceTypes.Patient, searchParams); + var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, KnownResourceTypes.Patient, searchParams, false); await Assert.ThrowsAsync(async () => await _handler.Handle(request, CancellationToken.None)); } @@ -118,7 +118,7 @@ public async Task GivenUnauthorizedUser_WhenJobCreationRequested_ThenUnauthorize { _authorizationService.CheckAccess(Arg.Any(), Arg.Any()).Returns(userRole); - var request = new CreateBulkDeleteRequest(deleteOperation, null, null); + var request = new CreateBulkDeleteRequest(deleteOperation, null, null, false); await Assert.ThrowsAsync(async () => await _handler.Handle(request, CancellationToken.None)); } @@ -129,7 +129,7 @@ public async Task GivenBulkDeleteRequest_WhenJobCreationFails_ThenExceptionIsThr _contextAccessor.RequestContext.BundleIssues.Clear(); _queueClient.EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any(), Arg.Any(), false, false, Arg.Any()).Returns(new List()); - var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, null, new List>()); + var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, null, new List>(), false); await Assert.ThrowsAsync(async () => await _handler.Handle(request, CancellationToken.None)); } } diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/GetBulkDeleteHandlerTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/GetBulkDeleteHandlerTests.cs index b190c8d6e5..bb35c1dba5 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/GetBulkDeleteHandlerTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/GetBulkDeleteHandlerTests.cs @@ -57,8 +57,8 @@ public async Task GivenCompletedBulkDeleteJob_WhenStatusRequested_ThenStatusIsRe var resourcesDeleted = new List> { - new(KnownResourceTypes.Patient, new FhirDecimal(22)), - new(KnownResourceTypes.Observation, new FhirDecimal(5)), + new(KnownResourceTypes.Patient, new Integer64(22)), + new(KnownResourceTypes.Observation, new Integer64(5)), }; var resultsDictionary = new Dictionary>>() @@ -67,23 +67,29 @@ public async Task GivenCompletedBulkDeleteJob_WhenStatusRequested_ThenStatusIsRe }; await RunGetBulkDeleteTest( - new List + new List> { - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(patientResult1), - }, - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(patientResult2), - }, - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(observationResult), - }, + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(patientResult1), + }, + 15), + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(patientResult2), + }, + 7), + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(observationResult), + }, + 5), }, new GetBulkDeleteResponse(ToParameters(resultsDictionary).ToArray(), null, System.Net.HttpStatusCode.OK)); } @@ -98,8 +104,8 @@ public async Task GivenRunningBulkDeleteJob_WhenStatusRequested_ThenStatusIsRetu var resourcesDeleted = new List>() { - new(KnownResourceTypes.Patient, new FhirDecimal(15)), - new(KnownResourceTypes.Observation, new FhirDecimal(5)), + new(KnownResourceTypes.Patient, new Integer64(15)), + new(KnownResourceTypes.Observation, new Integer64(5)), }; var resultsDictionary = new Dictionary>>() @@ -116,22 +122,28 @@ public async Task GivenRunningBulkDeleteJob_WhenStatusRequested_ThenStatusIsRetu }; await RunGetBulkDeleteTest( - new List + new List>() { - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(patientResult1), - }, - new() - { - Status = JobStatus.Running, - }, - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(observationResult), - }, + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(patientResult1), + }, + 15), + new Tuple( + new() + { + Status = JobStatus.Running, + }, + 0), + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(observationResult), + }, + 5), }, new GetBulkDeleteResponse(ToParameters(resultsDictionary).ToArray(), issues, System.Net.HttpStatusCode.Accepted)); } @@ -144,7 +156,7 @@ public async Task GivenFailedBulkDeleteJob_WhenStatusRequested_ThenStatusIsRetur var resourcesDeleted = new List> { - new(KnownResourceTypes.Patient, new FhirDecimal(15)), + new(KnownResourceTypes.Patient, new Integer64(15)), }; var resultsDictionary = new Dictionary>>() @@ -161,26 +173,34 @@ public async Task GivenFailedBulkDeleteJob_WhenStatusRequested_ThenStatusIsRetur }; await RunGetBulkDeleteTest( - new List + new List>() { - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(patientResult1), - }, - new() - { - Status = JobStatus.Running, - }, - new() - { - Status = JobStatus.Failed, - Result = JsonConvert.SerializeObject(new { message = "Job failed" }), - }, - new() - { - Status = JobStatus.Running, - }, + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(patientResult1), + }, + 15), + new Tuple( + new() + { + Status = JobStatus.Running, + }, + 0), + new Tuple( + new() + { + Status = JobStatus.Failed, + Result = JsonConvert.SerializeObject(new { message = "Job failed" }), + }, + 0), + new Tuple( + new() + { + Status = JobStatus.Running, + }, + 0), }, new GetBulkDeleteResponse(ToParameters(resultsDictionary).ToArray(), issues, System.Net.HttpStatusCode.InternalServerError)); } @@ -193,7 +213,7 @@ public async Task GivenCancelledBulkDeleteJob_WhenStatusRequested_ThenStatusIsRe var resourcesDeleted = new List>() { - new(KnownResourceTypes.Patient, new FhirDecimal(15)), + new(KnownResourceTypes.Patient, new Integer64(15)), }; var resultsDictionary = new Dictionary>>() @@ -210,21 +230,65 @@ public async Task GivenCancelledBulkDeleteJob_WhenStatusRequested_ThenStatusIsRe }; await RunGetBulkDeleteTest( - new List + new List>() { - new() - { - Status = JobStatus.Completed, - Result = JsonConvert.SerializeObject(patientResult1), - }, - new() - { - Status = JobStatus.Cancelled, - }, - new() - { - Status = JobStatus.Running, - }, + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(patientResult1), + }, + 15), + new Tuple( + new() + { + Status = JobStatus.Cancelled, + }, + 0), + new Tuple( + new() + { + Status = JobStatus.Running, + }, + 0), + }, + new GetBulkDeleteResponse(ToParameters(resultsDictionary).ToArray(), issues, System.Net.HttpStatusCode.OK)); + } + + [Fact] + public async Task GivenMiscountedBulkDeleteJob_WhenStatusRequested_ThenStatusIsReturned() + { + var patientResult1 = new BulkDeleteResult(); + patientResult1.ResourcesDeleted.Add(KnownResourceTypes.Patient, 15); + + var resourcesDeleted = new List> + { + new(KnownResourceTypes.Patient, new Integer64(15)), + }; + + var resultsDictionary = new Dictionary>>() + { + { _countLabel, resourcesDeleted }, + }; + + var issues = new List() + { + new( + OperationOutcomeConstants.IssueSeverity.Warning, + OperationOutcomeConstants.IssueType.Informational, + detailsText: "There was a count mismatch when checking the job results. This could mean a job was restarted unexpetedly or resources were deleted by another process while the job was running. Please double check that all desired resources have been deleted. Audit logs can be referenced to get a list of the resources deleted during this operation."), + }; + + await RunGetBulkDeleteTest( + new List>() + { + new Tuple( + new() + { + Status = JobStatus.Completed, + Result = JsonConvert.SerializeObject(patientResult1), + }, + 17), }, new GetBulkDeleteResponse(ToParameters(resultsDictionary).ToArray(), issues, System.Net.HttpStatusCode.OK)); } @@ -248,17 +312,17 @@ public async Task GivenNonExistantBulkDeleteJob_WhenStatusRequested_ThenNotFound await Assert.ThrowsAsync(async () => await _handler.Handle(request, CancellationToken.None)); } - private async Task RunGetBulkDeleteTest(IReadOnlyList jobs, GetBulkDeleteResponse expectedResponse) + private async Task RunGetBulkDeleteTest(IReadOnlyList> jobs, GetBulkDeleteResponse expectedResponse) { _authorizationService.CheckAccess(Arg.Any(), Arg.Any()).Returns(DataActions.Read); - var definition = JsonConvert.SerializeObject(new BulkDeleteDefinition(JobType.BulkDeleteProcessing, DeleteOperation.HardDelete, null, null, "test", "test", "test")); foreach (var job in jobs) { - job.Definition = definition; + var definition = JsonConvert.SerializeObject(new BulkDeleteDefinition(JobType.BulkDeleteProcessing, DeleteOperation.HardDelete, null, null, "test", "test", "test", job.Item2)); + job.Item1.Definition = definition; } - _queueClient.GetJobByGroupIdAsync((byte)QueueType.BulkDelete, Arg.Any(), true, Arg.Any()).Returns(jobs); + _queueClient.GetJobByGroupIdAsync((byte)QueueType.BulkDelete, Arg.Any(), true, Arg.Any()).Returns(jobs.Select(job => job.Item1).ToList()); var request = new GetBulkDeleteRequest(1); var response = await _handler.Handle(request, CancellationToken.None); @@ -277,7 +341,7 @@ private async Task RunGetBulkDeleteTest(IReadOnlyList jobs, GetBulkDele } else { - Assert.Null(response.Issues); + Assert.Empty(response.Issues); } if (expectedResponse.Results != null) diff --git a/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete-soft-deleted.json b/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete-soft-deleted.json new file mode 100644 index 0000000000..dbb7538d8a --- /dev/null +++ b/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete-soft-deleted.json @@ -0,0 +1,24 @@ +{ + "resourceType": "OperationDefinition", + "id": "bulk-delete-soft-deleted", + "url": "[base]/OperationDefinition/bulk-delete-soft-deleted", + "version": "1.0.0", + "name": "Bulk Delete Soft Deleted", + "status": "active", + "kind": "operation", + "description": "Deletes all data from a FHIR server that has been soft deleted and matches the search parameters. The only supported search parameter is _lastUpdated. This operation is asynchronous as defined in the [FHIR Asynchronous Request Pattern](http://hl7.org/fhir/async.html)", + "code": "bulk-delete-soft-deleted", + "system": true, + "type": true, + "instance": false, + "parameter": [ + { + "name": "purgeHistory", + "use": "in", + "min": 0, + "max": "1", + "documentation": "A boolean flag to indicate whether the resources' historical versions should be removed from the database. Setting this flag removes the affected resources' historical records but leaves the current verisons.", + "type": "boolean" + } + ] +} diff --git a/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete.json b/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete.json index b0dcb00dff..a6c8ef3a90 100644 --- a/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete.json +++ b/src/Microsoft.Health.Fhir.Core/Data/OperationDefinition/bulk-delete.json @@ -25,7 +25,7 @@ "use": "in", "min": 0, "max": "1", - "documentation": "A boolean flag to indicate whether the resources' historical versions should be removed from the database. Setting this flag removes the affected resources' historical records but leaves the current verisons. Must be used with the hardDelete parameter.", + "documentation": "A boolean flag to indicate whether the resources' historical versions should be removed from the database. Setting this flag removes the affected resources' historical records but leaves the current verisons.", "type": "boolean" } ] diff --git a/src/Microsoft.Health.Fhir.Core/Extensions/SearchServiceExtensions.cs b/src/Microsoft.Health.Fhir.Core/Extensions/SearchServiceExtensions.cs index d9f5b8c9a2..0a36ff2271 100644 --- a/src/Microsoft.Health.Fhir.Core/Extensions/SearchServiceExtensions.cs +++ b/src/Microsoft.Health.Fhir.Core/Extensions/SearchServiceExtensions.cs @@ -41,6 +41,8 @@ public static class SearchServiceExtensions /// The CancellationToken /// The search Count. /// An optional ContinuationToken + /// The versions of a resource to return + /// The logger /// Search collection and a continuationToken /// Returns this exception when all passed in params match the search result unusedParams internal static async Task<(IReadOnlyCollection Results, string ContinuationToken)> ConditionalSearchAsync( @@ -50,7 +52,8 @@ public static class SearchServiceExtensions CancellationToken cancellationToken, int? count = 2, // Most "Conditional" logic needs only 0, 1 or >1, so here we can limit to "2" string continuationToken = null, - Microsoft.Extensions.Logging.ILogger logger = null) + ResourceVersionType versionType = ResourceVersionType.Latest, + ILogger logger = null) { // Filters search parameters that can limit the number of results (e.g. _count=1) List> filteredParameters = conditionalParameters @@ -80,7 +83,7 @@ public static class SearchServiceExtensions statistics.Iterate(); - SearchResult results = await searchService.SearchAsync(instanceType, searchParameters.ToImmutableList(), cancellationToken); + SearchResult results = await searchService.SearchAsync(instanceType, searchParameters.ToImmutableList(), cancellationToken, resourceVersionTypes: versionType); lastContinuationToken = results?.ContinuationToken; // Check if all parameters passed in were unused, this would result in no search parameters being applied to search results diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteDefinition.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteDefinition.cs index bdcdb6b4ed..aff6783779 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteDefinition.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteDefinition.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; +using Microsoft.Health.Fhir.Core.Features.Search; using Microsoft.Health.Fhir.Core.Messages.Delete; using Microsoft.Health.JobManagement; using Newtonsoft.Json; @@ -20,7 +21,9 @@ public BulkDeleteDefinition( IList> searchParameters, string url, string baseUrl, - string parentRequestId) + string parentRequestId, + long startingResourceCount = 0, + ResourceVersionType versionType = ResourceVersionType.Latest) { TypeId = (int)jobType; DeleteOperation = deleteOperation; @@ -29,6 +32,8 @@ public BulkDeleteDefinition( Url = url; BaseUrl = baseUrl; ParentRequestId = parentRequestId; + ExpectedResourceCount = startingResourceCount; + VersionType = versionType; } [JsonConstructor] @@ -56,5 +61,11 @@ protected BulkDeleteDefinition() [JsonProperty(JobRecordProperties.ParentRequestId)] public string ParentRequestId { get; private set; } + + [JsonProperty(JobRecordProperties.ExpectedResourceCount)] + public long ExpectedResourceCount { get; private set; } + + [JsonProperty(JobRecordProperties.VersionType)] + public ResourceVersionType VersionType { get; private set; } } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteMediatorExtensions.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteMediatorExtensions.cs index 42d4c9dcb8..50a7f99297 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteMediatorExtensions.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteMediatorExtensions.cs @@ -16,11 +16,11 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete { public static class BulkDeleteMediatorExtensions { - public static async Task BulkDeleteAsync(this IMediator mediator, DeleteOperation deleteOperation, string resourceType, IList> searchParameters, CancellationToken cancellationToken) + public static async Task BulkDeleteAsync(this IMediator mediator, DeleteOperation deleteOperation, string resourceType, IList> searchParameters, bool includeSoftDeleted, CancellationToken cancellationToken) { EnsureArg.IsNotNull(mediator, nameof(mediator)); - var request = new CreateBulkDeleteRequest(deleteOperation, resourceType, searchParameters); + var request = new CreateBulkDeleteRequest(deleteOperation, resourceType, searchParameters, includeSoftDeleted); CreateBulkDeleteResponse response = await mediator.Send(request, cancellationToken); return response; diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteOrchestratorJob.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteOrchestratorJob.cs index c48d9cc5b5..e5d01b6490 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteOrchestratorJob.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteOrchestratorJob.cs @@ -5,9 +5,11 @@ using System; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Threading; using System.Threading.Tasks; using EnsureThat; +using Hl7.Fhir.Rest; using Microsoft.Health.Fhir.Core.Features.Search; using Microsoft.Health.JobManagement; @@ -38,25 +40,62 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre BulkDeleteDefinition definition = jobInfo.DeserializeDefinition(); - var definitions = new List(); + BulkDeleteDefinition processingDefinition = null; + if (string.IsNullOrEmpty(definition.Type)) { IReadOnlyList resourceTypes = await _searchService.GetUsedResourceTypes(cancellationToken); - foreach (var resourceType in resourceTypes) - { - var processingDefinition = new BulkDeleteDefinition(JobType.BulkDeleteProcessing, definition.DeleteOperation, resourceType, definition.SearchParameters, definition.Url, definition.BaseUrl, definition.ParentRequestId); - definitions.Add(processingDefinition); - } + processingDefinition = await CreateProcessingDefinition(definition, _searchService, new List(resourceTypes), cancellationToken); } else { - var processingDefinition = new BulkDeleteDefinition(JobType.BulkDeleteProcessing, definition.DeleteOperation, definition.Type, definition.SearchParameters, definition.Url, definition.BaseUrl, definition.ParentRequestId); - definitions.Add(processingDefinition); + processingDefinition = await CreateProcessingDefinition(definition, _searchService, new List() { definition.Type }, cancellationToken); } - await _queueClient.EnqueueAsync(QueueType.BulkDelete, cancellationToken, jobInfo.GroupId, definitions: definitions.ToArray()); + await _queueClient.EnqueueAsync(QueueType.BulkDelete, cancellationToken, jobInfo.GroupId, definitions: processingDefinition); return OperationCompleted; } + + // Creates a bulk delete processing job. + // Each processing job only deletes one resource type, but it contains a comma seperated list of all resource types to be deleted. Once one type is deleted it will start a new job to delete the next one. + internal static async Task CreateProcessingDefinition(BulkDeleteDefinition baseDefinition, ISearchService searchService, IList resourceTypes, CancellationToken cancellationToken) + { + var searchParameters = new List>() + { + new Tuple(KnownQueryParameterNames.Summary, "count"), + }; + + if (baseDefinition.SearchParameters != null) + { + searchParameters.AddRange(baseDefinition.SearchParameters); + } + + while (resourceTypes.Count > 0) + { + int numResources = (await searchService.SearchAsync(resourceTypes[0], searchParameters, cancellationToken, resourceVersionTypes: baseDefinition.VersionType)).TotalCount.GetValueOrDefault(); + + if (numResources == 0) + { + resourceTypes.RemoveAt(0); + continue; + } + + string resourceType = resourceTypes.JoinByOrSeparator(); + + return new BulkDeleteDefinition( + JobType.BulkDeleteProcessing, + baseDefinition.DeleteOperation, + resourceType, + baseDefinition.SearchParameters, + baseDefinition.Url, + baseDefinition.BaseUrl, + baseDefinition.ParentRequestId, + numResources, + baseDefinition.VersionType); + } + + return null; + } } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs index c09e87063b..07fcee0934 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs @@ -18,6 +18,7 @@ using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete.Messages; using Microsoft.Health.Fhir.Core.Features.Persistence; +using Microsoft.Health.Fhir.Core.Features.Search; using Microsoft.Health.Fhir.Core.Messages.Delete; using Microsoft.Health.JobManagement; using Newtonsoft.Json; @@ -30,15 +31,21 @@ public class BulkDeleteProcessingJob : IJob private readonly Func> _deleterFactory; private readonly RequestContextAccessor _contextAccessor; private readonly IMediator _mediator; + private readonly ISearchService _searchService; + private readonly IQueueClient _queueClient; public BulkDeleteProcessingJob( Func> deleterFactory, RequestContextAccessor contextAccessor, - IMediator mediator) + IMediator mediator, + ISearchService searchService, + IQueueClient queueClient) { _deleterFactory = EnsureArg.IsNotNull(deleterFactory, nameof(deleterFactory)); _contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor)); _mediator = EnsureArg.IsNotNull(mediator, nameof(mediator)); + _searchService = EnsureArg.IsNotNull(searchService, nameof(searchService)); + _queueClient = EnsureArg.IsNotNull(queueClient, nameof(queueClient)); } public async Task ExecuteAsync(JobInfo jobInfo, IProgress progress, CancellationToken cancellationToken) @@ -58,7 +65,7 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre method: "BulkDelete", uriString: definition.Url, baseUriString: definition.BaseUrl, - correlationId: jobInfo.Id.ToString(), + correlationId: jobInfo.Id.ToString() + '-' + jobInfo.GroupId.ToString(), requestHeaders: new Dictionary(), responseHeaders: new Dictionary()) { @@ -67,31 +74,33 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre _contextAccessor.RequestContext = fhirRequestContext; var result = new BulkDeleteResult(); - IReadOnlySet itemsDeleted; + long numDeleted; using IScoped deleter = _deleterFactory.Invoke(); Exception exception = null; + List types = definition.Type.SplitByOrSeparator().ToList(); try { - itemsDeleted = await deleter.Value.DeleteMultipleAsync( + numDeleted = await deleter.Value.DeleteMultipleAsync( new ConditionalDeleteResourceRequest( - definition.Type, + types[0], (IReadOnlyList>)definition.SearchParameters, definition.DeleteOperation, maxDeleteCount: null, - deleteAll: true), + deleteAll: true, + versionType: definition.VersionType), cancellationToken); } - catch (IncompleteOperationException> ex) + catch (IncompleteOperationException ex) { - itemsDeleted = ex.PartialResults; + numDeleted = ex.PartialResults; result.Issues.Add(ex.Message); exception = ex; } - result.ResourcesDeleted.Add(definition.Type, itemsDeleted.Count); + result.ResourcesDeleted.Add(types[0], numDeleted); - await _mediator.Publish(new BulkDeleteMetricsNotification(jobInfo.Id, itemsDeleted.Count), cancellationToken); + await _mediator.Publish(new BulkDeleteMetricsNotification(jobInfo.Id, numDeleted), cancellationToken); if (exception != null) { @@ -100,6 +109,17 @@ public async Task ExecuteAsync(JobInfo jobInfo, IProgress progre throw jobException; } + if (types.Count > 1) + { + types.RemoveAt(0); + BulkDeleteDefinition processingDefinition = await BulkDeleteOrchestratorJob.CreateProcessingDefinition(definition, _searchService, types, cancellationToken); + + if (processingDefinition != null) + { + await _queueClient.EnqueueAsync(QueueType.BulkDelete, cancellationToken, jobInfo.GroupId, definitions: processingDefinition); + } + } + return JsonConvert.SerializeObject(result); } finally diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/CreateBulkDeleteHandler.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/CreateBulkDeleteHandler.cs index 9aa899bbc5..7cbaf74619 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/CreateBulkDeleteHandler.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/CreateBulkDeleteHandler.cs @@ -71,7 +71,15 @@ public async Task Handle(CreateBulkDeleteRequest reque throw new BadRequestException(_contextAccessor.RequestContext.BundleIssues.Select(issue => issue.Diagnostics).ToList()); } - var processingDefinition = new BulkDeleteDefinition(JobType.BulkDeleteOrchestrator, request.DeleteOperation, request.ResourceType, searchParameters, _contextAccessor.RequestContext.Uri.ToString(), _contextAccessor.RequestContext.BaseUri.ToString(), _contextAccessor.RequestContext.CorrelationId); + var processingDefinition = new BulkDeleteDefinition( + JobType.BulkDeleteOrchestrator, + request.DeleteOperation, + request.ResourceType, + searchParameters, + _contextAccessor.RequestContext.Uri.ToString(), + _contextAccessor.RequestContext.BaseUri.ToString(), + _contextAccessor.RequestContext.CorrelationId, + versionType: request.IncludeSoftDeleted ? ResourceVersionType.SoftDeleted : ResourceVersionType.Latest); IReadOnlyList jobInfo = await _queueClient.EnqueueAsync(QueueType.BulkDelete, cancellationToken, definitions: processingDefinition); diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/GetBulkDeleteHandler.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/GetBulkDeleteHandler.cs index f9e136bd6a..8b8905750c 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/GetBulkDeleteHandler.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/GetBulkDeleteHandler.cs @@ -55,6 +55,7 @@ public async Task Handle(GetBulkDeleteRequest request, Ca var failed = false; var cancelled = false; var succeeded = true; + var addBadCountWarning = false; var resourcesDeleted = new Dictionary(); var issues = new List(); var failureResultCode = HttpStatusCode.OK; @@ -110,13 +111,34 @@ public async Task Handle(GetBulkDeleteRequest request, Ca if (job.GetJobTypeId() == (int)JobType.BulkDeleteProcessing && result != null) { + long jobTotal = 0; foreach (var key in result.ResourcesDeleted.Keys) { + jobTotal += result.ResourcesDeleted[key]; if (!resourcesDeleted.TryAdd(key, result.ResourcesDeleted[key])) { resourcesDeleted[key] += result.ResourcesDeleted[key]; } } + + if (job.Status == JobStatus.Completed) + { + var definition = job.DeserializeDefinition(); + if (jobTotal < definition.ExpectedResourceCount) + { + addBadCountWarning = true; + } + else if (jobTotal > definition.ExpectedResourceCount) + { + // I have no clue how this could happen and it imiplies more data was deleted than existed when the job started. + failed = true; + failureResultCode = HttpStatusCode.InternalServerError; + issues.Add(new OperationOutcomeIssue( + OperationOutcomeConstants.IssueSeverity.Error, + OperationOutcomeConstants.IssueType.Exception, + detailsText: "Count mismatch exception. More resources were deleted than existed at the start of the job run. Please review audit logs to check the number and ids of deleted resources.")); + } + } } } @@ -126,7 +148,7 @@ public async Task Handle(GetBulkDeleteRequest request, Ca { Tuple[] tuples = resourcesDeleted .Where(x => x.Value > 0) - .Select(x => Tuple.Create(x.Key, (DataType)new FhirDecimal(x.Value))) + .Select(x => Tuple.Create(x.Key, (DataType)new Integer64(x.Value))) .ToArray(); if (tuples.Any()) @@ -149,6 +171,14 @@ public async Task Handle(GetBulkDeleteRequest request, Ca } } + if (addBadCountWarning) + { + issues.Add(new OperationOutcomeIssue( + OperationOutcomeConstants.IssueSeverity.Warning, + OperationOutcomeConstants.IssueType.Informational, + detailsText: "There was a count mismatch when checking the job results. This could mean a job was restarted unexpetedly or resources were deleted by another process while the job was running. Please double check that all desired resources have been deleted. Audit logs can be referenced to get a list of the resources deleted during this operation.")); + } + if (failed && issues.Count > 0) { return new GetBulkDeleteResponse(fhirResults, issues, failureResultCode); @@ -168,7 +198,7 @@ public async Task Handle(GetBulkDeleteRequest request, Ca } else if (succeeded) { - return new GetBulkDeleteResponse(fhirResults, null, HttpStatusCode.OK); + return new GetBulkDeleteResponse(fhirResults, issues, HttpStatusCode.OK); } else { diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Messages/CreateBulkDeleteRequest.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Messages/CreateBulkDeleteRequest.cs index 9d0b6e603b..5bffd9511f 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Messages/CreateBulkDeleteRequest.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Messages/CreateBulkDeleteRequest.cs @@ -15,11 +15,13 @@ public class CreateBulkDeleteRequest : IRequest public CreateBulkDeleteRequest( DeleteOperation deleteOperation, string resourceType, - IList> conditionalParameters) + IList> conditionalParameters, + bool includeSoftDeleted) { DeleteOperation = deleteOperation; ResourceType = resourceType; ConditionalParameters = conditionalParameters; + IncludeSoftDeleted = includeSoftDeleted; } public DeleteOperation DeleteOperation { get; } @@ -27,5 +29,7 @@ public CreateBulkDeleteRequest( public string ResourceType { get; } public IList> ConditionalParameters { get; } + + public bool IncludeSoftDeleted { get; } } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/JobRecordProperties.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/JobRecordProperties.cs index 80b68036fd..10420d9f6e 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/JobRecordProperties.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/JobRecordProperties.cs @@ -172,5 +172,9 @@ public static class JobRecordProperties public const string BaseUrl = "baseUrl"; public const string ParentRequestId = "parentRequestId"; + + public const string ExpectedResourceCount = "expectedResourceCount"; + + public const string VersionType = "versionType"; } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/OperationsConstants.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/OperationsConstants.cs index 17a34268ed..af6b2d1858 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/OperationsConstants.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/OperationsConstants.cs @@ -44,5 +44,7 @@ public static class OperationsConstants public const string BulkDelete = "bulk-delete"; public const string ResourceTypeBulkDelete = "resource-type-bulk-delete"; + + public const string BulkDeleteSoftDeleted = "bulk-delete-soft-deleted"; } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Persistence/IDeletionService.cs b/src/Microsoft.Health.Fhir.Core/Features/Persistence/IDeletionService.cs index 6dce76d146..38d60ac85b 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Persistence/IDeletionService.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Persistence/IDeletionService.cs @@ -14,6 +14,6 @@ public interface IDeletionService { public Task DeleteAsync(DeleteResourceRequest request, CancellationToken cancellationToken); - public Task> DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken); + public Task DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken); } } diff --git a/src/Microsoft.Health.Fhir.Core/Features/Search/SearchService.cs b/src/Microsoft.Health.Fhir.Core/Features/Search/SearchService.cs index 8446d3310b..3848580cda 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Search/SearchService.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Search/SearchService.cs @@ -229,6 +229,27 @@ public async Task SearchForReindexAsync( throw new NotImplementedException(); } + /* + public virtual Task> GetResourceTimeRanges( + string resourceType, + DateTime start, + DateTime end, + int rangeSize, + IReadOnlyList> queryParameters, + CancellationToken cancellation) + { + // Remove _lastUpdated=gt and _lastUpdated=lt query parameters + + // Add gt and lt lastUpdated query paraemters + // Get count on the search + // If count = rangeSize+-10% add time range to list + // Else if count > rangeSize+10% remove half the previous add + // Else if count < rangeSize-10% add half the previous cut back + // Once till time > end time, till = end and make it the last entry in the list + // Return list + } + */ + public abstract Task> GetUsedResourceTypes(CancellationToken cancellationToken); /// @@ -240,5 +261,5 @@ protected abstract Task SearchForReindexInternalAsync( SearchOptions searchOptions, string searchParameterHash, CancellationToken cancellationToken); - } + } } diff --git a/src/Microsoft.Health.Fhir.Core/Messages/Delete/ConditionalDeleteResourceRequest.cs b/src/Microsoft.Health.Fhir.Core/Messages/Delete/ConditionalDeleteResourceRequest.cs index c9258d43ce..433147cf1d 100644 --- a/src/Microsoft.Health.Fhir.Core/Messages/Delete/ConditionalDeleteResourceRequest.cs +++ b/src/Microsoft.Health.Fhir.Core/Messages/Delete/ConditionalDeleteResourceRequest.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using EnsureThat; using Microsoft.Health.Fhir.Core.Features.Conformance; +using Microsoft.Health.Fhir.Core.Features.Search; using Microsoft.Health.Fhir.Core.Models; namespace Microsoft.Health.Fhir.Core.Messages.Delete @@ -21,7 +22,8 @@ public ConditionalDeleteResourceRequest( DeleteOperation deleteOperation, int? maxDeleteCount, BundleResourceContext bundleResourceContext = null, - bool deleteAll = false) + bool deleteAll = false, + ResourceVersionType versionType = ResourceVersionType.Latest) : base(resourceType, conditionalParameters, bundleResourceContext) { EnsureArg.IsNotNull(conditionalParameters, nameof(conditionalParameters)); @@ -29,6 +31,7 @@ public ConditionalDeleteResourceRequest( DeleteOperation = deleteOperation; MaxDeleteCount = maxDeleteCount; DeleteAll = deleteAll; + VersionType = versionType; } public DeleteOperation DeleteOperation { get; } @@ -37,6 +40,8 @@ public ConditionalDeleteResourceRequest( public bool DeleteAll { get; } + public ResourceVersionType VersionType { get; } + protected override IEnumerable GetCapabilities() => Capabilities; } } diff --git a/src/Microsoft.Health.Fhir.Core/Microsoft.Health.Fhir.Core.csproj b/src/Microsoft.Health.Fhir.Core/Microsoft.Health.Fhir.Core.csproj index a7d682d79e..d88b8ea3ee 100644 --- a/src/Microsoft.Health.Fhir.Core/Microsoft.Health.Fhir.Core.csproj +++ b/src/Microsoft.Health.Fhir.Core/Microsoft.Health.Fhir.Core.csproj @@ -1,7 +1,5 @@  - - @@ -28,6 +26,7 @@ + diff --git a/src/Microsoft.Health.Fhir.Shared.Api/Controllers/BulkDeleteController.cs b/src/Microsoft.Health.Fhir.Shared.Api/Controllers/BulkDeleteController.cs index 88b94327b8..33f3c3d09d 100644 --- a/src/Microsoft.Health.Fhir.Shared.Api/Controllers/BulkDeleteController.cs +++ b/src/Microsoft.Health.Fhir.Shared.Api/Controllers/BulkDeleteController.cs @@ -16,6 +16,7 @@ using Microsoft.Health.Fhir.Api.Features.Filters; using Microsoft.Health.Fhir.Api.Features.Headers; using Microsoft.Health.Fhir.Api.Features.Routing; +using Microsoft.Health.Fhir.Core.Exceptions; using Microsoft.Health.Fhir.Core.Features; using Microsoft.Health.Fhir.Core.Features.Conformance.Models; using Microsoft.Health.Fhir.Core.Features.Operations; @@ -55,7 +56,7 @@ public BulkDeleteController( [AuditEventType(AuditEventSubType.BulkDelete)] public async Task BulkDelete([FromQuery(Name = KnownQueryParameterNames.HardDelete)] bool hardDelete, [FromQuery(Name = KnownQueryParameterNames.PurgeHistory)] bool purgeHistory) { - return await SendDeleteRequest(null, hardDelete, purgeHistory); + return await SendDeleteRequest(null, hardDelete, purgeHistory, false); } [HttpDelete] @@ -64,7 +65,25 @@ public async Task BulkDelete([FromQuery(Name = KnownQueryParamete [AuditEventType(AuditEventSubType.BulkDelete)] public async Task BulkDeleteByResourceType(string typeParameter, [FromQuery(Name = KnownQueryParameterNames.HardDelete)] bool hardDelete, [FromQuery(Name = KnownQueryParameterNames.PurgeHistory)] bool purgeHistory) { - return await SendDeleteRequest(typeParameter, hardDelete, purgeHistory); + return await SendDeleteRequest(typeParameter, hardDelete, purgeHistory, false); + } + + [HttpDelete] + [Route(KnownRoutes.BulkDeleteSoftDeleted)] + [ServiceFilter(typeof(ValidateAsyncRequestFilterAttribute))] + [AuditEventType(AuditEventSubType.BulkDelete)] + public async Task BulkDeleteSoftDeleted([FromQuery(Name = KnownQueryParameterNames.PurgeHistory)] bool purgeHistory) + { + return await SendDeleteRequest(null, true, purgeHistory, true); + } + + [HttpDelete] + [Route(KnownRoutes.BulkDeleteSoftDeletedResourceType)] + [ServiceFilter(typeof(ValidateAsyncRequestFilterAttribute))] + [AuditEventType(AuditEventSubType.BulkDelete)] + public async Task BulkDeleteSoftDeletedByResourceType(string typeParameter, [FromQuery(Name = KnownQueryParameterNames.PurgeHistory)] bool purgeHistory) + { + return await SendDeleteRequest(typeParameter, true, purgeHistory, true); } [HttpGet] @@ -91,12 +110,17 @@ public async Task CancelBulkDelete(long idParameter) return new JobResult(result.StatusCode); } - private async Task SendDeleteRequest(string typeParameter, bool hardDelete, bool purgeHistory) + private async Task SendDeleteRequest(string typeParameter, bool hardDelete, bool purgeHistory, bool softDeleteCleanup) { IList> searchParameters = Request.GetQueriesForSearch().ToList(); searchParameters = searchParameters.Where(param => !_exlcudedParameters.Contains(param.Item1)).ToList(); + if (softDeleteCleanup && searchParameters.Any(param => param.Item1 != KnownQueryParameterNames.LastUpdated)) + { + throw new RequestNotValidException(string.Format(Resources.UnsupportedParameter, searchParameters.Where(param => param.Item1 != KnownQueryParameterNames.LastUpdated).Select(param => param.Item1).Aggregate((param, next) => param += ", " + next))); + } + DeleteOperation deleteOperation = (hardDelete, purgeHistory) switch { { hardDelete: true } => DeleteOperation.HardDelete, @@ -104,7 +128,7 @@ private async Task SendDeleteRequest(string typeParameter, bool h _ => DeleteOperation.SoftDelete, }; - CreateBulkDeleteResponse result = await _mediator.BulkDeleteAsync(deleteOperation, typeParameter, searchParameters, HttpContext.RequestAborted); + CreateBulkDeleteResponse result = await _mediator.BulkDeleteAsync(deleteOperation, typeParameter, searchParameters, softDeleteCleanup, HttpContext.RequestAborted); var response = JobResult.Accepted(); response.SetContentLocationHeader(_urlResolver, OperationsConstants.BulkDelete, result.Id.ToString()); diff --git a/src/Microsoft.Health.Fhir.Shared.Api/Controllers/OperationDefinitionController.cs b/src/Microsoft.Health.Fhir.Shared.Api/Controllers/OperationDefinitionController.cs index 61b5803fcb..23ceaf7a9f 100644 --- a/src/Microsoft.Health.Fhir.Shared.Api/Controllers/OperationDefinitionController.cs +++ b/src/Microsoft.Health.Fhir.Shared.Api/Controllers/OperationDefinitionController.cs @@ -132,6 +132,14 @@ public async Task BulkDeleteOperationDefinition() return await GetOperationDefinitionAsync(OperationsConstants.BulkDelete); } + [HttpGet] + [Route(KnownRoutes.BulkDeleteSoftDeletedOperationDefinition, Name = RouteNames.BulkDeleteSoftDeletedDefinition)] + [AllowAnonymous] + public async Task BulkDeleteSoftDeletedOperationDefinition() + { + return await GetOperationDefinitionAsync(OperationsConstants.BulkDeleteSoftDeleted); + } + [HttpGet] [Route(KnownRoutes.SearchParametersStatusQueryDefintion, Name = RouteNames.SearchParameterStatusOperationDefinition)] [AllowAnonymous] diff --git a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/ProvenanceHeaderBehavior.cs b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/ProvenanceHeaderBehavior.cs index 3a18ff8376..d9ae291bb3 100644 --- a/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/ProvenanceHeaderBehavior.cs +++ b/src/Microsoft.Health.Fhir.Shared.Api/Features/Resources/ProvenanceHeaderBehavior.cs @@ -10,6 +10,7 @@ using Hl7.Fhir.Serialization; using MediatR; using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Primitives; using Microsoft.Health.Fhir.Api.Features.Exceptions; using Microsoft.Health.Fhir.Core.Extensions; using Microsoft.Health.Fhir.Core.Features; diff --git a/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Resources/ResourceHandlerTests.cs b/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Resources/ResourceHandlerTests.cs index 7daf3abaf4..82f4977675 100644 --- a/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Resources/ResourceHandlerTests.cs +++ b/src/Microsoft.Health.Fhir.Shared.Core.UnitTests/Features/Resources/ResourceHandlerTests.cs @@ -4,6 +4,7 @@ // ------------------------------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Linq; using System.Net.Http; using System.Threading; @@ -14,11 +15,13 @@ using MediatR; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Primitives; using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Core.Features.Security.Authorization; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Exceptions; using Microsoft.Health.Fhir.Core.Extensions; +using Microsoft.Health.Fhir.Core.Features.Audit; using Microsoft.Health.Fhir.Core.Features.Conformance; using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Persistence; @@ -95,7 +98,11 @@ public ResourceHandlerTests() observationResource.ConditionalDelete = CapabilityStatement.ConditionalDeleteStatus.Single; patientResource.Versioning = CapabilityStatement.ResourceVersionPolicy.VersionedUpdate; - _conformanceProvider.GetCapabilityStatementOnStartup().Returns(_conformanceStatement.ToTypedElement().ToResourceElement()); + _conformanceProvider.GetCapabilityStatementOnStartup(Arg.Any()).Returns((x) => + { + var typedElement = _conformanceStatement.ToTypedElement(); + return Task.FromResult(typedElement.ToResourceElement()); + }); var lazyConformanceProvider = new Lazy(() => _conformanceProvider); var collection = new ServiceCollection(); @@ -104,12 +111,15 @@ public ResourceHandlerTests() _authorizationService = Substitute.For>(); _authorizationService.CheckAccess(Arg.Any(), Arg.Any()).Returns(ci => ci.Arg()); - var contextAccessor = Substitute.For>(); - + var contextAccessor = Substitute.For(); + contextAccessor.RequestContext = new FhirRequestContext("method", "http://localhost", "http://localhost", "id", new Dictionary(), new Dictionary()); var referenceResolver = new ResourceReferenceResolver(_searchService, new TestQueryStringParser()); _resourceIdProvider = new ResourceIdProvider(); - var deleter = new DeletionService(_resourceWrapperFactory, lazyConformanceProvider, _fhirDataStore, _searchService, _resourceIdProvider); + var auditLogger = Substitute.For(); + var logger = Substitute.For>(); + + var deleter = new DeletionService(_resourceWrapperFactory, lazyConformanceProvider, _fhirDataStore, _searchService, _resourceIdProvider, contextAccessor, auditLogger, logger); var conditionalCreateLogger = Substitute.For>(); var conditionalUpsertLogger = Substitute.For>(); diff --git a/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/ConditionalDeleteResourceHandler.cs b/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/ConditionalDeleteResourceHandler.cs index 90305d0fcb..f177cc59f0 100644 --- a/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/ConditionalDeleteResourceHandler.cs +++ b/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/ConditionalDeleteResourceHandler.cs @@ -124,8 +124,8 @@ private async Task DeleteSingleAsync(ConditionalDeleteRe private async Task DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken) { - IReadOnlySet itemsDeleted = await _deleter.DeleteMultipleAsync(request, cancellationToken); - return new DeleteResourceResponse(itemsDeleted.Count); + long numDeleted = await _deleter.DeleteMultipleAsync(request, cancellationToken); + return new DeleteResourceResponse((int)numDeleted); } } } diff --git a/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/DeletionService.cs b/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/DeletionService.cs index 7a0a479ca6..8ee5bebaa1 100644 --- a/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/DeletionService.cs +++ b/src/Microsoft.Health.Fhir.Shared.Core/Features/Resources/Delete/DeletionService.cs @@ -6,7 +6,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Net; using System.Security.Cryptography; using System.Threading; using System.Threading.Tasks; @@ -14,10 +16,15 @@ using Hl7.Fhir.ElementModel; using Hl7.Fhir.Model; using Hl7.Fhir.Serialization; +using Microsoft.Build.Framework; +using Microsoft.Extensions.Logging; using Microsoft.Health.Abstractions.Exceptions; +using Microsoft.Health.Core.Features.Audit; using Microsoft.Health.Fhir.Core.Exceptions; using Microsoft.Health.Fhir.Core.Extensions; +using Microsoft.Health.Fhir.Core.Features.Audit; using Microsoft.Health.Fhir.Core.Features.Conformance; +using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Search; using Microsoft.Health.Fhir.Core.Messages.Delete; using Newtonsoft.Json.Linq; @@ -34,19 +41,31 @@ public class DeletionService : IDeletionService private readonly ISearchService _searchService; private readonly ResourceIdProvider _resourceIdProvider; private readonly AsyncRetryPolicy _retryPolicy; + private readonly FhirRequestContextAccessor _contextAccessor; + private readonly IAuditLogger _auditLogger; + private readonly ILogger _logger; + + internal const string DefaultCallerAgent = "Microsoft.Health.Fhir.Server"; + private const int MaxParallelThreads = 64; public DeletionService( IResourceWrapperFactory resourceWrapperFactory, Lazy conformanceProvider, IFhirDataStore fhirDataStore, ISearchService searchService, - ResourceIdProvider resourceIdProvider) + ResourceIdProvider resourceIdProvider, + FhirRequestContextAccessor contextAccessor, + IAuditLogger auditLogger, + ILogger logger) { _resourceWrapperFactory = EnsureArg.IsNotNull(resourceWrapperFactory, nameof(resourceWrapperFactory)); _conformanceProvider = EnsureArg.IsNotNull(conformanceProvider, nameof(conformanceProvider)); _fhirDataStore = EnsureArg.IsNotNull(fhirDataStore, nameof(fhirDataStore)); _searchService = EnsureArg.IsNotNull(searchService, nameof(searchService)); _resourceIdProvider = EnsureArg.IsNotNull(resourceIdProvider, nameof(resourceIdProvider)); + _contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor)); + _auditLogger = EnsureArg.IsNotNull(auditLogger, nameof(auditLogger)); + _logger = EnsureArg.IsNotNull(logger, nameof(logger)); _retryPolicy = Policy .Handle() @@ -88,13 +107,29 @@ public async Task DeleteAsync(DeleteResourceRequest request, Cancel return new ResourceKey(key.ResourceType, key.Id, version); } - public async Task> DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken) + public async Task DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken) { EnsureArg.IsNotNull(request, nameof(request)); - (IReadOnlyCollection matchedResults, string ct) = await _searchService.ConditionalSearchAsync(request.ResourceType, request.ConditionalParameters, cancellationToken, request.MaxDeleteCount); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + var searchCount = 1000; + + (IReadOnlyCollection matchedResults, string ct) = await _searchService.ConditionalSearchAsync( + request.ResourceType, + request.ConditionalParameters, + cancellationToken, + request.DeleteAll ? searchCount : request.MaxDeleteCount, + versionType: request.VersionType); + + long numDeleted = 0; + long numQueuedForDeletion = 0; + + var initialSearchTime = stopwatch.Elapsed.TotalMilliseconds; - var itemsDeleted = new HashSet(); + var deleteTasks = new List>(); + using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); // Delete the matched results... try @@ -102,66 +137,41 @@ public async Task> DeleteMultipleAsync(ConditionalDeleteRes while (matchedResults.Any() || !string.IsNullOrEmpty(ct)) { IReadOnlyCollection resultsToDelete = - request.DeleteAll ? matchedResults : matchedResults - .Take(Math.Max(request.MaxDeleteCount.GetValueOrDefault() - itemsDeleted.Count, 0)) - .ToArray(); + request.DeleteAll ? matchedResults : matchedResults.Take(Math.Max((int)(request.MaxDeleteCount.GetValueOrDefault() - numQueuedForDeletion), 0)).ToArray(); + + numQueuedForDeletion += resultsToDelete.Count; if (request.DeleteOperation == DeleteOperation.SoftDelete) { - bool keepHistory = await _conformanceProvider.Value.CanKeepHistory(request.ResourceType, cancellationToken); - ResourceWrapperOperation[] softDeletes = resultsToDelete.Select(item => - { - ResourceWrapper deletedWrapper = CreateSoftDeletedWrapper(request.ResourceType, item.Resource.ResourceId); - return new ResourceWrapperOperation(deletedWrapper, true, keepHistory, null, false, false, bundleResourceContext: request.BundleResourceContext); - }).ToArray(); + deleteTasks.Add(SoftDeleteResourcePage(request, resultsToDelete, cancellationTokenSource.Token)); + } + else + { + deleteTasks.Add(HardDeleteResourcePage(request, resultsToDelete, cancellationTokenSource.Token)); + } - try - { - await _fhirDataStore.MergeAsync(softDeletes, cancellationToken); - } - catch (IncompleteOperationException> ex) - { - foreach (string id in ex.PartialResults.Select(item => item.Key.Id)) - { - itemsDeleted.Add(id); - } + if (deleteTasks.Any((task) => task.IsFaulted || task.IsCanceled)) + { + break; + } - throw; - } + deleteTasks.Where((task) => task.IsCompletedSuccessfully).ToList().ForEach((Task result) => numDeleted += result.Result); + deleteTasks = deleteTasks.Where((task) => !task.IsCompletedSuccessfully).ToList(); - foreach (string id in itemsDeleted.Concat(resultsToDelete.Select(item => item.Resource.ResourceId))) - { - itemsDeleted.Add(id); - } - } - else + if (deleteTasks.Count >= MaxParallelThreads) { - var parallelBag = new ConcurrentBag(); - try - { - await Parallel.ForEachAsync(resultsToDelete, cancellationToken, async (item, innerCt) => - { - await _retryPolicy.ExecuteAsync(async () => await _fhirDataStore.HardDeleteAsync(new ResourceKey(item.Resource.ResourceTypeName, item.Resource.ResourceId), request.DeleteOperation == DeleteOperation.PurgeHistory, innerCt)); - parallelBag.Add(item.Resource.ResourceId); - }); - } - finally - { - foreach (string item in parallelBag) - { - itemsDeleted.Add(item); - } - } + await deleteTasks[0]; } - if (!string.IsNullOrEmpty(ct) && (request.MaxDeleteCount - itemsDeleted.Count > 0 || request.DeleteAll)) + if (!string.IsNullOrEmpty(ct) && (request.DeleteAll || (int)(request.MaxDeleteCount - numQueuedForDeletion) > 0)) { (matchedResults, ct) = await _searchService.ConditionalSearchAsync( request.ResourceType, request.ConditionalParameters, cancellationToken, - request.DeleteAll ? null : request.MaxDeleteCount - itemsDeleted.Count, - ct); + request.DeleteAll ? searchCount : (int)(request.MaxDeleteCount - numQueuedForDeletion), + ct, + request.VersionType); } else { @@ -171,10 +181,87 @@ await Parallel.ForEachAsync(resultsToDelete, cancellationToken, async (item, inn } catch (Exception ex) { - throw new IncompleteOperationException>(ex, itemsDeleted); + _logger.LogError(ex, "Error deleting"); + await cancellationTokenSource.CancelAsync(); } - return itemsDeleted; + System.Threading.Tasks.Task.WaitAll(deleteTasks.ToArray(), cancellationToken); + deleteTasks.Where((task) => task.IsCompletedSuccessfully).ToList().ForEach((Task result) => numDeleted += result.Result); + + if (deleteTasks.Any((task) => task.IsFaulted || task.IsCanceled)) + { + var exceptions = new List(); + deleteTasks.Where((task) => task.IsFaulted || task.IsCanceled).ToList().ForEach((Task result) => + { + if (result.Exception != null) + { + result.Exception.InnerExceptions.Where((ex) => ex is IncompleteOperationException).ToList().ForEach((ex) => numDeleted += ((IncompleteOperationException)ex).PartialResults); + if (result.IsFaulted) + { + exceptions.AddRange(result.Exception.InnerExceptions); + } + } + }); + var aggrigateException = new AggregateException(exceptions); + throw new IncompleteOperationException(aggrigateException, numDeleted); + } + + return numDeleted; + } + + private async Task SoftDeleteResourcePage(ConditionalDeleteResourceRequest request, IReadOnlyCollection resourcesToDelete, CancellationToken cancellationToken) + { + await CreateAuditLog(request.ResourceType, request.DeleteOperation, false, resourcesToDelete.Select((item) => item.Resource.ResourceId)); + + bool keepHistory = await _conformanceProvider.Value.CanKeepHistory(request.ResourceType, cancellationToken); + ResourceWrapperOperation[] softDeletes = resourcesToDelete.Select(item => + { + ResourceWrapper deletedWrapper = CreateSoftDeletedWrapper(request.ResourceType, item.Resource.ResourceId); + return new ResourceWrapperOperation(deletedWrapper, true, keepHistory, null, false, false, bundleResourceContext: request.BundleResourceContext); + }).ToArray(); + + try + { + await _fhirDataStore.MergeAsync(softDeletes, cancellationToken); + } + catch (IncompleteOperationException> ex) + { + _logger.LogError(ex.InnerException, "Error soft deleting"); + + var ids = ex.PartialResults.Select(item => item.Key.Id); + await CreateAuditLog(request.ResourceType, request.DeleteOperation, true, ids); + + throw; + } + + await CreateAuditLog(request.ResourceType, request.DeleteOperation, true, resourcesToDelete.Select((item) => item.Resource.ResourceId)); + + return resourcesToDelete.Count; + } + + private async Task HardDeleteResourcePage(ConditionalDeleteResourceRequest request, IReadOnlyCollection resourcesToDelete, CancellationToken cancellationToken) + { + await CreateAuditLog(request.ResourceType, request.DeleteOperation, false, resourcesToDelete.Select((item) => item.Resource.ResourceId)); + + var parallelBag = new ConcurrentBag(); + try + { + // This throws AggrigateExceptions + await Parallel.ForEachAsync(resourcesToDelete, cancellationToken, async (item, innerCt) => + { + await _retryPolicy.ExecuteAsync(async () => await _fhirDataStore.HardDeleteAsync(new ResourceKey(item.Resource.ResourceTypeName, item.Resource.ResourceId), request.DeleteOperation == DeleteOperation.PurgeHistory, innerCt)); + parallelBag.Add(item.Resource.ResourceId); + }); + } + catch (Exception ex) + { + await CreateAuditLog(request.ResourceType, request.DeleteOperation, true, parallelBag); + throw new IncompleteOperationException(ex, parallelBag.Count); + } + + await CreateAuditLog(request.ResourceType, request.DeleteOperation, true, parallelBag); + + return parallelBag.Count; } private ResourceWrapper CreateSoftDeletedWrapper(string resourceType, string resourceId) @@ -193,5 +280,37 @@ private ResourceWrapper CreateSoftDeletedWrapper(string resourceType, string res ResourceWrapper deletedWrapper = _resourceWrapperFactory.CreateResourceWrapper(emptySourceNode.ToPoco(), _resourceIdProvider, deleted: true, keepMeta: false); return deletedWrapper; } + + private System.Threading.Tasks.Task CreateAuditLog(string resourceType, DeleteOperation operation, bool complete, IEnumerable items, HttpStatusCode statusCode = HttpStatusCode.OK) + { + var auditTask = System.Threading.Tasks.Task.Run(() => + { + AuditAction action = complete ? AuditAction.Executed : AuditAction.Executing; + var context = _contextAccessor.RequestContext; + var deleteAdditionalProperties = new Dictionary(); + deleteAdditionalProperties["Affected Items"] = items.Aggregate( + (aggregate, item) => + { + aggregate += ", " + item; + return aggregate; + }); + + _auditLogger.LogAudit( + auditAction: action, + operation: operation.ToString(), + resourceType: resourceType, + requestUri: context.Uri, + statusCode: statusCode, + correlationId: context.CorrelationId, + callerIpAddress: string.Empty, + callerClaims: null, + customHeaders: null, + operationType: string.Empty, + callerAgent: DefaultCallerAgent, + additionalProperties: deleteAdditionalProperties); + }); + + return auditTask; + } } } diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlCommandSimplifier.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlCommandSimplifier.cs index deead819d2..eee9bd72a6 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlCommandSimplifier.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Search/SqlCommandSimplifier.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.Text.RegularExpressions; using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.Logging; using Microsoft.Health.SqlServer; diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/JobInfoExtensions.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/JobInfoExtensions.cs index e4f0e888b9..653d115759 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/JobInfoExtensions.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/JobInfoExtensions.cs @@ -57,7 +57,7 @@ public static JobInfo LoadJobInfo(SqlDataReader sqlDataReader) DateTime createDate = sqlDataReader.Read(jobQueueTable.CreateDate, 8); object startDateObj = sqlDataReader.GetValue(9); DateTime? startDate = startDateObj is DBNull ? null : (DateTime)startDateObj; - object endDateObj = sqlDataReader.GetValue(9); + object endDateObj = sqlDataReader.GetValue(10); DateTime? endDate = endDateObj is DBNull ? null : (DateTime)endDateObj; DateTime heartbeatDate = sqlDataReader.Read(jobQueueTable.HeartbeatDate, 11); bool cancelRequested = sqlDataReader.Read(jobQueueTable.CancelRequested, 12); diff --git a/src/Microsoft.Health.TaskManagement/JobInfoExtensions.cs b/src/Microsoft.Health.TaskManagement/JobInfoExtensions.cs index 95491bddea..7aadb947c3 100644 --- a/src/Microsoft.Health.TaskManagement/JobInfoExtensions.cs +++ b/src/Microsoft.Health.TaskManagement/JobInfoExtensions.cs @@ -34,9 +34,9 @@ public static T DeserializeDefinition(this JobInfo jobInfo) { EnsureArg.IsNotNull(jobInfo, nameof(jobInfo)); - if (jobInfo.Definition == null) + if (jobInfo.Definition == null || jobInfo.Definition.Equals("null", StringComparison.OrdinalIgnoreCase)) { - return default; + throw new ArgumentNullException(nameof(jobInfo)); } return JsonConvert.DeserializeObject(jobInfo.Definition); diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/BulkDeleteTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/BulkDeleteTests.cs index e64d883c05..f3c208104f 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/BulkDeleteTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.E2E/Rest/BulkDeleteTests.cs @@ -43,7 +43,7 @@ public async Task GivenVariousResourcesOfDifferentTypes_WhenBulkDeleted_ThenAllA { CheckBulkDeleteEnabled(); - var resourceTypes = new Dictionary + var resourceTypes = new Dictionary { { "Patient", 2 }, { "Location", 1 }, @@ -60,7 +60,7 @@ public async Task GivenResourcesOfOneType_WhenBulkDeletedByType_ThenAllOfThatTyp { CheckBulkDeleteEnabled(); - var resourceTypes = new Dictionary() + var resourceTypes = new Dictionary() { { resourceType, 4 }, }; @@ -89,7 +89,7 @@ public async Task GivenSoftBulkDeleteRequest_WhenCompleted_ThenHistoricalRecords { CheckBulkDeleteEnabled(); - var resourceTypes = new Dictionary() + var resourceTypes = new Dictionary() { { "Patient", 1 }, }; @@ -114,7 +114,7 @@ public async Task GivenHardBulkDeleteRequest_WhenCompleted_ThenHistoricalRecords { CheckBulkDeleteEnabled(); - var resourceTypes = new Dictionary() + var resourceTypes = new Dictionary() { { "Patient", 1 }, }; @@ -143,7 +143,7 @@ public async Task GivenPurgeBulkDeleteRequest_WhenCompleted_ThenHistoricalRecord { CheckBulkDeleteEnabled(); - var resourceTypes = new Dictionary() + var resourceTypes = new Dictionary() { { "Patient", 1 }, }; @@ -176,7 +176,7 @@ public async Task GivenPurgeBulkDeleteRequest_WhenCompleted_ThenHistoricalRecord } private async Task RunBulkDeleteRequest( - Dictionary expectedResults, + Dictionary expectedResults, bool addUndeletedResource = false, string path = "$bulk-delete", Dictionary queryParams = null) @@ -189,7 +189,7 @@ private async Task RunBulkDeleteRequest( string tag = Guid.NewGuid().ToString(); foreach (var key in expectedResults.Keys) { - await _fhirClient.CreateResourcesAsync(ModelInfoProvider.GetTypeForFhirType(key), expectedResults[key], tag); + await _fhirClient.CreateResourcesAsync(ModelInfoProvider.GetTypeForFhirType(key), (int)expectedResults[key], tag); } using HttpRequestMessage request = GenerateBulkDeleteRequest(tag, path, queryParams); @@ -231,7 +231,7 @@ private HttpRequestMessage GenerateBulkDeleteRequest( return request; } - private async Task MonitorBulkDeleteJob(Uri location, Dictionary expectedResults) + private async Task MonitorBulkDeleteJob(Uri location, Dictionary expectedResults) { var result = (await _fhirClient.WaitForBulkDeleteStatus(location)).Resource; @@ -248,7 +248,7 @@ private async Task MonitorBulkDeleteJob(Uri location, Dictionary ex foreach (var part in parameter.Part) { var resourceName = part.Name; - var numberDeleted = (int)((FhirDecimal)part.Value).Value; + var numberDeleted = (long)((Integer64)part.Value).Value; Assert.Equal(expectedResults[resourceName], numberDeleted); resultsChecked++; diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs index a5e23d14d6..b621639c6c 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs @@ -15,12 +15,14 @@ using Microsoft.AspNetCore.Mvc.Infrastructure; using Microsoft.AspNetCore.Mvc.Routing; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Health.Abstractions.Features.Transactions; using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Fhir.Api.Features.Bundle; using Microsoft.Health.Fhir.Api.Features.Routing; using Microsoft.Health.Fhir.Core.Extensions; +using Microsoft.Health.Fhir.Core.Features.Audit; using Microsoft.Health.Fhir.Core.Features.Conformance; using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Definition; @@ -182,7 +184,10 @@ public async Task InitializeAsync() GetResourceHandler = new GetResourceHandler(DataStore, new Lazy(() => ConformanceProvider), resourceWrapperFactory, _resourceIdProvider, _dataResourceFilter, DisabledFhirAuthorizationService.Instance, FhirRequestContextAccessor, SearchService); - var deleter = new DeletionService(resourceWrapperFactory, new Lazy(() => ConformanceProvider), DataStore, SearchService, _resourceIdProvider); + var auditLogger = Substitute.For(); + var logger = Substitute.For>(); + + var deleter = new DeletionService(resourceWrapperFactory, new Lazy(() => ConformanceProvider), DataStore, SearchService, _resourceIdProvider, new FhirRequestContextAccessor(), auditLogger, logger); var collection = new ServiceCollection(); diff --git a/tools/PowerShell/MonitorAsyncJobs.ps1 b/tools/PowerShell/MonitorAsyncJobs.ps1 new file mode 100644 index 0000000000..f98622e3d4 --- /dev/null +++ b/tools/PowerShell/MonitorAsyncJobs.ps1 @@ -0,0 +1,76 @@ +function RunJobMonitor($SQLServer, $SQLDBName, $uid, $pwd, $QueueType, $GroupId) +{ + while($true) + { + CheckAsyncJob -SqlServer $SQLServer -SQLDBName $SQLDBName -uid $uid -pwd $pwd -QueueType $QueueType -GroupId $GroupId + Start-Sleep -Second 300 + } +} + +function CheckAsyncJob($SQLServer, $SQLDBName, $uid, $pwd, $QueueType, $GroupId) +{ + $SqlQuery = "SELECT JobId, Status, CreateDate, StartDate, EndDate, HeartbeatDate from JobQueue where QueueType = $QueueType and GroupId = $GroupId;"; + $SqlConnection = New-Object System.Data.SqlClient.SqlConnection; + $SqlConnection.ConnectionString = "Server = $SQLServer; Database = $SQLDBName; Integrated Security = False; User ID = $uid; Password = $pwd;"; + $SqlCmd = New-Object System.Data.SqlClient.SqlCommand; + $SqlCmd.CommandText = $SqlQuery; + $SqlCmd.Connection = $SqlConnection; + $SqlAdapter = New-Object System.Data.SqlClient.SqlDataAdapter; + $SqlAdapter.SelectCommand = $SqlCmd; + $DataSet = New-Object System.Data.DataSet; + $SqlAdapter.Fill($DataSet); + + $createTime = $DataSet.Tables[0].Rows[0].CreateDate + $DataSet.Tables[0].Rows | Foreach-Object { + if($createTime -ge $_.CreateDate) { + $createTime = $_.CreateDate + } + } + + $endTime = $createTime + $DataSet.Tables[0].Rows | Foreach-Object { + if($_.Status -ne 0) { + if($endTime -le $_.HeartbeatDate) { + $endTime = $_.HeartbeatDate + } + } + } + + $queued = 0 + $running = 0 + $succeeded = 0 + $failed = 0 + $cancelled = 0 + $DataSet.Tables[0].Rows | Foreach-Object { + switch($_.Status) { + 0 { $queued++ } + 1 { $running++ } + 2 { $succeeded++ } + 3 { $failed++ } + 4 { $cancelled++ } + } + } + + $total = $DataSet.Tables[0].Rows.Count + $finished = $succeeded + $failed + $cancelled + + $runTime = $endTime-$createTime + $runSeconds = $runTime.TotalSeconds + + $estimatedTotalRunSeconds = $runSeconds*$total/$finished + + Write-Host "" + Write-Host "Progress: `t$finished/$total jobs finished" + Write-Host "" + Write-Host "Queued: `t$queued" + Write-Host "Running: `t$running" + Write-Host "Succeeded: `t$succeeded" + Write-Host "Failed: `t$failed" + Write-Host "Cancelled: `t$cancelled" + Write-Host "" + Write-Host "Create Time: `t$createTime" + Write-Host "End Time: `t$endTime" + Write-Host "" + Write-Host "Run Duration (sec): `t`t$runSeconds" + Write-Host "Estimated Duration (sec): `t$estimatedTotalRunSeconds" +} \ No newline at end of file diff --git a/tools/RegisterAndMonitorImport/RegisterAndMonitorConfiguration.cs b/tools/RegisterAndMonitorImport/RegisterAndMonitorConfiguration.cs index 3fdfcccd03..4bde65d257 100644 --- a/tools/RegisterAndMonitorImport/RegisterAndMonitorConfiguration.cs +++ b/tools/RegisterAndMonitorImport/RegisterAndMonitorConfiguration.cs @@ -73,6 +73,17 @@ internal sealed class RegisterAndMonitorConfiguration /// public string TokenClientSecret { get; set; } + /// + /// Sets the amount of information outputted. + /// Possible values: + /// Full: Prints a list of all finished jobs with every poll (default) + /// FullOnComplete: Prints a list of all finished jobs when all the jobs complete + /// Error: Prints a list of jobs that errored with every poll + /// ErrorOnComplete: Prints a list of all jobs that errored when all the jobs complete + /// Minimal: Only prints summary information + /// + public VerbosityLevel ReportVerbosity { get; set; } = VerbosityLevel.Full; + public bool IsMonitorImportStatusEndpoint => !string.IsNullOrWhiteSpace(MonitorImportStatusEndpoint); } } diff --git a/tools/RegisterAndMonitorImport/RegisterAndMonitorImport.cs b/tools/RegisterAndMonitorImport/RegisterAndMonitorImport.cs index 258c1e7ad8..98d8817586 100644 --- a/tools/RegisterAndMonitorImport/RegisterAndMonitorImport.cs +++ b/tools/RegisterAndMonitorImport/RegisterAndMonitorImport.cs @@ -172,7 +172,7 @@ private async Task RunImport() } var sb = new StringBuilder(); - sb.Append("{\"resourceType\": \"Parameters\", \"parameter\": [{\"name\": \"inputFormat\",\"valueString\": \"application/fhir+ndjson\"},{\"name\": \"mode\",\"valueString\": \"InitialLoad\"},"); + sb.Append("{\"resourceType\": \"Parameters\", \"parameter\": [{\"name\": \"inputFormat\",\"valueString\": \"application/fhir+ndjson\"},{\"name\": \"mode\",\"valueString\": \"IncrementalLoad\"},"); var size = 0L; @@ -258,7 +258,7 @@ private async Task GetImportStatus(string url) Console.WriteLine($"StatusCode = {response.StatusCode}"); ImportResponse importJson = TryParseJson(content); - PrintImportResponse(importJson); + PrintImportResponse(importJson, monitorConfiguration, response.StatusCode); bool addedUrl = SaveImportedUrl(importJson); Console.WriteLine($"{(addedUrl ? "Completed" : "Processing")} file elapsed time: {swSingleTime.Elapsed.Duration()}"); @@ -295,6 +295,8 @@ private static bool SaveImportedUrl(ImportResponse response) bool addedUrl = false; if (response != null && s_blobContainerClientSource != null) { + // This doesn't work now that single blobs are being split into multiple jobs. + // in case there are only error conditions and no success in the output foreach (ImportResponse.Json r in response.Error) { @@ -316,7 +318,7 @@ private static bool SaveImportedUrl(ImportResponse response) return addedUrl; } - private static void PrintImportResponse(ImportResponse response) + private static void PrintImportResponse(ImportResponse response, RegisterAndMonitorConfiguration configuration, HttpStatusCode statusCode) { if (response != null) { @@ -328,14 +330,26 @@ private static void PrintImportResponse(ImportResponse response) return; } - PrintResponse(response.Output); - if (response.Error.Count > 0) + if (configuration.ReportVerbosity == VerbosityLevel.Full + || (configuration.ReportVerbosity == VerbosityLevel.FullOnComplete && statusCode != HttpStatusCode.Accepted)) + { + PrintResponse(response.Output); + } + + if (response.Error.Count > 0 + && (configuration.ReportVerbosity == VerbosityLevel.Full + || configuration.ReportVerbosity == VerbosityLevel.Error + || ((configuration.ReportVerbosity == VerbosityLevel.FullOnComplete || configuration.ReportVerbosity == VerbosityLevel.ErrorOnComplete) + && statusCode != HttpStatusCode.Accepted))) { Console.BackgroundColor = ConsoleColor.Black; Console.ForegroundColor = ConsoleColor.Red; PrintResponse(response.Error); Console.ResetColor(); } + + Console.WriteLine($"{response.Output.Count} jobs finished successfully"); + Console.WriteLine($"{response.Error.Count} jobs finished with errors"); } } diff --git a/tools/RegisterAndMonitorImport/VerbosityLevel.cs b/tools/RegisterAndMonitorImport/VerbosityLevel.cs new file mode 100644 index 0000000000..a187a64d75 --- /dev/null +++ b/tools/RegisterAndMonitorImport/VerbosityLevel.cs @@ -0,0 +1,17 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +namespace Microsoft.Health.Internal.Fhir.RegisterAndMonitorImport +{ + public enum VerbosityLevel + { + Full = 5, + FullOnComplete = 4, + Error = 3, + ErrorOnComplete = 2, + Minimal = 1, + None = 0, + } +} diff --git a/tools/RegisterAndMonitorImport/readme.md b/tools/RegisterAndMonitorImport/readme.md index b4d7919ec3..a160a54981 100644 --- a/tools/RegisterAndMonitorImport/readme.md +++ b/tools/RegisterAndMonitorImport/readme.md @@ -14,8 +14,7 @@ files that were already imported and saved to disk. The app.config has descriptions for all of the necessary values that you will need to supply in order to run this tool. When running FHIR OSS be sure to have these configuration settings in the portal: -FhirServer__Operations__Import__Enabled = True -FhirServer__Operations__Import__InitialImportMode = True +FhirServer__Operations__Import__Enabled = True FhirServer__Operations__IntegrationDataStore__StorageAccountConnection = your_storageaccount_access_key_connection_string TaskHosting__MaxRunningTaskCount = some_value_greater_than_1