Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import bundle #3720

Open
wants to merge 55 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
3f48846
Import bundle with parameters
SergeyGaluzo Feb 18, 2024
eca3e1c
string
SergeyGaluzo Feb 19, 2024
b5cab8c
resource wrappers
SergeyGaluzo Feb 19, 2024
2905f03
2 test methods
SergeyGaluzo Feb 19, 2024
b929c8f
return bad request on parsing error
SergeyGaluzo Feb 19, 2024
1a14aed
Merge branch 'main' into users/sergal/importbundles
SergeyGaluzo Feb 19, 2024
044bb0e
trueEx
SergeyGaluzo Feb 19, 2024
4664a38
Report bad request on dups
SergeyGaluzo Feb 20, 2024
128915c
support for new bundle import endpoint
SergeyGaluzo Feb 20, 2024
b9e6b88
dedup output
SergeyGaluzo Feb 20, 2024
ebe7476
Passing loaded resources
SergeyGaluzo Feb 20, 2024
aaeed17
test name
SergeyGaluzo Feb 20, 2024
0f01ea8
import resources and bundle json
SergeyGaluzo Feb 21, 2024
58b7c22
changed endpoint
SergeyGaluzo Feb 21, 2024
4cb7e29
concat in loop
SergeyGaluzo Feb 21, 2024
b13d928
Added auth to handler
SergeyGaluzo Feb 21, 2024
4995cb7
Write
SergeyGaluzo Feb 21, 2024
28c7213
+ in importer
SergeyGaluzo Feb 21, 2024
36a7b5b
Check for PUT
SergeyGaluzo Feb 21, 2024
a035759
nice
SergeyGaluzo Feb 21, 2024
9aa05a4
messages
SergeyGaluzo Feb 22, 2024
8c8d701
comments
SergeyGaluzo Feb 24, 2024
5bed320
FormatException
SergeyGaluzo Feb 24, 2024
0e66721
Diag option
SergeyGaluzo Feb 25, 2024
1086289
Removed not needed bundle components
SergeyGaluzo Feb 26, 2024
0306d20
max bytes
SergeyGaluzo Feb 26, 2024
d385d89
Test for invalid resource
SergeyGaluzo Feb 26, 2024
a17dec7
Dressing
SergeyGaluzo Feb 26, 2024
c0b5e1e
Errors
SergeyGaluzo Feb 26, 2024
6d3c2b5
list of diag searches
SergeyGaluzo Feb 27, 2024
2dab61d
var assignment
SergeyGaluzo Feb 27, 2024
7e3ab6c
Diag retries
SergeyGaluzo Feb 27, 2024
5cb7cfc
recording current writers
SergeyGaluzo Feb 28, 2024
3e5188c
Next iteration
SergeyGaluzo Mar 2, 2024
c95b0d8
Formal import bundle
SergeyGaluzo Mar 3, 2024
44b58b2
use default input format
SergeyGaluzo Mar 3, 2024
cdd2a8b
var
SergeyGaluzo Mar 3, 2024
def486a
Merge branch 'main' into users/sergal/importbundles
SergeyGaluzo Mar 7, 2024
c5240ed
comment
SergeyGaluzo Mar 7, 2024
eac79c4
batch
SergeyGaluzo Mar 9, 2024
5ac1c48
OK
SergeyGaluzo Mar 9, 2024
7098f9c
test
SergeyGaluzo Mar 9, 2024
8689a28
error example
SergeyGaluzo Mar 11, 2024
2b86a34
test fix
SergeyGaluzo Mar 12, 2024
cc52346
Merge branch 'main' into users/sergal/importbundles
SergeyGaluzo Mar 14, 2024
8c1a111
start sw after skip
SergeyGaluzo Mar 15, 2024
fdaccb7
Adjust importer to work with profile
SergeyGaluzo Mar 15, 2024
e5859d0
clarification
SergeyGaluzo Mar 15, 2024
9a0cd3a
better
SergeyGaluzo Mar 15, 2024
901ad8b
switch to import resources
SergeyGaluzo Mar 17, 2024
d238574
more tests
SergeyGaluzo Mar 18, 2024
1c789f4
removed keys
SergeyGaluzo Mar 18, 2024
12429af
simpler
SergeyGaluzo Mar 18, 2024
4bb912c
correct ResetCount()
SergeyGaluzo Mar 18, 2024
a975aad
Added soft delete import test
SergeyGaluzo Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Net;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;

