Skip to content

Commit

Permalink
CON-1185 Implements data migration for ALE Id
Browse files Browse the repository at this point in the history
  • Loading branch information
dev4valtech committed Dec 17, 2019
1 parent 4e519dc commit 6bf08d6
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Esfa.Recruit.Vacancies.Client.Application.Queues;
using Esfa.Recruit.Vacancies.Client.Application.Queues.Messages;
using Esfa.Recruit.Vacancies.Client.Domain.Repositories;
using Esfa.Recruit.Vacancies.Client.Infrastructure.StorageQueue;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace Esfa.Recruit.Vacancies.Jobs.Triggers.QueueTriggers
{
public class BeginMigrationQueueTrigger
{
private readonly IVacancyQuery _vacancyQuery;
private readonly IRecruitQueueService _recruitQueueService;
private readonly ILogger<BeginMigrationQueueTrigger> _logger;
public BeginMigrationQueueTrigger(
IVacancyQuery vacancyQuery,
IRecruitQueueService recruitQueueService,
ILogger<BeginMigrationQueueTrigger> logger)
{
_vacancyQuery = vacancyQuery;
_recruitQueueService = recruitQueueService;
_logger = logger;
}

public async Task BeginMigrationAsync(
[QueueTrigger(QueueNames.BeginMigrationQueueName, Connection = "QueueStorage")] string message,
TextWriter log)
{
await RunVacancyALEIdMigrationAsync();
}

private async Task RunVacancyALEIdMigrationAsync()
{
_logger.LogInformation("Begining queuing vacancies for ALE Id migration process");
var tasks = new List<Task>();
var vacancyIds = await _vacancyQuery.GetAllVacancyIdsAsync();
_logger.LogInformation($"Found {vacancyIds.Count()} vacancies for ALE Id migration");
foreach (var vacancyId in vacancyIds)
tasks.Add(SendVacancyALEIdMigrationMessage(vacancyId));
await Task.WhenAll(tasks);
}

private Task SendVacancyALEIdMigrationMessage(Guid vacancyId)
{
_logger.LogInformation($"Queueing up vacancy {vacancyId} for ALE Id migration");
var message = new DataMigrationQueueMessage {VacancyId = vacancyId};
return _recruitQueueService.AddMessageAsync<DataMigrationQueueMessage>(message);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System.IO;
using System.Threading.Tasks;
using Esfa.Recruit.Vacancies.Client.Application.Queues.Messages;
using Esfa.Recruit.Vacancies.Client.Domain.Repositories;
using Esfa.Recruit.Vacancies.Client.Infrastructure.Services.EmployerAccount;
using Esfa.Recruit.Vacancies.Client.Infrastructure.StorageQueue;
using Microsoft.Azure.WebJobs;
using System.Linq;
using Microsoft.Extensions.Logging;

namespace Esfa.Recruit.Vacancies.Jobs.Triggers.QueueTriggers
{
public class PerformDataMigrationQueueTrigger
{
private readonly IVacancyRepository _vacancyRepository;
private readonly IEmployerAccountProvider _employerAccountProvider;
private readonly ILogger<PerformDataMigrationQueueTrigger> _logger;
public PerformDataMigrationQueueTrigger(
IVacancyRepository vacancyRepository,
IEmployerAccountProvider employerAccountProvider,
ILogger<PerformDataMigrationQueueTrigger> logger)
{
_vacancyRepository = vacancyRepository;
_employerAccountProvider = employerAccountProvider;
_logger = logger;
}

public async Task ExecuteAsync(
[QueueTrigger(QueueNames.DataMigrationQueueName, Connection = "QueueStorage")] DataMigrationQueueMessage message,
TextWriter log)
{
await PerformVacancyALEIdMigration(message);
}

private async Task PerformVacancyALEIdMigration(DataMigrationQueueMessage message)
{
var vacancy = await _vacancyRepository.GetVacancyAsync(message.VacancyId);
if (vacancy == null || vacancy.LegalEntityId == 0 || string.IsNullOrWhiteSpace(vacancy.AccountLegalEntityPublicHashedId) == false)
{
_logger.LogWarning($"Bypassing vacancy {message.VacancyId}");
return;
}
var legalEntities = await _employerAccountProvider.GetLegalEntitiesConnectedToAccountAsync(vacancy.EmployerAccountId);
var selectedLegalEntity = legalEntities.FirstOrDefault(l => l.LegalEntityId == vacancy.LegalEntityId);
if (selectedLegalEntity == null)
{
_logger.LogError($"Unable to find legal entity for vacancy {message.VacancyId}");
return;
}
_logger.LogInformation($"Updating vacancy: {vacancy.Id} setting AccountLegalEntityPublicHashedId: {selectedLegalEntity.AccountLegalEntityPublicHashedId}");
vacancy.AccountLegalEntityPublicHashedId = selectedLegalEntity.AccountLegalEntityPublicHashedId;
await _vacancyRepository.UpdateAsync(vacancy);
_logger.LogInformation($"Successfully updated vacancy: {vacancy.Id} with AccountLegalEntityPublicHashedId: {selectedLegalEntity.AccountLegalEntityPublicHashedId}");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;

namespace Esfa.Recruit.Vacancies.Client.Application.Queues.Messages
{
public class DataMigrationQueueMessage
{
public Guid VacancyId { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public interface IVacancyQuery
Task<IEnumerable<string>> GetDistinctVacancyOwningEmployerAccountsAsync();
Task<IEnumerable<long>> GetDistinctVacancyOwningProviderAccountsAsync();
Task<IEnumerable<long>> GetAllVacancyReferencesAsync();
Task<IEnumerable<Guid>> GetAllVacancyIdsAsync();
Task<IEnumerable<ProviderVacancySummary>> GetVacanciesAssociatedToProvider(long ukprn);
Task<IEnumerable<Vacancy>> GetProviderOwnedVacanciesForLegalEntityAsync(long ukprn, long legalEntityId);
Task<IEnumerable<Vacancy>> GetProviderOwnedVacanciesForEmployerWithoutLegalEntityAsync(long ukprn, string employerAccountId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,22 @@ public async Task<IEnumerable<long>> GetAllVacancyReferencesAsync()
return result.Where(r => r.HasValue).Select(r => r.Value);
}

public async Task<IEnumerable<Guid>> GetAllVacancyIdsAsync()
{
var filter = Builders<Vacancy>.Filter.Empty;

var collection = GetCollection<Vacancy>();

var result = await RetryPolicy.ExecuteAsync(_ =>
collection
.Find(filter)
.Project(v => v.Id)
.ToListAsync(),
new Context(nameof(GetAllVacancyReferencesAsync)));

return result;
}

public async Task<IEnumerable<ProviderVacancySummary>> GetVacanciesAssociatedToProvider(long ukprn)
{
var builder = Builders<Vacancy>.Filter;
Expand Down Expand Up @@ -283,7 +299,6 @@ public async Task<IEnumerable<Vacancy>> GetProviderOwnedVacanciesForLegalEntityA
return result;
}


public async Task<IEnumerable<Vacancy>> GetProviderOwnedVacanciesForEmployerWithoutLegalEntityAsync(long ukprn, string employerAccountId)
{
var builder = Builders<Vacancy>.Filter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ public static class QueueNames
public const string UpdateEmployerUserAccountQueueName = "update-employer-user-account-queue";
public const string DeleteStaleQueryStoreDocumentsQueueName = "delete-stale-query-store-documents-queue";
public const string DeleteStaleVacanciesQueueName = "delete-stale-vacancies-queue";
public const string BeginMigrationQueueName = "begin-migration";
public const string DataMigrationQueueName = "data-migration";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ internal class RecruitStorageQueueService : StorageQueueServiceBase, IRecruitQue
{ typeof(VacancyAnalyticsQueueMessage), QueueNames.GenerateVacancyAnalyticsQueueName },
{ typeof(VacancyStatusQueueMessage), QueueNames.VacancyStatusQueueName },
{ typeof(UpdateEmployerUserAccountQueueMessage), QueueNames.UpdateEmployerUserAccountQueueName },
{ typeof(DeleteStaleQueryStoreDocumentsQueueMessage), QueueNames.DeleteStaleQueryStoreDocumentsQueueName }
{ typeof(DeleteStaleQueryStoreDocumentsQueueMessage), QueueNames.DeleteStaleQueryStoreDocumentsQueueName },
{ typeof(DataMigrationQueueMessage), QueueNames.DataMigrationQueueName }
};

protected override string ConnectionString { get; }
Expand Down

0 comments on commit 6bf08d6

Please sign in to comment.