Skip to content

Commit

Permalink
Parallel defrag with index rebuild (#4646)
Browse files Browse the repository at this point in the history
* Defrag

* cancel

* removed owner set

* code analysis comment

* change owner for local only

* Adding start date
  • Loading branch information
SergeyGaluzo authored Sep 27, 2024
1 parent 1d80f9a commit a745775
Show file tree
Hide file tree
Showing 29 changed files with 5,699 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task GivenBulkDeleteJob_WhenNoResourceTypeIsGiven_ThenProcessingJob
};

await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), Arg.Any<long?>(), false, false, Arg.Any<CancellationToken>());
await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), Arg.Any<long?>(), false, Arg.Any<CancellationToken>());
await _searchService.ReceivedWithAnyArgs(1).GetUsedResourceTypes(Arg.Any<CancellationToken>());

// Checks that one processing job was queued
Expand Down Expand Up @@ -95,7 +95,7 @@ public async Task GivenBulkDeleteJob_WhenResourceTypeIsGiven_ThenOneProcessingJo
};

await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), Arg.Any<long?>(), false, false, Arg.Any<CancellationToken>());
await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), Arg.Any<long?>(), false, Arg.Any<CancellationToken>());
await _searchService.DidNotReceiveWithAnyArgs().GetUsedResourceTypes(Arg.Any<CancellationToken>());

// Checks that one processing job was queued
Expand Down Expand Up @@ -128,7 +128,7 @@ public async Task GivenBulkDeleteJob_WhenNoResourcesMatchCriteria_ThenNoProcessi
};

await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
await _queueClient.DidNotReceiveWithAnyArgs().EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), Arg.Any<long?>(), false, false, Arg.Any<CancellationToken>());
await _queueClient.DidNotReceiveWithAnyArgs().EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), Arg.Any<long?>(), false, Arg.Any<CancellationToken>());
await _searchService.ReceivedWithAnyArgs(1).GetUsedResourceTypes(Arg.Any<CancellationToken>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public async Task GivenBulkDeleteRequest_WhenJobCreationRequested_ThenJobIsCreat

_authorizationService.CheckAccess(Arg.Any<DataActions>(), Arg.Any<CancellationToken>()).Returns(DataActions.HardDelete | DataActions.Delete);
_contextAccessor.RequestContext.BundleIssues.Clear();
_queueClient.EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any<string[]>(), Arg.Any<long?>(), false, false, Arg.Any<CancellationToken>()).Returns(args =>
_queueClient.EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any<string[]>(), Arg.Any<long?>(), false, Arg.Any<CancellationToken>()).Returns(args =>
{
var definition = JsonConvert.DeserializeObject<BulkDeleteDefinition>(args.ArgAt<string[]>(1)[0]);
Assert.Equal(_testUrl, definition.Url);
Expand All @@ -91,7 +91,7 @@ public async Task GivenBulkDeleteRequest_WhenJobCreationRequested_ThenJobIsCreat
var response = await _handler.Handle(request, CancellationToken.None);
Assert.NotNull(response);
Assert.Equal(1, response.Id);
await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any<string[]>(), Arg.Any<long?>(), false, false, Arg.Any<CancellationToken>());
await _queueClient.ReceivedWithAnyArgs(1).EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any<string[]>(), Arg.Any<long?>(), false, Arg.Any<CancellationToken>());
}

[Fact]
Expand Down Expand Up @@ -127,7 +127,7 @@ public async Task GivenBulkDeleteRequest_WhenJobCreationFails_ThenExceptionIsThr
{
_authorizationService.CheckAccess(Arg.Any<DataActions>(), Arg.Any<CancellationToken>()).Returns(DataActions.HardDelete | DataActions.Delete);
_contextAccessor.RequestContext.BundleIssues.Clear();
_queueClient.EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any<string[]>(), Arg.Any<long?>(), false, false, Arg.Any<CancellationToken>()).Returns(new List<JobInfo>());
_queueClient.EnqueueAsync((byte)QueueType.BulkDelete, Arg.Any<string[]>(), Arg.Any<long?>(), false, Arg.Any<CancellationToken>()).Returns(new List<JobInfo>());