namespace Microsoft.Health.Fhir.Api.Features.ActionResults
{
/// <summary>
/// Action result returned by the import bundle operation. It carries no resource payload
/// (the base result is constructed with a null resource); it only exposes the number of
/// loaded resources together with the HTTP status code.
/// </summary>
public class ImportBundleResult : ResourceActionResult<ImportJobResult>
{
    /// <summary>
    /// Creates the result.
    /// </summary>
    /// <param name="count">Number of resources that were loaded.</param>
    /// <param name="statusCode">HTTP status code forwarded to the base action result.</param>
    public ImportBundleResult(int count, HttpStatusCode statusCode)
        : base(null, statusCode) => LoadedResources = count;

    /// <summary>
    /// Number of loaded resources, as supplied to the constructor.
    /// </summary>
    public int LoadedResources { get; }
}
}
2 changes: 2 additions & 0 deletions src/Microsoft.Health.Fhir.Api/Features/Routing/KnownRoutes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ internal class KnownRoutes
public const string ImportDataOperationDefinition = OperationDefinition + "/" + OperationsConstants.Import;
public const string ImportJobLocation = OperationsConstants.Operations + "/" + OperationsConstants.Import + "/" + IdRouteSegment;

// Relative route of the import bundle endpoint.
public const string ImportBundle = "$import/Bundle";

public const string CompartmentTypeByResourceType = CompartmentTypeRouteSegment + "/" + IdRouteSegment + "/" + CompartmentResourceTypeRouteSegment;

