Skip to content

Commit

Permalink
CON-1185 added delay and retries (#863) +semver: patch
Browse files Browse the repository at this point in the history
* CON-1185 added delay and retries

* Updated slack client to stop throwing errors and log warning instead

* Trying to solve a bizzare situation
  • Loading branch information
Deven Shah authored Jan 21, 2020
1 parent 834a137 commit fc05192
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Esfa.Recruit.Vacancies.Client.Application.Queues;
using Esfa.Recruit.Vacancies.Client.Application.Queues.Messages;
Expand Down Expand Up @@ -40,15 +41,19 @@ private async Task RunVacancyALEIdMigrationAsync()
var tasks = new List<Task>();
var vacancyIds = await _vacancyQuery.GetAllVacancyIdsAsync();
_logger.LogInformation($"Found {vacancyIds.Count()} vacancies for ALE Id migration");
var serialNumber = 0;
foreach (var vacancyId in vacancyIds)
tasks.Add(SendVacancyALEIdMigrationMessage(vacancyId));
{
if(serialNumber % 40 == 0) await Task.Delay(TimeSpan.FromSeconds(5));
tasks.Add(SendVacancyALEIdMigrationMessage(++serialNumber, vacancyId));
}
await Task.WhenAll(tasks);
}

private Task SendVacancyALEIdMigrationMessage(Guid vacancyId)
private Task SendVacancyALEIdMigrationMessage(int serialNumber, Guid vacancyId)
{
_logger.LogInformation($"Queueing up vacancy {vacancyId} for ALE Id migration");
var message = new DataMigrationQueueMessage {VacancyId = vacancyId};
var message = new DataMigrationQueueMessage(serialNumber, vacancyId);
return _recruitQueueService.AddMessageAsync<DataMigrationQueueMessage>(message);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
using Microsoft.Azure.WebJobs;
using System.Linq;
using Microsoft.Extensions.Logging;
using Esfa.Recruit.Vacancies.Client.Domain.Entities;
using System;
using SFA.DAS.EAS.Account.Api.Types;
using Polly;
using System.Collections.Generic;

namespace Esfa.Recruit.Vacancies.Jobs.Triggers.QueueTriggers
{
Expand Down Expand Up @@ -34,23 +39,78 @@ public async Task ExecuteAsync(

private async Task PerformVacancyALEIdMigration(DataMigrationQueueMessage message)
{
var vacancy = await _vacancyRepository.GetVacancyAsync(message.VacancyId);
if (vacancy == null || vacancy.LegalEntityId == 0 || string.IsNullOrWhiteSpace(vacancy.AccountLegalEntityPublicHashedId) == false)
var vacancyId = message.VacancyId;

_logger.LogInformation($"{message.SerialNumber}: Carrying out ALEId migration on vacancy {vacancyId} ");
Vacancy vacancy;
try
{
vacancy = await _vacancyRepository.GetVacancyAsync(vacancyId);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{message.SerialNumber}: Error fetching vacancy - Bypassing vacancy {vacancyId}");
return;
}

if (vacancy == null)
{
_logger.LogWarning($"Bypassing vacancy {message.VacancyId}");
_logger.LogWarning($"{message.SerialNumber}: Not found - Bypassing vacancy {vacancyId}");
return;
}
var legalEntities = await _employerAccountProvider.GetLegalEntitiesConnectedToAccountAsync(vacancy.EmployerAccountId);
var selectedLegalEntity = legalEntities.FirstOrDefault(l => l.LegalEntityId == vacancy.LegalEntityId);

if (vacancy.LegalEntityId == 0)
{
_logger.LogWarning($"{message.SerialNumber}: Missing legalEntity - Bypassing vacancy {vacancyId}");
return;
}

if (string.IsNullOrWhiteSpace(vacancy.AccountLegalEntityPublicHashedId) == false)
{
_logger.LogWarning($"{message.SerialNumber}: Already updated - Bypassing vacancy {vacancyId}");
return;
}

LegalEntityViewModel selectedLegalEntity;
try
{
var retryPolicy = GetApiRetryPolicy();
var legalEntities = await retryPolicy.ExecuteAsync(context =>
_employerAccountProvider.GetLegalEntitiesConnectedToAccountAsync(vacancy.EmployerAccountId),
new Dictionary<string, object>() {{ "apiCall", "employer details" }});
selectedLegalEntity = legalEntities.FirstOrDefault(l => l.LegalEntityId == vacancy.LegalEntityId);
}
catch (Exception ex)
{
_logger.LogError(ex, $"{message.SerialNumber}: Error fetching legal entity details - Bypassing vacancy {vacancyId}");
throw;
}

if (selectedLegalEntity == null)
{
_logger.LogError($"Unable to find legal entity for vacancy {message.VacancyId}");
_logger.LogError($"Unable to find legal entity for vacancy {vacancyId}");
return;
}
_logger.LogInformation($"Updating vacancy: {vacancy.Id} setting AccountLegalEntityPublicHashedId: {selectedLegalEntity.AccountLegalEntityPublicHashedId}");

_logger.LogInformation($"Updating vacancy: {vacancyId} setting AccountLegalEntityPublicHashedId: {selectedLegalEntity.AccountLegalEntityPublicHashedId}");
vacancy.AccountLegalEntityPublicHashedId = selectedLegalEntity.AccountLegalEntityPublicHashedId;
await _vacancyRepository.UpdateAsync(vacancy);
_logger.LogInformation($"Successfully updated vacancy: {vacancy.Id} with AccountLegalEntityPublicHashedId: {selectedLegalEntity.AccountLegalEntityPublicHashedId}");
_logger.LogInformation($"Successfully updated vacancy: {vacancyId} with AccountLegalEntityPublicHashedId: {selectedLegalEntity.AccountLegalEntityPublicHashedId}");
}

private Polly.Retry.RetryPolicy GetApiRetryPolicy()
{
return Policy
.Handle<Exception>()
.WaitAndRetryAsync(new[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(10),
TimeSpan.FromSeconds(15)
}, (exception, timeSpan, retryCount, context) => {
_logger.LogWarning(exception, $"Error connecting to Apprenticeships Api for {context["apiCall"]}. Retrying in {timeSpan.Seconds} secs...attempt: {retryCount}");
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ namespace Esfa.Recruit.Vacancies.Client.Application.Queues.Messages
{
public class DataMigrationQueueMessage
{
public Guid VacancyId { get; set; }
public int SerialNumber { get; }
public Guid VacancyId { get; }

public DataMigrationQueueMessage(int serialNumber, Guid vacancyId)
{
SerialNumber = serialNumber;
VacancyId = vacancyId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ public async Task UpdateAsync(Vacancy vacancy)
{
var filter = Builders<Vacancy>.Filter.Eq(v => v.Id, vacancy.Id);
var collection = GetCollection<Vacancy>();
await RetryPolicy.ExecuteAsync(_ =>
collection.ReplaceOneAsync(filter, vacancy),
await RetryPolicy.ExecuteAsync(async _ =>
await collection.ReplaceOneAsync(filter, vacancy),
new Context(nameof(UpdateAsync)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Net.Http;
using System.Threading.Tasks;
using Esfa.Recruit.Vacancies.Client.Application.Exceptions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
Expand All @@ -12,19 +13,21 @@ internal class SlackClient : ISlackClient
{
private readonly string _webhookUrl;
private readonly HttpClient _httpClient;
private readonly ILogger<SlackClient> _logger;

private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings
{
ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy() },
NullValueHandling = NullValueHandling.Ignore
};

public SlackClient(IHttpClientFactory clientFactory, IOptions<SlackConfiguration> slackConfig)
public SlackClient(IHttpClientFactory clientFactory, IOptions<SlackConfiguration> slackConfig, ILogger<SlackClient> logger)
{
_httpClient = clientFactory.CreateClient();
_httpClient.Timeout = new TimeSpan(0, 0, 30);

_webhookUrl = slackConfig.Value.WebHookUrl;
_logger = logger;
}

public async Task PostAsync(SlackMessage message, SlackVacancyNotificationType emoji)
Expand All @@ -41,7 +44,7 @@ public async Task PostAsync(SlackMessage message, SlackVacancyNotificationType e
var success = content.Equals("ok", StringComparison.OrdinalIgnoreCase);

if (!success)
throw new NotificationException($"Failed to send notification to Slack with url: {_webhookUrl}");
_logger.LogWarning($"Failed to send notification to Slack with url: {_webhookUrl}");
}
}

Expand Down

0 comments on commit fc05192

Please sign in to comment.