var request = new CreateBulkDeleteRequest(DeleteOperation.HardDelete, null, new List<Tuple<string, string>>(), false);
await Assert.ThrowsAsync<JobNotExistException>(async () => await _handler.Handle(request, CancellationToken.None));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public virtual async Task<ExportJobOutcome> CreateExportJobAsync(ExportJobRecord
{
var clone = jobRecord.Clone();
clone.QueuedTime = DateTime.Parse("1900-01-01");
var results = await _queueClient.EnqueueAsync(QueueType.Export, cancellationToken, isCompleted: clone.Status == OperationStatus.Completed, definitions: clone);
var results = await _queueClient.EnqueueAsync(QueueType.Export, cancellationToken, definitions: clone);

if (results.Count != 1)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations;

public static class QueueClientExtensions
{
public static Task<IReadOnlyList<JobInfo>> EnqueueAsync<T>(this IQueueClient queueClient, QueueType queueType, CancellationToken cancellationToken, long? groupId = null, bool forceOneActiveJobGroup = false, bool isCompleted = false, params T[] definitions)
public static Task<IReadOnlyList<JobInfo>> EnqueueAsync<T>(this IQueueClient queueClient, QueueType queueType, CancellationToken cancellationToken, long? groupId = null, bool forceOneActiveJobGroup = false, params T[] definitions)
where T : IJobData
{
EnsureArg.HasItems(definitions, nameof(definitions));
return queueClient.EnqueueAsync((byte)queueType, definitions.Select(x => JsonConvert.SerializeObject(x)).ToArray(), groupId, forceOneActiveJobGroup, isCompleted, cancellationToken);
return queueClient.EnqueueAsync((byte)queueType, definitions.Select(x => JsonConvert.SerializeObject(x)).ToArray(), groupId, forceOneActiveJobGroup, cancellationToken);
}

public static Task<IReadOnlyCollection<JobInfo>> DequeueJobsAsync(this IQueueClient queueClient, QueueType queueType, int numberOfJobsToDequeue, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void SetupMockQueue(long orchestratorJobId, List<JobInfo> defaultJobs =
_mockSearchService.GetUsedResourceTypes(Arg.Any<CancellationToken>()).Returns(new List<string>() { "Patient", "Observation", "Encounter" });
_mockSearchService.GetFeedRanges(Arg.Any<CancellationToken>()).Returns(new List<string>() { "Range1", "Range2", "Range3" });

_mockQueueClient.EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), orchestratorJobId, false, false, Arg.Any<CancellationToken>()).Returns(x =>
_mockQueueClient.EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), orchestratorJobId, false, Arg.Any<CancellationToken>()).Returns(x =>
{
string[] definitions = x.ArgAt<string[]>(1);
Assert.Single(definitions); // CosmosDB export jobs always have a single definition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
string[] definitions = [JsonConvert.SerializeObject(processingRecord)];

_logger.LogJobInformation(jobInfo, "Enqueuing export job (1).");
await _queueClient.EnqueueAsync((byte)QueueType.Export, definitions, jobInfo.GroupId, false, false, cancellationToken);
await _queueClient.EnqueueAsync((byte)QueueType.Export, definitions, jobInfo.GroupId, false, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public async Task<IReadOnlyList<JobInfo>> EnqueueAsync(
string[] definitions,
long? groupId,
bool forceOneActiveJobGroup,
bool isCompleted,
CancellationToken cancellationToken)
{
EnsureArg.IsNotNull(definitions, nameof(definitions));
Expand Down Expand Up @@ -121,18 +120,18 @@ AND ARRAY_CONTAINS([{string.Join(",", definitionHashes.Select(x => $"'{x.Key}'")
throw new JobConflictException("Failed to enqueue job.");
}

jobInfos.AddRange(await CreateNewJob(id, queueType, newDefinitions, groupId, isCompleted, cancellationToken));
jobInfos.AddRange(await CreateNewJob(id, queueType, newDefinitions, groupId, cancellationToken));
}
}
else
{
jobInfos.AddRange(await CreateNewJob(id, queueType, newDefinitions, groupId, isCompleted, cancellationToken));
jobInfos.AddRange(await CreateNewJob(id, queueType, newDefinitions, groupId, cancellationToken));
}

return jobInfos;
}

private async Task<IReadOnlyList<JobInfo>> CreateNewJob(long id, byte queueType, string[] definitions, long? groupId, bool isCompleted, CancellationToken cancellationToken)
private async Task<IReadOnlyList<JobInfo>> CreateNewJob(long id, byte queueType, string[] definitions, long? groupId, CancellationToken cancellationToken)
{
var jobInfo = new JobGroupWrapper
{
Expand All @@ -151,7 +150,7 @@ private async Task<IReadOnlyList<JobInfo>> CreateNewJob(long id, byte queueType,
var definitionInfo = new JobDefinitionWrapper
{
JobId = (jobId++).ToString(),
Status = isCompleted ? (byte)JobStatus.Completed : (byte)JobStatus.Created,
Status = (byte)JobStatus.Created,
Definition = item,
DefinitionHash = item.ComputeHash(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private void SetupMockQueue(int numExpectedJobsPerResourceType, long orchestrato
return list;
});

_mockQueueClient.EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), orchestratorJobId, false, false, Arg.Any<CancellationToken>()).Returns(x =>
_mockQueueClient.EnqueueAsync(Arg.Any<byte>(), Arg.Any<string[]>(), orchestratorJobId, false, Arg.Any<CancellationToken>()).Returns(x =>
{
string[] definitions = x.ArgAt<string[]>(1);
Assert.Equal(numExpectedJobsPerResourceType, definitions.Length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public async Task GivenAnOrchestratorJobAndWrongEtag_WhenOrchestratorJobStart_Th
return properties;
});
TestQueueClient testQueueClient = new TestQueueClient();
JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorInputData) }, 1, false, false, CancellationToken.None)).First();
JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorInputData) }, 1, false, CancellationToken.None)).First();

ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob(
mediator,
Expand Down Expand Up @@ -160,7 +160,7 @@ public async Task GivenAnOrchestratorJob_WhenIntegrationExceptionThrown_ThenJobS
throw new IntegrationDataStoreException("dummy", HttpStatusCode.Unauthorized);
});
TestQueueClient testQueueClient = new TestQueueClient();
JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorJobInputData) }, 1, false, false, CancellationToken.None)).First();
JobInfo orchestratorJobInfo = (await testQueueClient.EnqueueAsync(0, new string[] { JsonConvert.SerializeObject(importOrchestratorJobInputData) }, 1, false, CancellationToken.None)).First();

ImportOrchestratorJob orchestratorJob = new ImportOrchestratorJob(
mediator,
Expand Down
Loading

0 comments on commit a745775

Please sign in to comment.