public const string WellKnown = ".well-known";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ namespace Microsoft.Health.Fhir.Core.Extensions
{
public static class ImportMediatorExtensions
{
/// <summary>
/// Wraps the given resources in an <see cref="ImportBundleRequest"/> and sends it through the mediator.
/// </summary>
/// <param name="mediator">Mediator used to dispatch the request; validated to be non-null.</param>
/// <param name="resources">Resources to import; passed unchanged to the <see cref="ImportBundleRequest"/> constructor.</param>
/// <param name="cancellationToken">Token forwarded to <c>mediator.Send</c>.</param>
/// <returns>The response produced by the request handler.</returns>
public static async Task<ImportBundleResponse> ImportBundleAsync(
    this IMediator mediator,
    IList<ImportResource> resources,
    CancellationToken cancellationToken)
{
    EnsureArg.IsNotNull(mediator, nameof(mediator));

    return await mediator.Send(new ImportBundleRequest(resources), cancellationToken);
}

public static async Task<CreateImportResponse> ImportAsync(
this IMediator mediator,
Uri requestUri,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using Microsoft.Health.Fhir.Core.Features.Metrics;
using Microsoft.Health.Fhir.ValueSets;

namespace Microsoft.Health.Fhir.Core.Features.Operations.Import
{
/// <summary>
/// Metrics notification describing one import bundle call: when it started, when it ended
/// and how many resources succeeded.
/// </summary>
public class ImportBundleMetricsNotification : IMetricsNotification
{
    /// <summary>
    /// Creates the notification.
    /// </summary>
    /// <param name="startTime">Time the operation started.</param>
    /// <param name="endTime">Time the operation ended.</param>
    /// <param name="succeededCount">Number of resources reported as succeeded.</param>
    public ImportBundleMetricsNotification(DateTimeOffset startTime, DateTimeOffset endTime, long succeededCount)
    {
        StartTime = startTime;
        EndTime = endTime;
        SucceededCount = succeededCount;
    }

    /// <summary>
    /// Operation name; always <see cref="AuditEventSubType.ImportBundle"/>.
    /// </summary>
    public string FhirOperation { get; } = AuditEventSubType.ImportBundle;

    /// <summary>
    /// Never assigned, so it is always null for this notification.
    /// </summary>
    public string ResourceType { get; }

    public DateTimeOffset StartTime { get; }

    public DateTimeOffset EndTime { get; }

    public long SucceededCount { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Hl7.Fhir.Serialization;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Core.Features.Security.Authorization;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Security;
using Microsoft.Health.Fhir.Core.Messages.Import;
using Microsoft.Health.JobManagement;

namespace Microsoft.Health.Fhir.Core.Features.Operations.Import
{
/// <summary>
/// MediatR request handler. Called when the ImportController processes ImportBundleRequest.
/// Checks that the caller has write access, merges the supplied resources into the data store
/// and reports how many resources were submitted.
/// </summary>
public class ImportBundleRequestHandler : IRequestHandler<ImportBundleRequest, ImportBundleResponse>
{
    private readonly IFhirDataStore _store;
    private readonly ILogger<ImportBundleRequestHandler> _logger;
    private readonly IAuthorizationService<DataActions> _authorizationService;

    /// <summary>
    /// Creates the handler. All dependencies are validated to be non-null.
    /// </summary>
    /// <param name="store">Data store the resources are merged into.</param>
    /// <param name="logger">Logger for this handler.</param>
    /// <param name="authorizationService">Service used to verify write access.</param>
    public ImportBundleRequestHandler(
        IFhirDataStore store,
        ILogger<ImportBundleRequestHandler> logger,
        IAuthorizationService<DataActions> authorizationService)
    {
        _store = EnsureArg.IsNotNull(store, nameof(store));
        _logger = EnsureArg.IsNotNull(logger, nameof(logger));
        _authorizationService = EnsureArg.IsNotNull(authorizationService, nameof(authorizationService));
    }

    /// <summary>
    /// Merges the resources of <paramref name="request"/> into the data store.
    /// </summary>
    /// <param name="request">Request carrying the resources to import.</param>
    /// <param name="cancellationToken">Token forwarded to the authorization check and the merge.</param>
    /// <returns>
    /// A response whose count is the number of resources in the request. The return value of
    /// <c>MergeAsync</c> is not inspected here.
    /// </returns>
    /// <exception cref="UnauthorizedFhirActionException">The caller does not have <see cref="DataActions.Write"/> access.</exception>
    public async Task<ImportBundleResponse> Handle(ImportBundleRequest request, CancellationToken cancellationToken)
    {
        EnsureArg.IsNotNull(request, nameof(request));

        if (await _authorizationService.CheckAccess(DataActions.Write, cancellationToken) != DataActions.Write)
        {
            throw new UnauthorizedFhirActionException();
        }

        // NOTE(review): the flags are positional; their meaning is defined by the
        // ResourceWrapperOperation constructor, which is not visible here - consider named arguments.
        var input = request.Resources
            .Select(importResource => new ResourceWrapperOperation(importResource.ResourceWrapper, true, false, null, false, false, null))
            .ToList();

        await _store.MergeAsync(input, new MergeOptions(false), cancellationToken);

        // The method is already async, so the response is returned directly instead of being
        // wrapped in Task.FromResult and awaited again.
        return new ImportBundleResponse(request.Resources.Count);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using EnsureThat;
using MediatR;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.Fhir.Core.Features.Operations.Import.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;

namespace Microsoft.Health.Fhir.Core.Messages.Import
{
/// <summary>
/// MediatR request asking for a list of resources to be imported; answered with an
/// <see cref="ImportBundleResponse"/>.
/// </summary>
public class ImportBundleRequest : IRequest<ImportBundleResponse>
{
    /// <summary>
    /// Creates the request.
    /// </summary>
    /// <param name="resources">Resources to import; must not be null.</param>
    public ImportBundleRequest(IList<ImportResource> resources)
    {
        EnsureArg.IsNotNull(resources, nameof(resources));
        Resources = resources;
    }

    /// <summary>
    /// Resources to import, exactly as supplied to the constructor.
    /// </summary>
    public IList<ImportResource> Resources { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

namespace Microsoft.Health.Fhir.Core.Messages.Import
{
/// <summary>
/// Response of the import bundle request; carries only the number of loaded resources.
/// </summary>
public class ImportBundleResponse
{
    /// <summary>
    /// Creates the response.
    /// </summary>
    /// <param name="count">Number of loaded resources.</param>
    public ImportBundleResponse(int count) => LoadedResources = count;

    /// <summary>
    /// Number of loaded resources, as supplied to the constructor.
    /// </summary>
    public int LoadedResources { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.Fhir.Core.Features.Operations.Import.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Routing;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Test.Utilities;
Expand Down Expand Up @@ -100,6 +101,7 @@ private ImportController GetController(ImportTaskConfiguration bulkImportConfig)
_urlResolver,
optionsOperationConfiguration,
optionsFeatures,
Substitute.For<IResourceWrapperFactory>(),
NullLogger<ImportController>.Instance);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Hl7.Fhir.Model;
using Hl7.Fhir.Serialization;
using MediatR;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Api.Features.Audit;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Fhir.Api.Configs;
using Microsoft.Health.Fhir.Api.Features.ActionResults;
using Microsoft.Health.Fhir.Api.Features.Filters;
using Microsoft.Health.Fhir.Api.Features.Headers;
using Microsoft.Health.Fhir.Api.Features.Operations;
using Microsoft.Health.Fhir.Api.Features.Operations.Import;
using Microsoft.Health.Fhir.Api.Features.Routing;
using Microsoft.Health.Fhir.Core.Configs;
Expand All @@ -29,9 +39,11 @@
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.Fhir.Core.Features.Operations.Import.Models;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Routing;
using Microsoft.Health.Fhir.Core.Messages.Import;
using Microsoft.Health.Fhir.ValueSets;
using Newtonsoft.Json;

namespace Microsoft.Health.Fhir.Api.Controllers
{
Expand All @@ -58,13 +70,15 @@ public class ImportController : Controller
private readonly FeatureConfiguration _features;
private readonly ILogger<ImportController> _logger;
private readonly ImportTaskConfiguration _importConfig;
private readonly IResourceWrapperFactory _resourceWrapperFactory;

public ImportController(
IMediator mediator,
RequestContextAccessor<IFhirRequestContext> fhirRequestContextAccessor,
IUrlResolver urlResolver,
IOptions<OperationsConfiguration> operationsConfig,
IOptions<FeatureConfiguration> features,
IResourceWrapperFactory resourceWrapperFactory,
ILogger<ImportController> logger)
{
EnsureArg.IsNotNull(fhirRequestContextAccessor, nameof(fhirRequestContextAccessor));
Expand All @@ -79,9 +93,80 @@ public ImportController(
_urlResolver = urlResolver;
_features = features.Value;
_mediator = mediator;
_resourceWrapperFactory = EnsureArg.IsNotNull(resourceWrapperFactory, nameof(resourceWrapperFactory));
_logger = logger;
}

/// <summary>
/// Imports the resources contained in the request body and returns how many were loaded.
/// A body whose content type contains "application/fhir+ndjson" is read line by line, one
/// resource per line; any other body is parsed as a single FHIR JSON <see cref="Bundle"/>
/// whose entries must all use the PUT method.
/// </summary>
/// <remarks>
/// Points raised during review that still apply to this implementation:
/// - The body is read and parsed here directly, which bypasses the MVC input formatters. Reviewers
///   suggested separate actions per media type (for example via the Consumes attribute).
/// - This method does not itself check the size of the request stream.
/// - Only the resources are parsed; bundle type and other bundle processing rules are not
///   validated, apart from the PUT check on each entry.
/// </remarks>
/// <returns>
/// An <see cref="ImportBundleResult"/> with status 200 and a "LoadedResources" header.
/// </returns>
/// <exception cref="RequestNotValidException">
/// A resource cannot be parsed, a bundle entry is not a PUT, or two resources share the same key.
/// </exception>
[HttpPost]
[Route(KnownRoutes.ImportBundle)]
[AuditEventType(AuditEventSubType.ImportBundle)]
public async Task<IActionResult> ImportBundle()
{
    var sw = Stopwatch.StartNew();
    var startDate = DateTimeOffset.UtcNow;
    var resources = new List<ImportResource>();
    var keys = new HashSet<ResourceKey>();
    var fhirParser = new FhirJsonParser();
    var importParser = new ImportResourceParser(fhirParser, _resourceWrapperFactory);
    var index = 0L;

    // Adds a parsed resource to the batch, rejecting the request when its key was already seen.
    void AddUnique(ImportResource importResource)
    {
        var key = importResource.ResourceWrapper.ToResourceKey(true);
        if (!keys.Add(key))
        {
            throw new RequestNotValidException(string.Format(Resources.ResourcesMustBeUnique, key));
        }

        resources.Add(importResource);
    }

    using var reader = new StreamReader(Request.Body, Encoding.UTF8);
    try
    {
        if (Request.ContentType != null && Request.ContentType.Contains("application/fhir+ndjson", StringComparison.OrdinalIgnoreCase))
        {
            // NDJSON: one resource per line.
            string line;
            while ((line = await reader.ReadLineAsync()) != null)
            {
                AddUnique(importParser.Parse(index, 0, 0, line, ImportMode.IncrementalLoad));
                index++;
            }
        }
        else
        {
            // Anything else is treated as a single FHIR JSON bundle.
            var str = await reader.ReadToEndAsync();
            var resource = await fhirParser.ParseAsync(str);
            var bundle = resource.ToResourceElement().ToPoco<Bundle>();

            foreach (var entry in bundle.Entry)
            {
                if (entry.Request?.Method != Bundle.HTTPVerb.PUT)
                {
                    throw new RequestNotValidException(string.Format(Resources.InvalidBundleEntry, entry.Request?.Method, KnownRoutes.ImportBundle));
                }

                AddUnique(importParser.Parse(index, 0, 0, entry.Resource, ImportMode.IncrementalLoad));
                index++;
            }
        }
    }
    catch (Exception ex) when (!(ex is RequestNotValidException) && !(ex is OperationCanceledException))
    {
        // Validation errors raised above (duplicate key, non-PUT entry) already carry a specific
        // message and must reach the caller unchanged; only unexpected failures are reported as a
        // parsing error at the current index.
        throw new RequestNotValidException(string.Format(Resources.ParsingError, $"Unable to parse resource at index={index}"));
    }

    var response = await _mediator.ImportBundleAsync(resources, HttpContext.RequestAborted);
    var result = new ImportBundleResult(response.LoadedResources, HttpStatusCode.OK);
    result.Headers["LoadedResources"] = response.LoadedResources.ToString();

    // NOTE(review): the metrics notification is published with CancellationToken.None, presumably
    // so it is still emitted when the request is aborted after the import finished - confirm.
    await _mediator.Publish(new ImportBundleMetricsNotification(startDate, DateTimeOffset.UtcNow, response.LoadedResources), CancellationToken.None);
    _logger.LogInformation("Loaded {LoadedResources} resources, elapsed {Milliseconds} milliseconds.", response.LoadedResources, (int)sw.Elapsed.TotalMilliseconds);
    return result;
}

[HttpPost]
[Route(KnownRoutes.Import)]
[ServiceFilter(typeof(ValidateImportRequestFilterAttribute))]
Expand All @@ -106,14 +191,14 @@ public async Task<IActionResult> Import([FromBody] Parameters importTaskParamete
throw new RequestNotValidException(Resources.InitialImportModeNotEnabled);
}

CreateImportResponse response = await _mediator.ImportAsync(
_fhirRequestContextAccessor.RequestContext.Uri,
importRequest.InputFormat,
importRequest.InputSource,
importRequest.Input,
importRequest.StorageDetail,
initialLoad ? ImportMode.InitialLoad : ImportMode.IncrementalLoad, // default to incremental mode
HttpContext.RequestAborted);
var response = await _mediator.ImportAsync(
_fhirRequestContextAccessor.RequestContext.Uri,
importRequest.InputFormat,
importRequest.InputSource,
importRequest.Input,
importRequest.StorageDetail,
initialLoad ? ImportMode.InitialLoad : ImportMode.IncrementalLoad, // default to incremental mode
HttpContext.RequestAborted);

var bulkImportResult = ImportResult.Accepted();
bulkImportResult.SetContentLocationHeader(_urlResolver, OperationsConstants.Import, response.TaskId);
Expand Down
13 changes: 13 additions & 0 deletions src/Microsoft.Health.Fhir.Shared.Client/FhirClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,19 @@ public async Task<FhirResponse> ConvertDataAsync(Parameters parameters, Cancella
return new FhirResponse(response);
}

/// <summary>
/// Posts the given payload to the "$import/Bundle" endpoint.
/// </summary>
/// <param name="bundle">Request body: a FHIR bundle, or NDJSON when <paramref name="isNdJson"/> is true.</param>
/// <param name="isNdJson">
/// When true the content type is "application/fhir+ndjson"; otherwise the client's configured content type is used.
/// </param>
/// <param name="cancellationToken">Token forwarded to the HTTP send.</param>
/// <returns>The HTTP response. It is not disposed by this method, so the caller should dispose it.</returns>
public async Task<HttpResponseMessage> ImportBundleAsync(string bundle, bool isNdJson = false, CancellationToken cancellationToken = default)
{
    string requestPath = "$import/Bundle";
    using var message = new HttpRequestMessage(HttpMethod.Post, requestPath)
    {
        Content = new StringContent(bundle, Encoding.UTF8, isNdJson ? "application/fhir+ndjson" : _contentType),
    };

    // The response is handed back to the caller, so it must not be wrapped in a using
    // declaration: that would dispose it before the caller could read headers or content.
    HttpResponseMessage response = await HttpClient.SendAsync(message, cancellationToken);
    try
    {
        await EnsureSuccessStatusCodeAsync(response);
    }
    catch
    {
        // On failure the response is not returned, so it is released here as before.
        response.Dispose();
        throw;
    }

    return response;
}

public async Task<Uri> ImportAsync(Parameters parameters, CancellationToken cancellationToken = default)
{
string requestPath = "$import";
Expand Down
Loading
Loading