From 86daad24f26ebbd86de68e01d50fbd5d965c415c Mon Sep 17 00:00:00 2001 From: Fernando Marins <43406405+fernando-a-marins@users.noreply.github.com> Date: Wed, 1 Mar 2023 14:36:44 +0000 Subject: [PATCH] feat: delete queues cleanup (#100) --- .../KafkaClusterConfigurationBuilderHelper.cs | 41 ++-- .../KafkaClusterConfigurationBuilderHelper.cs | 15 +- .../Core/Bootstrappers/BootstrapperKafka.cs | 61 +++--- .../Repositories/SqlServerRepository.cs | 3 +- .../DeleteQueuesTests.cs | 169 ++++++++++++++++ .../Repositories/IRetryQueueItemRepository.cs | 2 + .../Repositories/IRetryQueueRepository.cs | 5 + .../Repositories/RetryQueueItemRepository.cs | 19 +- .../Repositories/RetryQueueRepository.cs | 42 +++- .../RetryQueueDataProvider.cs | 22 +++ .../Repositories/IRetryQueueRepository.cs | 4 +- .../Repositories/RetryQueueRepository.cs | 27 ++- .../RetryQueueDataProvider.cs | 19 ++ .../Polling/CleanupPollingDefinitionTests.cs | 53 +++++ .../RetryDurablePollingDefinitionTests.cs | 33 +--- ...ryDurableRetryPlanBeforeDefinitionTests.cs | 2 +- .../Polling/JobDataProvidersFactoryTests.cs | 68 +++++++ .../Polling/Jobs/CleanupPollingJobTests.cs | 86 ++++++++ .../RetryDurablePollingJobTests.cs} | 24 ++- .../Polling/QueueTrackerCoordinatorTests.cs | 106 +++++----- .../Polling/QueueTrackerFactoryTests.cs | 67 +++---- .../Durable/Polling/QueueTrackerTests.cs | 187 +++++++++++++----- ...ConsumerGuaranteeOrderedMiddlewareTests.cs | 2 +- ...tryDurableConsumerLatestMiddlewareTests.cs | 15 +- .../Durable/RetryDurableMiddlewareTests.cs | 2 +- .../Forever/RetryForeverDefinitionTests.cs | 2 +- .../Simple/RetrySimpleDefinitionTests.cs | 2 +- .../Readers/DboCollectionNavigatorTests.cs | 65 +++--- .../Readers/RetryQueueReaderTests.cs | 92 ++++----- .../CleanupPollingDefinitionBuilder.cs | 34 ++++ .../Polling/PollingDefinitionBuilder.cs | 22 +++ .../PollingDefinitionsAggregatorBuilder.cs | 75 +++++++ .../RetryDurablePollingDefinitionBuilder.cs | 34 ++++ .../Builders/RetryDurableDefinitionBuilder.cs | 25 ++- ...DurableEmbeddedClusterDefinitionBuilder.cs | 28 +-- ...DurableQueuePollingJobDefinitionBuilder.cs | 54 ----- .../Polling/CleanupPollingDefinition.cs | 30 +++ .../Definitions/Polling/PollingDefinition.cs | 27 +++ .../Polling/PollingDefinitionsAggregator.cs | 25 +++ .../Definitions/Polling/PollingJobType.cs | 10 + .../Polling/RetryDurablePollingDefinition.cs | 27 +++ .../RetryDurablePollingDefinition.cs | 41 ---- .../Extensions/JobDataMapExtensions.cs | 28 +++ .../Durable/Polling/IJobDataProvider.cs | 14 ++ .../Polling/IJobDataProvidersFactory.cs | 9 + .../Durable/Polling/IJobDetailProvider.cs | 9 - .../Polling/IQueueTrackerCoordinator.cs | 9 +- .../Durable/Polling/IQueueTrackerFactory.cs | 7 +- .../Durable/Polling/ITriggerProvider.cs | 3 +- .../Polling/JobDataProvidersFactory.cs | 98 +++++++++ .../Durable/Polling/JobDetailProvider.cs | 63 ------ .../Polling/Jobs/CleanupJobDataProvider.cs | 55 ++++++ .../Durable/Polling/Jobs/CleanupPollingJob.cs | 60 ++++++ .../PollingJobConstants.cs} | 7 +- .../Jobs/RetryDurableJobDataProvider.cs | 72 +++++++ .../RetryDurablePollingJob.cs} | 70 ++----- .../Durable/Polling/QueueTracker.cs | 161 ++++++++------- .../Polling/QueueTrackerCoordinator.cs | 23 +-- .../Durable/Polling/QueueTrackerFactory.cs | 54 ++--- .../Durable/Polling/TriggerProvider.cs | 39 ++-- .../Actions/Delete/DeleteQueuesInput.cs | 34 ++++ .../Actions/Delete/DeleteQueuesResult.cs | 16 ++ .../IRetryDurableQueueRepository.cs | 3 + .../IRetryDurableQueueRepositoryProvider.cs | 3 + .../NullRetryDurableQueueRepository.cs | 6 +- .../Repository/RetryDurableQueueRepository.cs | 24 ++- 66 files changed, 1808 insertions(+), 726 deletions(-) create mode 100644 src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinitionTests.cs rename src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/{ => Polling}/RetryDurablePollingDefinitionTests.cs (53%) create mode 100644 src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactoryTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJobTests.cs rename src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/{QueuePollingJobTests.cs => Jobs/RetryDurablePollingJobTests.cs} (92%) create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/CleanupPollingDefinitionBuilder.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionBuilder.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurablePollingDefinitionBuilder.cs delete mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableQueuePollingJobDefinitionBuilder.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinition.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinition.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinitionsAggregator.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs create mode 100644 src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinition.cs delete mode 100644 src/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinition.cs create mode 100644 src/KafkaFlow.Retry/Durable/Polling/Extensions/JobDataMapExtensions.cs create mode 100644 src/KafkaFlow.Retry/Durable/Polling/IJobDataProvider.cs create mode 100644 src/KafkaFlow.Retry/Durable/Polling/IJobDataProvidersFactory.cs delete mode 100644 src/KafkaFlow.Retry/Durable/Polling/IJobDetailProvider.cs create mode 100644 src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs delete mode 100644 src/KafkaFlow.Retry/Durable/Polling/JobDetailProvider.cs create mode 100644 src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupJobDataProvider.cs create mode 100644 src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs rename src/KafkaFlow.Retry/Durable/Polling/{QueuePollingJobConstants.cs => Jobs/PollingJobConstants.cs} (66%) create mode 100644 src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableJobDataProvider.cs rename src/KafkaFlow.Retry/Durable/Polling/{QueuePollingJob.cs => Jobs/RetryDurablePollingJob.cs} (66%) create mode 100644 src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesInput.cs create mode 100644 src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesResult.cs diff --git a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs index 30996f1b..568f5d5d 100644 --- a/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs +++ b/samples/KafkaFlow.Retry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs @@ -74,13 +74,23 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoDb( ) .Enabled(true) ) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .WithId("retry-durable-mongodb-polling-id") - .WithCronExpression("0 0/1 * 1/1 * ? *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(10) - .Enabled(true) + .WithSchedulerId("retry-durable-mongodb-polling-id") + .WithRetryDurablePollingConfiguration( + configure => configure + .WithCronExpression("0 0/1 * 1/1 * ? *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(10) + .Enabled(true) + ) + .WithCleanupPollingConfiguration( + configure => configure + .WithCronExpression("0 0 * 1/1 * ? *") + .WithRowsPerRequest(1048) + .WithTimeToLiveInDays(60) + .Enabled(true) + ) )) .AddTypedHandlers( handlers => handlers @@ -150,13 +160,20 @@ internal static IClusterConfigurationBuilder SetupRetryDurableSqlServer( ) .Enabled(true) ) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .WithId("retry-durable-sqlserver-polling-id") - .WithCronExpression("0 0/1 * 1/1 * ? *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(10) - .Enabled(true) + .WithSchedulerId("retry-durable-sqlserver-polling-id") + .WithRetryDurablePollingConfiguration( + configure => configure + .WithCronExpression("0 0/1 * 1/1 * ? *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(10) + .Enabled(true) + ) + .WithCleanupPollingConfiguration( + configure => configure + .Enabled(false) + ) )) .AddTypedHandlers( handlers => handlers diff --git a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs index 29bb06c2..06ebd2a0 100644 --- a/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs +++ b/samples/KafkaFlow.Retry.SchemaRegistry.Sample/Helpers/KafkaClusterConfigurationBuilderHelper.cs @@ -84,13 +84,16 @@ internal static IClusterConfigurationBuilder SetupRetryDurableMongoAvroDb( ) .Enabled(true) ) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .WithId("retry-durable-mongodb-avro-polling-id") - .WithCronExpression("0 0/1 * 1/1 * ? *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(10) - .Enabled(true) + .WithSchedulerId("retry-durable-mongodb-avro-polling-id") + .WithRetryDurablePollingConfiguration( + configure => configure + .WithCronExpression("0 0/1 * 1/1 * ? *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(10) + .Enabled(true) + ) )) .AddTypedHandlers( handlers => handlers diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs index 8552a79a..27b89ee5 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs @@ -89,13 +89,16 @@ internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedCo handlers => handlers .WithHandlerLifetime(InstanceLifetime.Transient) .AddHandler())) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .Enabled(true) - .WithId("custom_search_key_durable_guarantee_ordered_consumption_mongo_db") - .WithCronExpression("0/30 * * ? * * *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(256)) + .WithSchedulerId("custom_search_key_durable_guarantee_ordered_consumption_mongo_db") + .WithRetryDurablePollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + ) .WithMongoDbDataProvider( mongoDbConnectionString, mongoDbDatabaseName, @@ -162,13 +165,16 @@ internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedCo handlers => handlers .WithHandlerLifetime(InstanceLifetime.Transient) .AddHandler())) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .Enabled(true) - .WithId("custom_search_key_durable_guarantee_ordered_consumption_sql_server") - .WithCronExpression("0/30 * * ? * * *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(256)) + .WithSchedulerId("custom_search_key_durable_guarantee_ordered_consumption_sql_server") + .WithRetryDurablePollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + ) .WithSqlServerDataProvider( sqlServerConnectionString, sqlServerDatabaseName) @@ -235,13 +241,16 @@ internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionM handlers => handlers .WithHandlerLifetime(InstanceLifetime.Transient) .AddHandler())) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .Enabled(true) - .WithId("custom_search_key_durable_latest_consumption_mongo_db") - .WithCronExpression("0/30 * * ? * * *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(256)) + .WithSchedulerId("custom_search_key_durable_latest_consumption_mongo_db") + .WithRetryDurablePollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + ) .WithMongoDbDataProvider( mongoDbConnectionString, mongoDbDatabaseName, @@ -308,13 +317,17 @@ internal static IClusterConfigurationBuilder SetupRetryDurableLatestConsumptionS handlers => handlers .WithHandlerLifetime(InstanceLifetime.Transient) .AddHandler())) - .WithQueuePollingJobConfiguration( + .WithPollingJobsConfiguration( configure => configure - .Enabled(true) - .WithId("custom_search_key_durable_latest_consumption_sql_server") - .WithCronExpression("0/30 * * ? * * *") - .WithExpirationIntervalFactor(1) - .WithFetchSize(256)) + .WithSchedulerId("custom_search_key_durable_latest_consumption_sql_server") + .WithRetryDurablePollingConfiguration( + configure => configure + .Enabled(true) + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + ) + .WithSqlServerDataProvider( sqlServerConnectionString, sqlServerDatabaseName) diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs index 8339ce3e..bf5fefa5 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs @@ -98,8 +98,7 @@ public async Task CreateQueueAsync(RetryQueue queue) DomainRetryQueueId = queue.Id, Status = item.Status, SeverityLevel = item.SeverityLevel, - Description = item.Description, - Sort = item.Sort // TODO: FIX-30: remove this after fix https://github.com/Farfetch/kafkaflow-retry-extensions/issues/30 + Description = item.Description }; var itemId = await this.retryQueueItemRepository.AddAsync(dbConnection, itemDbo); diff --git a/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs new file mode 100644 index 00000000..9663b443 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/RepositoryTests/RetryQueueDataProviderTests/DeleteQueuesTests.cs @@ -0,0 +1,169 @@ +namespace KafkaFlow.Retry.IntegrationTests.RepositoryTests.RetryQueueDataProviderTests +{ + using System; + using System.Linq; + using System.Threading.Tasks; + using FluentAssertions; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; + using KafkaFlow.Retry.Durable.Repository.Model; + using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures; + using KafkaFlow.Retry.IntegrationTests.Core.Storages; + using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories; + using Xunit; + + public class DeleteQueuesTests : RetryQueueDataProviderTestsTemplate + { + private const RetryQueueStatus QueueStatusToDelete = RetryQueueStatus.Done; + private const RetryQueueStatus QueueStatusToKeep = RetryQueueStatus.Active; + private const string SearchGroupKeyToDelete = "SearchGroupKey-RepositoryTests-DeleteQueues"; + + public DeleteQueuesTests(BootstrapperRepositoryFixture bootstrapperRepositoryFixture) + : base(bootstrapperRepositoryFixture) + { + } + + [Theory] + [InlineData(RepositoryType.MongoDb)] + [InlineData(RepositoryType.SqlServer)] + public async Task DeleteQueuesAsync_TestingMaxRowsToDelete_DeleteAllEligibleQueuesAfterTwoDeletions(RepositoryType repositoryType) + { + // Arrange + var repository = this.GetRepository(repositoryType); + + var maxRowsToDelete = 2; + var maxLastExecutionDateToBeKept = new DateTime(2023, 2, 10, 12, 0, 0); + + var queuesInput = new[] { + new DeleteQueueTestInput + { + EligibleToDelete = true, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2023, 2, 9, 0, 0, 0) + }, + new DeleteQueueTestInput + { + EligibleToDelete = true, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2023, 2, 10, 11, 59, 59) + }, + new DeleteQueueTestInput + { + EligibleToDelete = true, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2021, 2, 10) + } + }; + + await this.CreateQueuesAsync(repository, queuesInput); + + var deleteQueuesInput = new DeleteQueuesInput(SearchGroupKeyToDelete, QueueStatusToDelete, maxLastExecutionDateToBeKept, maxRowsToDelete); + + // Act + var result1 = await repository.RetryQueueDataProvider.DeleteQueuesAsync(deleteQueuesInput); + var result2 = await repository.RetryQueueDataProvider.DeleteQueuesAsync(deleteQueuesInput); + + // Assert + result1.Should().NotBeNull(); + result1.TotalQueuesDeleted.Should().Be(2); + + result2.Should().NotBeNull(); + result2.TotalQueuesDeleted.Should().Be(1); + } + + [Theory] + [InlineData(RepositoryType.MongoDb)] + [InlineData(RepositoryType.SqlServer)] + public async Task DeleteQueuesAsync_WithSeveralScenarios_DeleteAllEligibleQueues(RepositoryType repositoryType) + { + // Arrange + var repository = this.GetRepository(repositoryType); + + var maxLastExecutionDateToBeKept = new DateTime(2023, 2, 10, 12, 0, 0); + + var queuesInput = new[] { + new DeleteQueueTestInput + { + EligibleToDelete = true, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2023, 2, 9, 0, 0, 0) + }, + new DeleteQueueTestInput + { + EligibleToDelete = true, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2023, 2, 10, 11, 59, 59) + }, + new DeleteQueueTestInput + { + EligibleToDelete = false, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2023, 2, 10, 12, 0, 0) + }, + new DeleteQueueTestInput + { + EligibleToDelete = false, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToKeep, + LastExecutionDate = new DateTime(2022, 12, 9) + }, + new DeleteQueueTestInput + { + EligibleToDelete = false, + SearchGroupKey = SearchGroupKeyToDelete, + QueueStatus = QueueStatusToKeep, + LastExecutionDate = new DateTime(2023, 2, 13) + }, + new DeleteQueueTestInput + { + EligibleToDelete = false, + SearchGroupKey = "OtherSearchGroupKey", + QueueStatus = QueueStatusToDelete, + LastExecutionDate = new DateTime(2023, 2, 9) + } + }; + + await this.CreateQueuesAsync(repository, queuesInput); + + var deleteQueuesInput = new DeleteQueuesInput(SearchGroupKeyToDelete, QueueStatusToDelete, maxLastExecutionDateToBeKept, 100); + + // Act + var result = await repository.RetryQueueDataProvider.DeleteQueuesAsync(deleteQueuesInput); + + // Assert + result.Should().NotBeNull(); + result.TotalQueuesDeleted.Should().Be(queuesInput.Count(i => i.EligibleToDelete)); + } + + private async Task CreateQueuesAsync(IRepository repository, DeleteQueueTestInput[] queuesInput) + { + foreach (var queueInput in queuesInput) + { + var queue = new RetryQueueBuilder() + .WithSearchGroupKey(queueInput.SearchGroupKey) + .WithStatus(queueInput.QueueStatus) + .WithLastExecution(queueInput.LastExecutionDate) + .WithDefaultItem() + .Build(); + + await repository.CreateQueueAsync(queue); + } + } + + private class DeleteQueueTestInput + { + public bool EligibleToDelete { get; set; } + + public DateTime LastExecutionDate { get; set; } + + public RetryQueueStatus QueueStatus { get; set; } + + public string SearchGroupKey { get; set; } + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueItemRepository.cs b/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueItemRepository.cs index b4fa080a..0e9e8039 100644 --- a/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueItemRepository.cs @@ -13,6 +13,8 @@ internal interface IRetryQueueItemRepository { Task AnyItemStillActiveAsync(Guid retryQueueId); + Task DeleteItemsAsync(IEnumerable queueIds); + Task GetItemAsync(Guid itemId); Task> GetItemsAsync( diff --git a/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueRepository.cs b/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueRepository.cs index 695b9357..d2eedca6 100644 --- a/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.MongoDb/Repositories/IRetryQueueRepository.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Model; using KafkaFlow.Retry.MongoDb.Model; @@ -10,8 +11,12 @@ internal interface IRetryQueueRepository { + Task DeleteQueuesAsync(IEnumerable queueIds); + Task GetQueueAsync(string queueGroupKey); + Task> GetQueuesToDeleteAsync(string searchGroupKey, RetryQueueStatus status, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete); + Task> GetTopSortedQueuesAsync(RetryQueueStatus status, GetQueuesSortOption sortOption, string searchGroupKey, int top); Task UpdateLastExecutionAsync(Guid queueId, DateTime lastExecution); diff --git a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs index 07370141..74b62620 100644 --- a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs +++ b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueItemRepository.cs @@ -9,28 +9,18 @@ using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Model; - using KafkaFlow.Retry.MongoDb.Adapters; - using KafkaFlow.Retry.MongoDb.Adapters.Interfaces; using KafkaFlow.Retry.MongoDb.Model; - using KafkaFlow.Retry.MongoDb.Model.Factories; using MongoDB.Driver; internal class RetryQueueItemRepository : IRetryQueueItemRepository { private readonly DbContext dbContext; - private readonly IQueuesAdapter queuesAdapter; - private readonly RetryQueueItemDboFactory retryQueueItemDboFactory; public RetryQueueItemRepository(DbContext dbContext) { Guard.Argument(dbContext).NotNull(); this.dbContext = dbContext; - - var messageAdapter = new MessageAdapter(new HeaderAdapter()); - - this.retryQueueItemDboFactory = new RetryQueueItemDboFactory(messageAdapter); - this.queuesAdapter = new QueuesAdapter(new ItemAdapter(messageAdapter)); } public async Task AnyItemStillActiveAsync(Guid retryQueueId) @@ -45,6 +35,15 @@ public async Task AnyItemStillActiveAsync(Guid retryQueueId) return itemsDbo.Any(); } + public async Task DeleteItemsAsync(IEnumerable queueIds) + { + var itemsFilterBuilder = this.dbContext.RetryQueueItems.GetFilters(); + + var deleteFilter = itemsFilterBuilder.In(i => i.RetryQueueId, queueIds); + + await this.dbContext.RetryQueueItems.DeleteManyAsync(deleteFilter).ConfigureAwait(false); + } + public async Task GetItemAsync(Guid itemId) { var queueItemsFilterBuilder = this.dbContext.RetryQueueItems.GetFilters(); diff --git a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs index f8b8e8ce..47cae8f0 100644 --- a/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.MongoDb/Repositories/RetryQueueRepository.cs @@ -2,32 +2,35 @@ { using System; using System.Collections.Generic; + using System.Linq; using System.Threading.Tasks; using Dawn; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Model; - using KafkaFlow.Retry.MongoDb.Adapters; - using KafkaFlow.Retry.MongoDb.Adapters.Interfaces; using KafkaFlow.Retry.MongoDb.Model; - using KafkaFlow.Retry.MongoDb.Model.Factories; using MongoDB.Driver; internal class RetryQueueRepository : IRetryQueueRepository { private readonly DbContext dbContext; - private readonly IQueuesAdapter queuesAdapter; - private readonly RetryQueueItemDboFactory retryQueueItemDboFactory; public RetryQueueRepository(DbContext dbContext) { Guard.Argument(dbContext).NotNull(); this.dbContext = dbContext; + } + + public async Task DeleteQueuesAsync(IEnumerable queueIds) + { + var queuesFilterBuilder = this.dbContext.RetryQueues.GetFilters(); + + var deleteFilter = queuesFilterBuilder.In(q => q.Id, queueIds); - var messageAdapter = new MessageAdapter(new HeaderAdapter()); + var deleteResult = await this.dbContext.RetryQueues.DeleteManyAsync(deleteFilter).ConfigureAwait(false); - this.retryQueueItemDboFactory = new RetryQueueItemDboFactory(messageAdapter); - this.queuesAdapter = new QueuesAdapter(new ItemAdapter(messageAdapter)); + return new DeleteQueuesResult(this.GetDeletedCount(deleteResult)); } public async Task GetQueueAsync(string queueGroupKey) @@ -39,6 +42,24 @@ public async Task GetQueueAsync(string queueGroupKey) return await this.dbContext.RetryQueues.GetOneAsync(queuesFilter).ConfigureAwait(false); } + public async Task> GetQueuesToDeleteAsync(string searchGroupKey, RetryQueueStatus status, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete) + { + var queuesFilterBuilder = this.dbContext.RetryQueues.GetFilters(); + + var findFilter = queuesFilterBuilder.Eq(q => q.SearchGroupKey, searchGroupKey) + & queuesFilterBuilder.Eq(q => q.Status, status) + & queuesFilterBuilder.Lt(q => q.LastExecution, maxLastExecutionDateToBeKept); + + var options = new FindOptions + { + Limit = maxRowsToDelete + }; + + var queuesToDelete = await this.dbContext.RetryQueues.GetAsync(findFilter, options).ConfigureAwait(false); + + return queuesToDelete.Select(q => q.Id); + } + public async Task> GetTopSortedQueuesAsync(RetryQueueStatus status, GetQueuesSortOption sortOption, string searchGroupKey, int top) { var queuesFilterBuilder = this.dbContext.RetryQueues.GetFilters(); @@ -103,5 +124,10 @@ public async Task UpdateStatusAsync(Guid queueId, RetryQueueStatus return await this.dbContext.RetryQueues.UpdateOneAsync(filter, update).ConfigureAwait(false); } + + private int GetDeletedCount(DeleteResult deleteResult) + { + return deleteResult.IsAcknowledged ? Convert.ToInt32(deleteResult.DeletedCount) : 0; + } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs index 10c64f4f..b00ba607 100644 --- a/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.MongoDb/RetryQueueDataProvider.cs @@ -7,6 +7,7 @@ using Dawn; using KafkaFlow.Retry.Durable.Repository; using KafkaFlow.Retry.Durable.Repository.Actions.Create; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Model; @@ -102,6 +103,27 @@ public async Task CheckQueuePendingItemsAsync(QueuePend return new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems); } + public async Task DeleteQueuesAsync(DeleteQueuesInput input) + { + Guard.Argument(input, nameof(input)).NotNull(); + + var queueIdsToDelete = await this.retryQueueRepository + .GetQueuesToDeleteAsync( + input.SearchGroupKey, + input.RetryQueueStatus, + input.MaxLastExecutionDateToBeKept, + input.MaxRowsToDelete) + .ConfigureAwait(false); + + await this.retryQueueItemRepository + .DeleteItemsAsync(queueIdsToDelete) + .ConfigureAwait(false); + + return await this.retryQueueRepository + .DeleteQueuesAsync(queueIdsToDelete) + .ConfigureAwait(false); + } + public async Task GetQueuesAsync(GetQueuesInput input) { Guard.Argument(input, nameof(input)).NotNull(); diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs index 7a3d956a..7f63f5cc 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/IRetryQueueRepository.cs @@ -11,6 +11,8 @@ internal interface IRetryQueueRepository { Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retryQueueDbo); + Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete); + Task ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey); Task GetQueueAsync(IDbConnection dbConnection, string queueGroupKey); @@ -23,4 +25,4 @@ internal interface IRetryQueueRepository Task UpdateStatusAsync(IDbConnection dbConnection, Guid idDomain, RetryQueueStatus retryQueueStatus); } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs index d9c54bfb..54e5d3ea 100644 --- a/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs +++ b/src/KafkaFlow.Retry.SqlServer/Repositories/RetryQueueRepository.cs @@ -33,6 +33,31 @@ public async Task AddAsync(IDbConnection dbConnection, RetryQueueDbo retry } } + public async Task DeleteQueuesAsync(IDbConnection dbConnection, string searchGroupKey, RetryQueueStatus retryQueueStatus, DateTime maxLastExecutionDateToBeKept, int maxRowsToDelete) + { + using (var command = dbConnection.CreateCommand()) + { + command.CommandType = System.Data.CommandType.Text; + command.CommandText = @"DELETE FROM [RetryQueues] WHERE Id IN + ( + SELECT Id FROM [RetryQueues] rq + WHERE rq.SearchGroupKey = @SearchGroupKey + AND rq.LastExecution < @MaxLastExecutionDateToBeKept + AND rq.IdStatus = @IdStatus + ORDER BY 1 + OFFSET 0 ROWS + FETCH NEXT @MaxRowsToDelete ROWS ONLY + )"; + + command.Parameters.AddWithValue("SearchGroupKey", searchGroupKey); + command.Parameters.AddWithValue("MaxLastExecutionDateToBeKept", maxLastExecutionDateToBeKept); + command.Parameters.AddWithValue("IdStatus", (byte)retryQueueStatus); + command.Parameters.AddWithValue("MaxRowsToDelete", maxRowsToDelete); + + return await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + } + public async Task ExistsActiveAsync(IDbConnection dbConnection, string queueGroupKey) { using (var command = dbConnection.CreateCommand()) @@ -197,4 +222,4 @@ private string GetOrderByCommandString(GetQueuesSortOption sortOption) } } } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs b/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs index d55e9231..e209adea 100644 --- a/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs +++ b/src/KafkaFlow.Retry.SqlServer/RetryQueueDataProvider.cs @@ -7,6 +7,7 @@ using Dawn; using KafkaFlow.Retry.Durable.Repository; using KafkaFlow.Retry.Durable.Repository.Actions.Create; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Model; @@ -105,6 +106,24 @@ public async Task CheckQueuePendingItemsAsync(QueuePend } } + public async Task DeleteQueuesAsync(DeleteQueuesInput input) + { + Guard.Argument(input, nameof(input)).NotNull(); + + using (var dbConnection = this.connectionProvider.Create(this.sqlServerDbSettings)) + { + var totalQueuesDeleted = await this.retryQueueRepository.DeleteQueuesAsync( + dbConnection, + input.SearchGroupKey, + input.RetryQueueStatus, + input.MaxLastExecutionDateToBeKept, + input.MaxRowsToDelete) + .ConfigureAwait(false); + + return new DeleteQueuesResult(totalQueuesDeleted); + } + } + public async Task GetQueuesAsync(GetQueuesInput input) { Guard.Argument(input, nameof(input)).NotNull(); diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinitionTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinitionTests.cs new file mode 100644 index 00000000..085786fe --- /dev/null +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinitionTests.cs @@ -0,0 +1,53 @@ +namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Definitions.Polling +{ + using System; + using FluentAssertions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; + using Xunit; + + public class CleanupPollingDefinitionTests + { + [Theory] + [InlineData("x", 30, 300, typeof(ArgumentException))] + [InlineData("0 0/1 * 1/1 * ? *", 0, 300, typeof(ArgumentOutOfRangeException))] + [InlineData("0 0/1 * 1/1 * ? *", 30, -10, typeof(ArgumentOutOfRangeException))] + [InlineData("0 0/1 * 1/1 * ? *", 30, 0, typeof(ArgumentOutOfRangeException))] + public void CleanupPollingDefinition_Ctor_Enabled_ThrowsExpectedException( + string cronExpression, + int timeToLiveInDays, + int rowsPerRequest, + Type expectedExceptionType) + { + // Act + Action act = () => new CleanupPollingDefinition( + enabled: true, + cronExpression, + timeToLiveInDays, + rowsPerRequest); + + // Assert + Assert.Throws(expectedExceptionType, act); + } + + [Theory] + [InlineData("x", 30, 300)] + [InlineData("0 0/1 * 1/1 * ? *", 0, 300)] + [InlineData("0 0/1 * 1/1 * ? *", 30, -10)] + [InlineData("0 0/1 * 1/1 * ? *", 30, 0)] + public void CleanupPollingDefinition_Ctor_NotEnabled_DoesNotThrowsExceptionWithInvalidParams( + string cronExpression, + int timeToLiveInDays, + int rowsPerRequest) + { + // Act + var actualPollingDefinition = new CleanupPollingDefinition( + enabled: false, + cronExpression, + timeToLiveInDays, + rowsPerRequest); + + // Assert + actualPollingDefinition.Should().NotBeNull(); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinitionTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinitionTests.cs similarity index 53% rename from src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinitionTests.cs rename to src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinitionTests.cs index 24e1ee8f..052ab4cc 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinitionTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinitionTests.cs @@ -1,57 +1,40 @@ -namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Definitions +namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Definitions.Polling { using System; using FluentAssertions; - using global::KafkaFlow.Retry.Durable.Definitions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; using Xunit; public class RetryDurablePollingDefinitionTests { [Fact] - public void RetryDurablePollingDefinition__Ctor_WithArgumentNullException_ThrowsException() + public void RetryDurablePollingDefinition_Ctor_WithArgumentException_ThrowsException() { // Act Action act = () => new RetryDurablePollingDefinition( - enabled: false, + enabled: true, cronExpression: "", fetchSize: 1, - expirationIntervalFactor: 2, - id: null); + expirationIntervalFactor: 2); // Assert - act.Should().Throw(); + act.Should().Throw(); } [Theory] [InlineData(-1, 2)] [InlineData(1, -2)] - public void RetryDurablePollingDefinition__Ctor_WithArgumentOutOfRangeException_ThrowsException(int fetchSize, int expirationIntervalFactor) + public void RetryDurablePollingDefinition_Ctor_WithArgumentOutOfRangeException_ThrowsException(int fetchSize, int expirationIntervalFactor) { // Act Action act = () => new RetryDurablePollingDefinition( enabled: false, cronExpression: "x", fetchSize, - expirationIntervalFactor, - id: "id"); + expirationIntervalFactor); // Assert act.Should().Throw(); } - - [Fact] - public void RetryDurablePollingDefinition_Ctor_WithArgumentException_ThrowsException() - { - // Act - Action act = () => new RetryDurablePollingDefinition( - enabled: true, - cronExpression: "", - fetchSize: 1, - expirationIntervalFactor: 2, - id: "id"); - - // Assert - act.Should().Throw(); - } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurableRetryPlanBeforeDefinitionTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurableRetryPlanBeforeDefinitionTests.cs index 57b27656..40a851b4 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurableRetryPlanBeforeDefinitionTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Definitions/RetryDurableRetryPlanBeforeDefinitionTests.cs @@ -8,7 +8,7 @@ public class RetryDurableRetryPlanBeforeDefinitionTests { - public static readonly IEnumerable DataTest = new List + public static IEnumerable DataTest() => new List { new object[] { diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactoryTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactoryTests.cs new file mode 100644 index 00000000..6437954d --- /dev/null +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactoryTests.cs @@ -0,0 +1,68 @@ +namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling +{ + using System; + using FluentAssertions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; + using global::KafkaFlow.Retry.Durable.Encoders; + using global::KafkaFlow.Retry.Durable.Polling; + using global::KafkaFlow.Retry.Durable.Repository; + using global::KafkaFlow.Retry.Durable.Repository.Adapters; + using Moq; + using Quartz; + using Xunit; + + public class JobDataProvidersFactoryTests + { + private static readonly PollingDefinitionsAggregator pollingDefinitionsAggregator = + new PollingDefinitionsAggregator( + "id", + new PollingDefinition[] + { + new RetryDurablePollingDefinition(true, "*/30 * * ? * *", 10, 100), + new CleanupPollingDefinition(true, "*/30 * * ? * *", 10, 100) + } + ); + + [Fact] + public void JobDataProvidersFactory_Create_Success() + { + // Arrange + var mockTriggerProvider = new Mock(); + mockTriggerProvider + .Setup(m => m.GetPollingTrigger(It.IsAny(), It.IsAny())) + .Returns(Mock.Of()); + + var factory = new JobDataProvidersFactory( + pollingDefinitionsAggregator, + mockTriggerProvider.Object, + Mock.Of(), + Mock.Of(), + Mock.Of()); + + // Act + var jobDataProviders = factory.Create(Mock.Of(), Mock.Of()); + + // Arrange + jobDataProviders.Should().NotBeNull(); + } + + [Theory] + [InlineData(typeof(PollingDefinitionsAggregator))] + [InlineData(typeof(ITriggerProvider))] + [InlineData(typeof(IRetryDurableQueueRepository))] + [InlineData(typeof(IMessageHeadersAdapter))] + [InlineData(typeof(IUtf8Encoder))] + public void JobDataProvidersFactory_Ctor_WithArgumentNull_ThrowsException(Type nullType) + { + // Arrange & Act + Action act = () => new JobDataProvidersFactory( + nullType == typeof(PollingDefinitionsAggregator) ? null : pollingDefinitionsAggregator, + nullType == typeof(ITriggerProvider) ? null : Mock.Of(), + nullType == typeof(IRetryDurableQueueRepository) ? null : Mock.Of(), + nullType == typeof(IMessageHeadersAdapter) ? null : Mock.Of(), + nullType == typeof(IUtf8Encoder) ? null : Mock.Of()); + // Assert + act.Should().Throw(); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJobTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJobTests.cs new file mode 100644 index 00000000..d7986d39 --- /dev/null +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJobTests.cs @@ -0,0 +1,86 @@ +namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling.Jobs +{ + using System.Collections.Generic; + using System.Threading.Tasks; + using global::KafkaFlow.Retry.Durable; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; + using global::KafkaFlow.Retry.Durable.Polling.Jobs; + using global::KafkaFlow.Retry.Durable.Repository; + using global::KafkaFlow.Retry.Durable.Repository.Actions.Delete; + using Moq; + using Quartz; + using Xunit; + + public class CleanupPollingJobTests + { + private const string SchedulerId = "schedulerIdTest"; + private static readonly CleanupPollingDefinition cleaunupPollingDefinition = new CleanupPollingDefinition(true, "0 0 14-6 ? * FRI-MON", 1, 10); + private readonly IJob job = new CleanupPollingJob(); + private readonly Mock mockIJobDetail = new Mock(); + private readonly Mock mockITrigger = new Mock(); + private readonly Mock mockJobExecutionContext = new Mock(); + private readonly Mock mockLogHandler = new Mock(); + private readonly Mock mockRetryDurableQueueRepository = new Mock(); + + public CleanupPollingJobTests() + { + mockJobExecutionContext + .Setup(d => d.JobDetail) + .Returns(mockIJobDetail.Object); + + mockITrigger + .SetupGet(t => t.Key) + .Returns(new TriggerKey(string.Empty)); + + mockJobExecutionContext + .Setup(d => d.Trigger) + .Returns(mockITrigger.Object); + + IDictionary jobData = new Dictionary + { + { "RetryDurableQueueRepository", mockRetryDurableQueueRepository.Object }, + { "CleanupPollingDefinition", cleaunupPollingDefinition}, + { "LogHandler", mockLogHandler.Object }, + { "SchedulerId", SchedulerId } + }; + + mockIJobDetail + .SetupGet(jd => jd.JobDataMap) + .Returns(new JobDataMap(jobData)); + } + + [Fact] + public async Task CleanupPollingJob_Execute_RetryDurableQueueRepositoryFailed_LogError() + { + // Arrange + mockRetryDurableQueueRepository + .Setup(d => d.DeleteQueuesAsync(It.IsAny())) + .Throws(new RetryDurableException(new RetryError(RetryErrorCode.Consumer_BlockedException), "error")); + + // Act + await job.Execute(mockJobExecutionContext.Object); + + //Assert + mockLogHandler.Verify(d => d.Info(It.IsAny(), It.IsAny()), Times.Once); + mockLogHandler.Verify(d => d.Error(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + mockRetryDurableQueueRepository.Verify(d => d.DeleteQueuesAsync(It.IsAny()), Times.Once); + } + + [Fact] + public async Task CleanupPollingJob_Execute_Success() + { + // Arrange + mockRetryDurableQueueRepository + .Setup(d => d.DeleteQueuesAsync(It.IsAny())) + .ReturnsAsync(new DeleteQueuesResult(1)); + + // Act + await job.Execute(mockJobExecutionContext.Object); + + //Assert + mockLogHandler.Verify(d => d.Info(It.IsAny(), It.IsAny()), Times.Exactly(2)); + mockLogHandler.Verify(d => d.Error(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); + mockRetryDurableQueueRepository.Verify(d => d.DeleteQueuesAsync(It.IsAny()), Times.Once); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueuePollingJobTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJobTests.cs similarity index 92% rename from src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueuePollingJobTests.cs rename to src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJobTests.cs index ba05428c..94326892 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueuePollingJobTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJobTests.cs @@ -1,13 +1,13 @@ -namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling +namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling.Jobs { using System; using System.Collections.Generic; using System.Threading.Tasks; using global::KafkaFlow.Retry.Durable; using global::KafkaFlow.Retry.Durable.Common; - using global::KafkaFlow.Retry.Durable.Definitions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; using global::KafkaFlow.Retry.Durable.Encoders; - using global::KafkaFlow.Retry.Durable.Polling; + using global::KafkaFlow.Retry.Durable.Polling.Jobs; using global::KafkaFlow.Retry.Durable.Repository; using global::KafkaFlow.Retry.Durable.Repository.Actions.Read; using global::KafkaFlow.Retry.Durable.Repository.Actions.Update; @@ -17,10 +17,11 @@ using Quartz; using Xunit; - public class QueuePollingJobTests + public class RetryDurablePollingJobTests { - private static readonly RetryDurablePollingDefinition retryDurablePollingDefinition = new RetryDurablePollingDefinition(true, "0 0 14-6 ? * FRI-MON", 1, 1, "id"); - private readonly IJob job = new QueuePollingJob(); + private const string SchedulerId = "schedulerIdTest"; + private static readonly RetryDurablePollingDefinition retryDurablePollingDefinition = new RetryDurablePollingDefinition(true, "0 0 14-6 ? * FRI-MON", 1, 1); + private readonly IJob job = new RetryDurablePollingJob(); private readonly Mock jobExecutionContext = new Mock(); private readonly Mock logHandler = new Mock(); private readonly Mock messageAdapter = new Mock(); @@ -31,7 +32,7 @@ public class QueuePollingJobTests private readonly Mock retryDurableQueueRepository = new Mock(); private readonly Mock utf8Encoder = new Mock(); - public QueuePollingJobTests() + public RetryDurablePollingJobTests() { jobExecutionContext .Setup(d => d.JobDetail) @@ -47,7 +48,7 @@ public QueuePollingJobTests() } [Fact] - public async Task QueuePollingJob_Execute_ProduceMessageFailed_LogError() + public async Task RetryDurablePollingJob_Execute_ProduceMessageFailed_LogError() { // Arrange retryDurableQueueRepository @@ -89,6 +90,7 @@ public async Task QueuePollingJob_Execute_ProduceMessageFailed_LogError() { "MessageHeadersAdapter", messageHeadersAdapter.Object }, { "MessageAdapter", messageAdapter.Object }, { "Utf8Encoder", utf8Encoder.Object }, + { "SchedulerId", SchedulerId } }; mockIJobDetail @@ -109,7 +111,7 @@ public async Task QueuePollingJob_Execute_ProduceMessageFailed_LogError() } [Fact] - public async Task QueuePollingJob_Execute_RetryDurableQueueRepositoryFailed_LogError() + public async Task RetryDurablePollingJob_Execute_RetryDurableQueueRepositoryFailed_LogError() { // Arrange retryDurableQueueRepository @@ -125,6 +127,7 @@ public async Task QueuePollingJob_Execute_RetryDurableQueueRepositoryFailed_LogE { "MessageHeadersAdapter", messageHeadersAdapter.Object }, { "MessageAdapter", messageAdapter.Object }, { "Utf8Encoder", utf8Encoder.Object }, + { "SchedulerId", SchedulerId } }; mockIJobDetail @@ -143,7 +146,7 @@ public async Task QueuePollingJob_Execute_RetryDurableQueueRepositoryFailed_LogE } [Fact] - public async Task QueuePollingJob_Execute_Success() + public async Task RetryDurablePollingJob_Execute_Success() { // Arrange retryDurableQueueRepository @@ -184,6 +187,7 @@ public async Task QueuePollingJob_Execute_Success() { "MessageHeadersAdapter", messageHeadersAdapter.Object }, { "MessageAdapter", messageAdapter.Object }, { "Utf8Encoder", utf8Encoder.Object }, + { "SchedulerId", SchedulerId } }; mockIJobDetail .SetupGet(jd => jd.JobDataMap) diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs index 5dc6f903..00593ea4 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs @@ -1,19 +1,50 @@ namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling { using System; + using System.Threading.Tasks; using FluentAssertions; - using global::KafkaFlow.Retry.Durable.Definitions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; using global::KafkaFlow.Retry.Durable.Polling; + using global::KafkaFlow.Retry.Durable.Polling.Jobs; using Moq; using Quartz; using Xunit; public class QueueTrackerCoordinatorTests { - private readonly Mock mockILogHandler = new Mock(); - private readonly Mock mockIMessageProducer = new Mock(); - private readonly Mock queueTrackerFactory = new Mock(); - private readonly RetryDurablePollingDefinition retryDurablePollingDefinition = new RetryDurablePollingDefinition(true, "*/30 * * ? * *", 10, 100, "id"); + private readonly Mock mockJobDataProvider; + private readonly Mock mockQueueTrackerFactory; + private readonly QueueTrackerCoordinator queueTrackerCoordinator; + + public QueueTrackerCoordinatorTests() + { + var pollingDefinition = new RetryDurablePollingDefinition(true, "0 0/1 * 1/1 * ? *", 1, 1); + + this.mockJobDataProvider = new Mock(); + + this.mockJobDataProvider + .SetupGet(m => m.PollingDefinition) + .Returns(pollingDefinition); + + var mockTrigger = new Mock(); + mockTrigger + .SetupGet(m => m.Key) + .Returns(new TriggerKey("someTriggerKey")); + + this.mockJobDataProvider + .SetupGet(m => m.Trigger) + .Returns(mockTrigger.Object); + + this.mockQueueTrackerFactory = new Mock(); + mockQueueTrackerFactory + .Setup(d => d.Create(It.IsAny(), It.IsAny())) + .Returns(new QueueTracker( + "id", + new[] { this.mockJobDataProvider.Object }, + Mock.Of())); + + this.queueTrackerCoordinator = new QueueTrackerCoordinator(mockQueueTrackerFactory.Object); + } [Fact] public void QueueTrackerCoordinator_Ctor_WithArgumentNull_ThrowsException() @@ -26,68 +57,37 @@ public void QueueTrackerCoordinator_Ctor_WithArgumentNull_ThrowsException() } [Fact] - public void QueueTrackerCoordinator_ScheduleJob_Success() + public async Task QueueTrackerCoordinator_ScheduleJobs_Success() { - // Arrange - queueTrackerFactory - .Setup(d => d.Create(It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(new QueueTracker( - Mock.Of(), - retryDurablePollingDefinition, - Mock.Of(), - Mock.Of() - )); - - var coordinator = new QueueTrackerCoordinator(queueTrackerFactory.Object); - // Act - coordinator.ScheduleJob(retryDurablePollingDefinition, - mockIMessageProducer.Object, - mockILogHandler.Object); + await this.queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of(), Mock.Of()); //Assert - queueTrackerFactory.Verify(d => d.Create(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + this.mockQueueTrackerFactory.Verify(d => d.Create(It.IsAny(), It.IsAny()), Times.Once); + this.mockJobDataProvider.Verify(m => m.GetPollingJobDetail(), Times.Once); + this.mockJobDataProvider.Verify(m => m.Trigger, Times.Once); } [Fact] - public void QueueTrackerCoordinator_UnscheduleJob_Success() + public async Task QueueTrackerCoordinator_UnscheduleJobs_Success() { // Arrange - var mockIJobDetailProvider = new Mock(); - mockIJobDetailProvider - .Setup(x => x.GetQueuePollingJobDetail()) - .Returns(Mock.Of()); - - var mockITrigger = new Mock(); - mockITrigger - .SetupGet(x => x.Key) - .Returns(new TriggerKey(String.Empty)); - - var mockITriggerProvider = new Mock(); - mockITriggerProvider - .Setup(x => x.GetQueuePollingTrigger()) - .Returns(mockITrigger.Object); - - queueTrackerFactory - .Setup(d => d.Create(It.IsAny(), It.IsAny(), It.IsAny())) - .Returns(new QueueTracker( - Mock.Of(), - retryDurablePollingDefinition, - mockIJobDetailProvider.Object, - mockITriggerProvider.Object - )); - var coordinator = new QueueTrackerCoordinator(queueTrackerFactory.Object); + this.mockJobDataProvider + .Setup(x => x.GetPollingJobDetail()) + .Returns( + JobBuilder + .Create() + .Build()); // Act - coordinator.ScheduleJob( - retryDurablePollingDefinition, - mockIMessageProducer.Object, - mockILogHandler.Object); - coordinator.UnscheduleJob(); + await this.queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of(), Mock.Of()); + await this.queueTrackerCoordinator.UnscheduleJobsAsync(); //Assert - queueTrackerFactory.Verify(d => d.Create(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + this.mockQueueTrackerFactory.Verify(d => d.Create(It.IsAny(), It.IsAny()), Times.Once); + this.mockJobDataProvider.Verify(m => m.GetPollingJobDetail(), Times.Once); + this.mockJobDataProvider.Verify(m => m.Trigger, Times.Exactly(2)); } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactoryTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactoryTests.cs index 6ab5b66a..3a774db8 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactoryTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactoryTests.cs @@ -3,62 +3,47 @@ using System; using System.Collections.Generic; using FluentAssertions; - using global::KafkaFlow.Retry.Durable.Definitions; - using global::KafkaFlow.Retry.Durable.Encoders; using global::KafkaFlow.Retry.Durable.Polling; - using global::KafkaFlow.Retry.Durable.Repository; - using global::KafkaFlow.Retry.Durable.Repository.Adapters; using Moq; using Xunit; public class QueueTrackerFactoryTests { - public static readonly IEnumerable DataTest = new List + public static IEnumerable DataTest() => new List { new object[] { null, - Mock.Of() , - Mock.Of() , - Mock.Of() , + Mock.Of(), + typeof(ArgumentNullException) }, new object[] { - Mock.Of(), - null , - Mock.Of() , - Mock.Of() + string.Empty, + Mock.Of(), + typeof(ArgumentException) }, new object[] { - Mock.Of(), - Mock.Of() , - null , - Mock.Of() - }, - new object[] - { - Mock.Of(), - Mock.Of() , - Mock.Of() , - null + "id", + null, + typeof(ArgumentNullException) } }; - private static readonly RetryDurablePollingDefinition retryDurablePollingDefinition = new RetryDurablePollingDefinition(true, "*/30 * * ? * *", 10, 100, "id"); - [Fact] public void QueueTrackerFactory_Create_Success() { // Arrange - var factory = new QueueTrackerFactory( - Mock.Of(), - Mock.Of(), - Mock.Of(), - Mock.Of()); + var mockJobDataProvidersFactory = new Mock(); + mockJobDataProvidersFactory + .Setup(m => m.Create(It.IsAny(), It.IsAny())) + .Returns(new[] { Mock.Of() }); + + var factory = new QueueTrackerFactory("id", mockJobDataProvidersFactory.Object); // Act - var queueTracker = factory.Create(retryDurablePollingDefinition, Mock.Of(), Mock.Of()); + var queueTracker = factory.Create(Mock.Of(), Mock.Of()); // Arrange queueTracker.Should().NotBeNull(); @@ -66,21 +51,13 @@ public void QueueTrackerFactory_Create_Success() [Theory] [MemberData(nameof(DataTest))] - public void QueueTrackerFactory_Ctor_WithArgumentNull_ThrowsException( - object retryDurableQueueRepository, - object messageHeadersAdapter, - object messageAdapter, - object utf8Encoder) + internal void QueueTrackerFactory_Ctor_WithArgumentNull_ThrowsException( + string schedulerId, + IJobDataProvidersFactory jobDataProvidersFactory, + Type expectedExceptionType) { - // Arrange & Act - Action act = () => new QueueTrackerFactory( - (IRetryDurableQueueRepository)retryDurableQueueRepository, - (IMessageHeadersAdapter)messageHeadersAdapter, - (IMessageAdapter)messageAdapter, - (IUtf8Encoder)utf8Encoder); - - // Assert - act.Should().Throw(); + // Act & Assert + Assert.Throws(expectedExceptionType, () => new QueueTrackerFactory(schedulerId, jobDataProvidersFactory)); } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs index 2c8fb5d6..78f56c86 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs @@ -5,7 +5,7 @@ using System.Linq; using System.Threading.Tasks; using FluentAssertions; - using global::KafkaFlow.Retry.Durable.Definitions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; using global::KafkaFlow.Retry.Durable.Polling; using Moq; using Quartz; @@ -25,28 +25,85 @@ public Task Execute(IJobExecutionContext context) public class QueueTrackerTests { [Fact] - public async Task QueueTracker_ScheduleAndUnscheduleJobs_Success() + public async Task QueueTracker_ScheduleAndUnscheduleDifferentJobs_Success() { // arrange - var mockITriggerProvider1 = new Mock(); - mockITriggerProvider1 - .Setup(x => x.GetQueuePollingTrigger()) - .Returns(() => - TriggerBuilder - .Create() - .WithIdentity("Trigger1", "queueTrackerGroup") - .WithCronSchedule("*/5 * * ? * * *") - .StartNow() - .WithPriority(1) - .Build() + var schedulerId = "twoJobsSchedulerId"; + + var retryDurablePollingDefinition = + new RetryDurablePollingDefinition( + enabled: true, + cronExpression: "*/2 * * ? * * *", + fetchSize: 100, + expirationIntervalFactor: 1 + ); + + var cleanupPollingDefinition = + new CleanupPollingDefinition( + enabled: true, + cronExpression: "*/4 * * ? * * *", + timeToLiveInDays: 1, + rowsPerRequest: 10 + ); + + var retryDurableTriggerKeyName = $"Trigger_{schedulerId}_{retryDurablePollingDefinition.PollingJobType}"; + var cleanupTriggerKeyName = $"Trigger_{schedulerId}_{cleanupPollingDefinition.PollingJobType}"; + + var jobExecutionContexts = new List(); + + var mockRetryDurableJobDataProvider = new Mock(); + this.SetupMockJobDataProvider( + mockRetryDurableJobDataProvider, + retryDurablePollingDefinition, + retryDurableTriggerKeyName, + jobExecutionContexts); + + var mockCleanupJobDataProvider = new Mock(); + this.SetupMockJobDataProvider( + mockCleanupJobDataProvider, + cleanupPollingDefinition, + cleanupTriggerKeyName, + jobExecutionContexts); + + var queueTracker = new QueueTracker( + schedulerId, + new[] { mockRetryDurableJobDataProvider.Object, mockCleanupJobDataProvider.Object }, + Mock.Of() + ); + + // act + await queueTracker.ScheduleJobsAsync(); + + await WaitForSeconds(5); + + await queueTracker.UnscheduleJobsAsync(); + + // assert + jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null).Count().Should().Be(2); + jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == retryDurableTriggerKeyName).Count().Should().Be(1); + jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == cleanupTriggerKeyName).Count().Should().Be(1); + } + + [Fact] + public async Task QueueTracker_ScheduleAndUnscheduleRetryDurableJobs_Success() + { + // arrange + var schedulerId = "pollingId"; + + var retryDurablePollingDefinition = + new RetryDurablePollingDefinition( + enabled: true, + cronExpression: "*/5 * * ? * * *", + fetchSize: 100, + expirationIntervalFactor: 1 ); var jobExecutionContexts = new List(); var dataMap = new JobDataMap { { "JobExecution", jobExecutionContexts } }; - var mockIJobDetailProvider = new Mock(); - mockIJobDetailProvider - .Setup(x => x.GetQueuePollingJobDetail()) + var mockIJobDataProvider = new Mock(); + mockIJobDataProvider + .Setup(x => x.GetPollingJobDetail()) .Returns( JobBuilder .Create() @@ -54,59 +111,62 @@ public async Task QueueTracker_ScheduleAndUnscheduleJobs_Success() .Build() ); + mockIJobDataProvider + .SetupGet(m => m.PollingDefinition) + .Returns(retryDurablePollingDefinition); + + mockIJobDataProvider + .SetupGet(m => m.Trigger) + .Returns( + TriggerBuilder + .Create() + .WithIdentity("Trigger1", "queueTrackerGroup") + .WithCronSchedule("*/5 * * ? * * *") + .StartNow() + .WithPriority(1) + .Build()); + var mockILogHandler = new Mock(); mockILogHandler.Setup(x => x.Info(It.IsAny(), It.IsAny())); mockILogHandler.Setup(x => x.Error(It.IsAny(), It.IsAny(), It.IsAny())); - var retryDurablePollingDefinition = - new RetryDurablePollingDefinition( - enabled: true, - cronExpression: "*/5 * * ? * * *", - fetchSize: 100, - expirationIntervalFactor: 1, - id: "pollingId" - ); var queueTracker1 = new QueueTracker( - mockILogHandler.Object, - retryDurablePollingDefinition, - mockIJobDetailProvider.Object, - mockITriggerProvider1.Object + schedulerId, + new[] { mockIJobDataProvider.Object }, + mockILogHandler.Object ); // act - queueTracker1.ScheduleJob(); + await queueTracker1.ScheduleJobsAsync(); await WaitForSeconds(6).ConfigureAwait(false); - queueTracker1.UnscheduleJob(); + await queueTracker1.UnscheduleJobsAsync(); await WaitForSeconds(15).ConfigureAwait(false); - var mockITriggerProvider2 = new Mock(); - mockITriggerProvider2 - .Setup(x => x.GetQueuePollingTrigger()) - .Returns(() => + mockIJobDataProvider + .SetupGet(m => m.Trigger) + .Returns( TriggerBuilder - .Create() - .WithIdentity("Trigger2", "queueTrackerGroup") - .WithCronSchedule("*/5 * * ? * * *") - .StartNow() - .WithPriority(1) - .Build() - ); + .Create() + .WithIdentity("Trigger2", "queueTrackerGroup") + .WithCronSchedule("*/5 * * ? * * *") + .StartNow() + .WithPriority(1) + .Build()); var queueTracker2 = new QueueTracker( - mockILogHandler.Object, - retryDurablePollingDefinition, - mockIJobDetailProvider.Object, - mockITriggerProvider2.Object + schedulerId, + new[] { mockIJobDataProvider.Object }, + mockILogHandler.Object ); - queueTracker2.ScheduleJob(); + await queueTracker2.ScheduleJobsAsync(); await WaitForSeconds(6).ConfigureAwait(false); - queueTracker2.UnscheduleJob(); + await queueTracker2.UnscheduleJobsAsync(); await WaitForSeconds(15).ConfigureAwait(false); @@ -137,5 +197,38 @@ private static async Task WaitForSeconds(int seconds) await Task.Delay(100).ConfigureAwait(false); } } + + private void SetupMockJobDataProvider( + Mock mockJobDataProvider, + PollingDefinition pollingDefinition, + string triggerKeyName, + List jobExecutionContexts) + { + var dataMap = new JobDataMap { { "JobExecution", jobExecutionContexts } }; + + mockJobDataProvider + .Setup(x => x.GetPollingJobDetail()) + .Returns( + JobBuilder + .Create() + .SetJobData(dataMap) + .Build() + ); + + mockJobDataProvider + .SetupGet(m => m.PollingDefinition) + .Returns(pollingDefinition); + + mockJobDataProvider + .SetupGet(m => m.Trigger) + .Returns( + TriggerBuilder + .Create() + .WithIdentity(triggerKeyName, "queueTrackerGroupTest") + .WithCronSchedule(pollingDefinition.CronExpression) + .StartNow() + .WithPriority(1) + .Build()); + } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerGuaranteeOrderedMiddlewareTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerGuaranteeOrderedMiddlewareTests.cs index 587e444e..1b18b84c 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerGuaranteeOrderedMiddlewareTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerGuaranteeOrderedMiddlewareTests.cs @@ -11,7 +11,7 @@ public class RetryDurableConsumerGuaranteeOrderedMiddlewareTests { - public static readonly IEnumerable DataTest = new List + public static IEnumerable DataTest() => new List { new object[] { diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerLatestMiddlewareTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerLatestMiddlewareTests.cs index 47568a68..83e65636 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerLatestMiddlewareTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableConsumerLatestMiddlewareTests.cs @@ -11,7 +11,15 @@ public class RetryDurableConsumerLatestMiddlewareTests { - public static readonly IEnumerable DataTest = new List + private readonly Mock logHandler = new Mock(); + + private readonly Mock messageContext = new Mock(); + + private readonly Mock retryDurableQueueRepository = new Mock(); + + private readonly Mock utf8Encoder = new Mock(); + + public static IEnumerable DataTest() => new List { new object[] { @@ -33,11 +41,6 @@ public class RetryDurableConsumerLatestMiddlewareTests } }; - private readonly Mock logHandler = new Mock(); - private readonly Mock messageContext = new Mock(); - private readonly Mock retryDurableQueueRepository = new Mock(); - private readonly Mock utf8Encoder = new Mock(); - [Theory] [MemberData(nameof(DataTest))] public void RetryDurableConsumerLatestMiddleware_Ctor_Tests( diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableMiddlewareTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableMiddlewareTests.cs index cf29b76f..56ec07d7 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableMiddlewareTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/RetryDurableMiddlewareTests.cs @@ -10,7 +10,7 @@ public class RetryDurableMiddlewareTests { - public static readonly IEnumerable DataTest = new List + public static IEnumerable DataTest() => new List { new object[] { diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Forever/RetryForeverDefinitionTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Forever/RetryForeverDefinitionTests.cs index 2f513c2d..651afe71 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Forever/RetryForeverDefinitionTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Forever/RetryForeverDefinitionTests.cs @@ -8,7 +8,7 @@ public class RetryForeverDefinitionTests { - public static readonly IEnumerable DataTest = new List + public static IEnumerable DataTest() => new List { new object[] { diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Simple/RetrySimpleDefinitionTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Simple/RetrySimpleDefinitionTests.cs index 92095d21..30d2487b 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Simple/RetrySimpleDefinitionTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Simple/RetrySimpleDefinitionTests.cs @@ -8,7 +8,7 @@ public class RetrySimpleDefinitionTests { - public static readonly IEnumerable DataTest = new List + public static IEnumerable DataTest() => new List { new object[] { diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/DboCollectionNavigatorTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/DboCollectionNavigatorTests.cs index f63d60a4..5f6d51c5 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/DboCollectionNavigatorTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/DboCollectionNavigatorTests.cs @@ -12,39 +12,8 @@ public class DboCollectionNavigatorTests { - public static readonly IEnumerable DataTestCtor = new List - { - new object[] - { - null, - Mock.Of>() - }, - new object[] - { - Mock.Of>(), - null - } - }; - - public static readonly IEnumerable DataTestNavigate = new List - { - new object[] - { - null, - new Predicate((_)=> true) - }, - new object[] - { - new Action((_) => - new RetryQueueItem(Guid.NewGuid(), 1, DateTime.UtcNow,0,null,null, RetryQueueItemStatus.Waiting, SeverityLevel.High, "description") - { - Message = new RetryQueueItemMessage("topicName", new byte[1], new byte[1], 1, 1, DateTime.UtcNow) - }), - null - } - }; - private readonly DboCollectionNavigator dboCollectionNavigator; + private readonly Mock> dboDomainAdapter = new Mock>(); private readonly IList dbos = new List @@ -82,6 +51,38 @@ public DboCollectionNavigatorTests() dboCollectionNavigator = new DboCollectionNavigator(dbos, dboDomainAdapter.Object); } + public static IEnumerable DataTestCtor() => new List + { + new object[] + { + null, + Mock.Of>() + }, + new object[] + { + Mock.Of>(), + null + } + }; + + public static IEnumerable DataTestNavigate() => new List + { + new object[] + { + null, + new Predicate((_)=> true) + }, + new object[] + { + new Action((_) => + new RetryQueueItem(Guid.NewGuid(), 1, DateTime.UtcNow,0,null,null, RetryQueueItemStatus.Waiting, SeverityLevel.High, "description") + { + Message = new RetryQueueItemMessage("topicName", new byte[1], new byte[1], 1, 1, DateTime.UtcNow) + }), + null + } + }; + [Fact] public void DboCollectionNavigator_Navigate_Success() { diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs index 29b769b1..ea8fb208 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs @@ -13,7 +13,54 @@ public class RetryQueueReaderTests { - public static readonly IEnumerable DataTest = new List + private readonly RetryQueueReader reader; + + private readonly Mock retryQueueAdapter = new Mock(); + + private readonly Mock retryQueueItemAdapter = new Mock(); + + private readonly Mock retryQueueItemMessageAdapter = new Mock(); + + private readonly Mock retryQueueItemMessageHeaderAdapter = new Mock(); + + public RetryQueueReaderTests() + { + var item1 = this.CreateRetryQueueItem(1, RetryQueueItemStatus.InRetry, SeverityLevel.High); + var itemsA = new[] { item1 }; + + retryQueueAdapter + .Setup(d => d.Adapt(It.IsAny())) + .Returns(new RetryQueue(Guid.NewGuid(), "searchGroupKeyA", "queueGroupKeyA", DateTime.UtcNow, DateTime.UtcNow, RetryQueueStatus.Active, itemsA)); + + retryQueueItemAdapter + .Setup(d => d.Adapt(It.IsAny())) + .Returns(new RetryQueueItem( + id: Guid.NewGuid(), + attemptsCount: 3, + creationDate: DateTime.UtcNow, + sort: 0, + lastExecution: DateTime.UtcNow, + modifiedStatusDate: DateTime.UtcNow, + status: RetryQueueItemStatus.InRetry, + severityLevel: SeverityLevel.Low, + description: "test")); + + retryQueueItemMessageAdapter + .Setup(d => d.Adapt(It.IsAny())) + .Returns(new RetryQueueItemMessage("topicName", new byte[] { 1, 3 }, new byte[] { 2, 4, 6 }, 3, 21, DateTime.UtcNow)); + + retryQueueItemMessageHeaderAdapter + .Setup(d => d.Adapt(It.IsAny())) + .Returns(new MessageHeader("key", new byte[2])); + + reader = new RetryQueueReader( + retryQueueAdapter.Object, + retryQueueItemAdapter.Object, + retryQueueItemMessageAdapter.Object, + retryQueueItemMessageHeaderAdapter.Object); + } + + public static IEnumerable DataTest() => new List { new object[] { @@ -61,49 +108,6 @@ public class RetryQueueReaderTests } }; - private readonly RetryQueueReader reader; - private readonly Mock retryQueueAdapter = new Mock(); - private readonly Mock retryQueueItemAdapter = new Mock(); - private readonly Mock retryQueueItemMessageAdapter = new Mock(); - private readonly Mock retryQueueItemMessageHeaderAdapter = new Mock(); - - public RetryQueueReaderTests() - { - var item1 = this.CreateRetryQueueItem(1, RetryQueueItemStatus.InRetry, SeverityLevel.High); - var itemsA = new[] { item1 }; - - retryQueueAdapter - .Setup(d => d.Adapt(It.IsAny())) - .Returns(new RetryQueue(Guid.NewGuid(), "searchGroupKeyA", "queueGroupKeyA", DateTime.UtcNow, DateTime.UtcNow, RetryQueueStatus.Active, itemsA)); - - retryQueueItemAdapter - .Setup(d => d.Adapt(It.IsAny())) - .Returns(new RetryQueueItem( - id: Guid.NewGuid(), - attemptsCount: 3, - creationDate: DateTime.UtcNow, - sort: 0, - lastExecution: DateTime.UtcNow, - modifiedStatusDate: DateTime.UtcNow, - status: RetryQueueItemStatus.InRetry, - severityLevel: SeverityLevel.Low, - description: "test")); - - retryQueueItemMessageAdapter - .Setup(d => d.Adapt(It.IsAny())) - .Returns(new RetryQueueItemMessage("topicName", new byte[] { 1, 3 }, new byte[] { 2, 4, 6 }, 3, 21, DateTime.UtcNow)); - - retryQueueItemMessageHeaderAdapter - .Setup(d => d.Adapt(It.IsAny())) - .Returns(new MessageHeader("key", new byte[2])); - - reader = new RetryQueueReader( - retryQueueAdapter.Object, - retryQueueItemAdapter.Object, - retryQueueItemMessageAdapter.Object, - retryQueueItemMessageHeaderAdapter.Object); - } - [Fact] public void RetryQueueReader_Read_Success() { diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/CleanupPollingDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/CleanupPollingDefinitionBuilder.cs new file mode 100644 index 00000000..fb20a783 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/CleanupPollingDefinitionBuilder.cs @@ -0,0 +1,34 @@ +namespace KafkaFlow.Retry +{ + using KafkaFlow.Retry.Durable.Definitions.Polling; + + public class CleanupPollingDefinitionBuilder : PollingDefinitionBuilder + { + private int rowsPerRequest = 256; + private int timeToLiveInDays = 30; + + internal override bool Required => false; + + public CleanupPollingDefinitionBuilder WithRowsPerRequest(int rowsPerRequest) + { + this.rowsPerRequest = rowsPerRequest; + return this; + } + + public CleanupPollingDefinitionBuilder WithTimeToLiveInDays(int timeToLiveInDays) + { + this.timeToLiveInDays = timeToLiveInDays; + return this; + } + + internal CleanupPollingDefinition Build() + { + return new CleanupPollingDefinition( + this.enabled, + this.cronExpression, + this.timeToLiveInDays, + this.rowsPerRequest + ); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionBuilder.cs new file mode 100644 index 00000000..767669b1 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionBuilder.cs @@ -0,0 +1,22 @@ +namespace KafkaFlow.Retry +{ + public abstract class PollingDefinitionBuilder where TSelf : PollingDefinitionBuilder + { + protected string cronExpression; + protected bool enabled; + + internal abstract bool Required { get; } + + public TSelf Enabled(bool value) + { + this.enabled = value; + return (TSelf)this; + } + + public TSelf WithCronExpression(string cronExpression) + { + this.cronExpression = cronExpression; + return (TSelf)this; + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs new file mode 100644 index 00000000..3aeb010e --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/PollingDefinitionsAggregatorBuilder.cs @@ -0,0 +1,75 @@ +namespace KafkaFlow.Retry +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Dawn; + using KafkaFlow.Retry.Durable.Definitions.Polling; + + public class PollingDefinitionsAggregatorBuilder + { + private readonly CleanupPollingDefinitionBuilder cleanupPollingDefinitionBuilder; + private readonly List pollingDefinitions; + private readonly RetryDurablePollingDefinitionBuilder retryDurablePollingDefinitionBuilder; + private string schedulerId; + + public PollingDefinitionsAggregatorBuilder() + { + this.cleanupPollingDefinitionBuilder = new CleanupPollingDefinitionBuilder(); + this.retryDurablePollingDefinitionBuilder = new RetryDurablePollingDefinitionBuilder(); + + this.pollingDefinitions = new List(); + } + + public PollingDefinitionsAggregatorBuilder WithCleanupPollingConfiguration(Action configure) + { + Guard.Argument(configure, nameof(configure)).NotNull(); + + configure(this.cleanupPollingDefinitionBuilder); + var cleanupPollingDefinition = this.cleanupPollingDefinitionBuilder.Build(); + + this.pollingDefinitions.Add(cleanupPollingDefinition); + + return this; + } + + public PollingDefinitionsAggregatorBuilder WithRetryDurablePollingConfiguration(Action configure) + { + Guard.Argument(configure, nameof(configure)).NotNull(); + + configure(this.retryDurablePollingDefinitionBuilder); + var retryDurablepollingDefinition = this.retryDurablePollingDefinitionBuilder.Build(); + + this.pollingDefinitions.Add(retryDurablepollingDefinition); + + return this; + } + + public PollingDefinitionsAggregatorBuilder WithSchedulerId(string schedulerId) + { + this.schedulerId = schedulerId; + return this; + } + + internal PollingDefinitionsAggregator Build() + { + if (this.retryDurablePollingDefinitionBuilder.Required) + { + this.ValidateRequiredPollingDefinition(PollingJobType.RetryDurable); + } + + if (this.cleanupPollingDefinitionBuilder.Required) + { + this.ValidateRequiredPollingDefinition(PollingJobType.Cleanup); + } + + return new PollingDefinitionsAggregator(this.schedulerId, this.pollingDefinitions); + } + + private void ValidateRequiredPollingDefinition(PollingJobType pollingJobType) + { + Guard.Argument(this.pollingDefinitions.Any(pd => pd.PollingJobType == pollingJobType), nameof(this.pollingDefinitions)) + .True($"The polling job {pollingJobType} must be defined."); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurablePollingDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurablePollingDefinitionBuilder.cs new file mode 100644 index 00000000..b18b5967 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/Polling/RetryDurablePollingDefinitionBuilder.cs @@ -0,0 +1,34 @@ +namespace KafkaFlow.Retry +{ + using KafkaFlow.Retry.Durable.Definitions.Polling; + + public class RetryDurablePollingDefinitionBuilder : PollingDefinitionBuilder + { + protected int expirationIntervalFactor = 1; + protected int fetchSize = 256; + + internal override bool Required => true; + + public RetryDurablePollingDefinitionBuilder WithExpirationIntervalFactor(int expirationIntervalFactor) + { + this.expirationIntervalFactor = expirationIntervalFactor; + return this; + } + + public RetryDurablePollingDefinitionBuilder WithFetchSize(int fetchSize) + { + this.fetchSize = fetchSize; + return this; + } + + internal RetryDurablePollingDefinition Build() + { + return new RetryDurablePollingDefinition( + this.enabled, + this.cronExpression, + this.fetchSize, + this.expirationIntervalFactor + ); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableDefinitionBuilder.cs index 8347fc16..aacdd1ef 100644 --- a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableDefinitionBuilder.cs +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableDefinitionBuilder.cs @@ -6,7 +6,9 @@ using KafkaFlow.Configuration; using KafkaFlow.Retry.Durable.Compression; using KafkaFlow.Retry.Durable.Definitions; + using KafkaFlow.Retry.Durable.Definitions.Polling; using KafkaFlow.Retry.Durable.Encoders; + using KafkaFlow.Retry.Durable.Polling; using KafkaFlow.Retry.Durable.Repository; using KafkaFlow.Retry.Durable.Repository.Adapters; using KafkaFlow.Retry.Durable.Serializers; @@ -17,8 +19,8 @@ public class RetryDurableDefinitionBuilder private readonly List> retryWhenExceptions = new List>(); private JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings(); private Type messageType; + private PollingDefinitionsAggregator pollingDefinitionsAggregator; private RetryDurableEmbeddedClusterDefinitionBuilder retryDurableEmbeddedClusterDefinitionBuilder; - private RetryDurablePollingDefinition retryDurablePollingDefinition; private IRetryDurableQueueRepositoryProvider retryDurableRepositoryProvider; private RetryDurableRetryPlanBeforeDefinition retryDurableRetryPlanBeforeDefinition; @@ -47,6 +49,8 @@ public RetryDurableDefinitionBuilder WithEmbeddedRetryCluster( Action configure ) { + Guard.Argument(configure, nameof(configure)).NotNull(); + this.retryDurableEmbeddedClusterDefinitionBuilder = new RetryDurableEmbeddedClusterDefinitionBuilder(cluster); configure(this.retryDurableEmbeddedClusterDefinitionBuilder); @@ -65,11 +69,13 @@ public RetryDurableDefinitionBuilder WithMessageType(Type messageType) return this; } - public RetryDurableDefinitionBuilder WithQueuePollingJobConfiguration(Action configure) + public RetryDurableDefinitionBuilder WithPollingJobsConfiguration(Action configure) { - var retryDurablePollingDefinitionBuilder = new RetryDurableQueuePollingJobDefinitionBuilder(); - configure(retryDurablePollingDefinitionBuilder); - this.retryDurablePollingDefinition = retryDurablePollingDefinitionBuilder.Build(); + Guard.Argument(configure, nameof(configure)).NotNull(); + + var pollingDefinitionsAggregatorBuilder = new PollingDefinitionsAggregatorBuilder(); + configure(pollingDefinitionsAggregatorBuilder); + this.pollingDefinitionsAggregator = pollingDefinitionsAggregatorBuilder.Build(); return this; } @@ -83,6 +89,8 @@ public RetryDurableDefinitionBuilder WithRepositoryProvider(IRetryDurableQueueRe public RetryDurableDefinitionBuilder WithRetryPlanBeforeRetryDurable(Action configure) { + Guard.Argument(configure, nameof(configure)).NotNull(); + var retryDurableRetryPlanBeforeDefinitionBuilder = new RetryDurableRetryPlanBeforeDefinitionBuilder(); configure(retryDurableRetryPlanBeforeDefinitionBuilder); this.retryDurableRetryPlanBeforeDefinition = retryDurableRetryPlanBeforeDefinitionBuilder.Build(); @@ -96,6 +104,7 @@ internal RetryDurableDefinition Build() Guard.Argument(this.retryDurableRepositoryProvider).NotNull("A repository should be defined"); Guard.Argument(this.messageType).NotNull("A message type should be defined"); + var triggerProvider = new TriggerProvider(); var utf8Encoder = new Utf8Encoder(); var gzipCompressor = new GzipCompressor(); var newtonsoftJsonSerializer = new NewtonsoftJsonSerializer(this.jsonSerializerSettings); @@ -113,7 +122,7 @@ internal RetryDurableDefinition Build() messageHeadersAdapter, messageAdapter, utf8Encoder, - this.retryDurablePollingDefinition); + this.pollingDefinitionsAggregator); this.retryDurableEmbeddedClusterDefinitionBuilder .Build( @@ -122,9 +131,9 @@ internal RetryDurableDefinition Build() gzipCompressor, utf8Encoder, newtonsoftJsonSerializer, - messageAdapter, messageHeadersAdapter, - this.retryDurablePollingDefinition + this.pollingDefinitionsAggregator, + triggerProvider ); return new RetryDurableDefinition( diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs index 3368e8c1..572d57f8 100644 --- a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs @@ -7,7 +7,7 @@ using KafkaFlow.Producers; using KafkaFlow.Retry.Durable; using KafkaFlow.Retry.Durable.Compression; - using KafkaFlow.Retry.Durable.Definitions; + using KafkaFlow.Retry.Durable.Definitions.Polling; using KafkaFlow.Retry.Durable.Encoders; using KafkaFlow.Retry.Durable.Polling; using KafkaFlow.Retry.Durable.Repository; @@ -73,10 +73,9 @@ internal void Build( IGzipCompressor gzipCompressor, IUtf8Encoder utf8Encoder, INewtonsoftJsonSerializer newtonsoftJsonSerializer, - IMessageAdapter messageAdapter, IMessageHeadersAdapter messageHeadersAdapter, - RetryDurablePollingDefinition retryDurablePollingDefinition - ) + PollingDefinitionsAggregator pollingDefinitionsAggregator, + ITriggerProvider triggerProvider) { if (!enabled) { @@ -99,10 +98,14 @@ RetryDurablePollingDefinition retryDurablePollingDefinition var queueTrackerCoordinator = new QueueTrackerCoordinator( new QueueTrackerFactory( - retryDurableQueueRepository, - messageHeadersAdapter, - messageAdapter, - utf8Encoder + pollingDefinitionsAggregator.SchedulerId, + new JobDataProvidersFactory( + pollingDefinitionsAggregator, + triggerProvider, + retryDurableQueueRepository, + messageHeadersAdapter, + utf8Encoder + ) ) ); @@ -143,10 +146,11 @@ RetryDurablePollingDefinition retryDurablePollingDefinition }); queueTrackerCoordinator - .ScheduleJob( - retryDurablePollingDefinition, + .ScheduleJobsAsync( resolver.Resolve().GetProducer(producerName), - log); + log) + .GetAwaiter() + .GetResult(); } }) .WithPartitionsRevokedHandler( @@ -160,7 +164,7 @@ RetryDurablePollingDefinition retryDurablePollingDefinition PartitionsRevoked = partitionsRevokedHandler }); - queueTrackerCoordinator.UnscheduleJob(); + queueTrackerCoordinator.UnscheduleJobsAsync().GetAwaiter().GetResult(); }) .AddMiddlewares( middlewares => middlewares diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableQueuePollingJobDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableQueuePollingJobDefinitionBuilder.cs deleted file mode 100644 index 82f4b4b4..00000000 --- a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableQueuePollingJobDefinitionBuilder.cs +++ /dev/null @@ -1,54 +0,0 @@ -namespace KafkaFlow.Retry -{ - using KafkaFlow.Retry.Durable.Definitions; - - public class RetryDurableQueuePollingJobDefinitionBuilder - { - private string cronExpression; - private bool enabled; - private int expirationIntervalFactor = 1; - private int fetchSize = 256; - private string id; - - public RetryDurableQueuePollingJobDefinitionBuilder Enabled(bool enabled) - { - this.enabled = enabled; - return this; - } - - public RetryDurableQueuePollingJobDefinitionBuilder WithCronExpression(string cronExpression) - { - this.cronExpression = cronExpression; - return this; - } - - public RetryDurableQueuePollingJobDefinitionBuilder WithExpirationIntervalFactor(int expirationIntervalFactor) - { - this.expirationIntervalFactor = expirationIntervalFactor; - return this; - } - - public RetryDurableQueuePollingJobDefinitionBuilder WithFetchSize(int fetchSize) - { - this.fetchSize = fetchSize; - return this; - } - - public RetryDurableQueuePollingJobDefinitionBuilder WithId(string id) - { - this.id = id; - return this; - } - - internal RetryDurablePollingDefinition Build() - { - return new RetryDurablePollingDefinition( - this.enabled, - this.cronExpression, - this.fetchSize, - this.expirationIntervalFactor, - this.id - ); - } - } -} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinition.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinition.cs new file mode 100644 index 00000000..2a9c9eee --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/CleanupPollingDefinition.cs @@ -0,0 +1,30 @@ +namespace KafkaFlow.Retry.Durable.Definitions.Polling +{ + using Dawn; + + internal class CleanupPollingDefinition : PollingDefinition + { + public CleanupPollingDefinition( + bool enabled, + string cronExpression, + int timeToLiveInDays, + int rowsPerRequest) + : base(enabled, cronExpression) + { + if (enabled) + { + Guard.Argument(timeToLiveInDays, nameof(timeToLiveInDays)).Positive(); + Guard.Argument(rowsPerRequest, nameof(rowsPerRequest)).Positive(); + } + + this.TimeToLiveInDays = timeToLiveInDays; + this.RowsPerRequest = rowsPerRequest; + } + + public override PollingJobType PollingJobType => PollingJobType.Cleanup; + + public int RowsPerRequest { get; } + + public int TimeToLiveInDays { get; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinition.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinition.cs new file mode 100644 index 00000000..49e8e6e3 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinition.cs @@ -0,0 +1,27 @@ +namespace KafkaFlow.Retry.Durable.Definitions.Polling +{ + using Dawn; + + internal abstract class PollingDefinition + { + protected PollingDefinition(bool enabled, string cronExpression) + { + Guard.Argument(this.PollingJobType, nameof(this.PollingJobType)).NotDefault(); + + if (enabled) + { + Guard.Argument(Quartz.CronExpression.IsValidExpression(cronExpression), nameof(cronExpression)) + .True("The cron expression that was defined is not valid"); + } + + this.Enabled = enabled; + this.CronExpression = cronExpression; + } + + public string CronExpression { get; } + + public bool Enabled { get; } + + public abstract PollingJobType PollingJobType { get; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinitionsAggregator.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinitionsAggregator.cs new file mode 100644 index 00000000..f997f98a --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingDefinitionsAggregator.cs @@ -0,0 +1,25 @@ +namespace KafkaFlow.Retry.Durable.Definitions.Polling +{ + using System.Collections.Generic; + using System.Linq; + using Dawn; + + internal class PollingDefinitionsAggregator + { + public PollingDefinitionsAggregator(string schedulerId, IEnumerable pollingDefinitions) + { + Guard.Argument(schedulerId, nameof(schedulerId)).NotNull().NotEmpty(); + Guard.Argument(pollingDefinitions, nameof(pollingDefinitions)).NotNull().NotEmpty(); + + var pollingJobTypes = pollingDefinitions.Select(pd => pd.PollingJobType); + Guard.Argument(pollingJobTypes, nameof(pollingJobTypes)).DoesNotContainDuplicate(); + + this.SchedulerId = schedulerId; + this.PollingDefinitions = pollingDefinitions.ToDictionary(pd => pd.PollingJobType, pd => pd); + } + + public IDictionary PollingDefinitions { get; } + + public string SchedulerId { get; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs new file mode 100644 index 00000000..8ee541d2 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/PollingJobType.cs @@ -0,0 +1,10 @@ +namespace KafkaFlow.Retry.Durable.Definitions.Polling +{ + internal enum PollingJobType + { + Unknown = 0, + + RetryDurable = 1, + Cleanup = 2 + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinition.cs b/src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinition.cs new file mode 100644 index 00000000..c16d31a9 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Definitions/Polling/RetryDurablePollingDefinition.cs @@ -0,0 +1,27 @@ +namespace KafkaFlow.Retry.Durable.Definitions.Polling +{ + using Dawn; + + internal class RetryDurablePollingDefinition : PollingDefinition + { + public RetryDurablePollingDefinition( + bool enabled, + string cronExpression, + int fetchSize, + int expirationIntervalFactor) + : base(enabled, cronExpression) + { + Guard.Argument(fetchSize, nameof(fetchSize)).Positive(); + Guard.Argument(expirationIntervalFactor, nameof(expirationIntervalFactor)).Positive(); + + this.FetchSize = fetchSize; + this.ExpirationIntervalFactor = expirationIntervalFactor; + } + + public int ExpirationIntervalFactor { get; } + + public int FetchSize { get; } + + public override PollingJobType PollingJobType => PollingJobType.RetryDurable; + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinition.cs b/src/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinition.cs deleted file mode 100644 index 0ee47dde..00000000 --- a/src/KafkaFlow.Retry/Durable/Definitions/RetryDurablePollingDefinition.cs +++ /dev/null @@ -1,41 +0,0 @@ -namespace KafkaFlow.Retry.Durable.Definitions -{ - using Dawn; - - internal class RetryDurablePollingDefinition - { - public RetryDurablePollingDefinition( - bool enabled, - string cronExpression, - int fetchSize, - int expirationIntervalFactor, - string id) - { - if (enabled) - { - Guard.Argument(Quartz.CronExpression.IsValidExpression(cronExpression), nameof(cronExpression)) - .True("The cron expression that was defined is not valid"); - } - - Guard.Argument(id, nameof(id)).NotNull().NotEmpty(); - Guard.Argument(fetchSize, nameof(fetchSize)).Positive(); - Guard.Argument(expirationIntervalFactor, nameof(expirationIntervalFactor)).Positive(); - - this.CronExpression = cronExpression; - this.Enabled = enabled; - this.FetchSize = fetchSize; - this.ExpirationIntervalFactor = expirationIntervalFactor; - this.Id = id; - } - - public string CronExpression { get; } - - public bool Enabled { get; } - - public int ExpirationIntervalFactor { get; } - - public int FetchSize { get; } - - public string Id { get; } - } -} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/Extensions/JobDataMapExtensions.cs b/src/KafkaFlow.Retry/Durable/Polling/Extensions/JobDataMapExtensions.cs new file mode 100644 index 00000000..9a4002ec --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/Extensions/JobDataMapExtensions.cs @@ -0,0 +1,28 @@ +namespace KafkaFlow.Retry.Durable.Polling.Extensions +{ + using Dawn; + using Quartz; + + internal static class JobDataMapExtensions + { + public static string GetValidStringValue(this JobDataMap jobDataMap, string key, string jobName) + { + var stringValue = jobDataMap.GetValidValue(key, jobName); + + Guard.Argument(stringValue).NotEmpty($"Argument {key} can't be an empty string for the job {jobName}."); + + return stringValue; + } + + public static T GetValidValue(this JobDataMap jobDataMap, string key, string jobName) where T : class + { + jobDataMap.TryGetValue(key, out var objValue); + + var value = objValue as T; + + Guard.Argument(value).NotNull($"Argument {key} is required for the job {jobName}."); + + return value; + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/IJobDataProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/IJobDataProvider.cs new file mode 100644 index 00000000..b979ecc7 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/IJobDataProvider.cs @@ -0,0 +1,14 @@ +namespace KafkaFlow.Retry.Durable.Polling +{ + using KafkaFlow.Retry.Durable.Definitions.Polling; + using Quartz; + + internal interface IJobDataProvider + { + PollingDefinition PollingDefinition { get; } + + ITrigger Trigger { get; } + + IJobDetail GetPollingJobDetail(); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/IJobDataProvidersFactory.cs b/src/KafkaFlow.Retry/Durable/Polling/IJobDataProvidersFactory.cs new file mode 100644 index 00000000..2107f109 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/IJobDataProvidersFactory.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Retry.Durable.Polling +{ + using System.Collections.Generic; + + internal interface IJobDataProvidersFactory + { + IEnumerable Create(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/IJobDetailProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/IJobDetailProvider.cs deleted file mode 100644 index a8ce264d..00000000 --- a/src/KafkaFlow.Retry/Durable/Polling/IJobDetailProvider.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace KafkaFlow.Retry.Durable.Polling -{ - using Quartz; - - internal interface IJobDetailProvider - { - IJobDetail GetQueuePollingJobDetail(); - } -} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerCoordinator.cs b/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerCoordinator.cs index 4e624b4b..4e11cb2e 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerCoordinator.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerCoordinator.cs @@ -1,14 +1,11 @@ namespace KafkaFlow.Retry.Durable.Polling { - using KafkaFlow.Retry.Durable.Definitions; + using System.Threading.Tasks; internal interface IQueueTrackerCoordinator { - void ScheduleJob( - RetryDurablePollingDefinition retryDurablePollingDefinition, - IMessageProducer retryDurableMessageProducer, - ILogHandler logHandler); + Task ScheduleJobsAsync(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler); - void UnscheduleJob(); + Task UnscheduleJobsAsync(); } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerFactory.cs b/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerFactory.cs index 71217647..c039d1ed 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerFactory.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/IQueueTrackerFactory.cs @@ -1,12 +1,7 @@ namespace KafkaFlow.Retry.Durable.Polling { - using KafkaFlow.Retry.Durable.Definitions; - internal interface IQueueTrackerFactory { - QueueTracker Create( - RetryDurablePollingDefinition retryDurablePollingDefinition, - IMessageProducer retryDurableMessageProducer, - ILogHandler logHandler); + QueueTracker Create(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler); } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/ITriggerProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/ITriggerProvider.cs index efc09b49..af2c78f8 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/ITriggerProvider.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/ITriggerProvider.cs @@ -1,9 +1,10 @@ namespace KafkaFlow.Retry.Durable.Polling { + using KafkaFlow.Retry.Durable.Definitions.Polling; using Quartz; internal interface ITriggerProvider { - ITrigger GetQueuePollingTrigger(); + ITrigger GetPollingTrigger(string schedulerId, PollingDefinition pollingDefinition); } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs b/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs new file mode 100644 index 00000000..c5fbf220 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/JobDataProvidersFactory.cs @@ -0,0 +1,98 @@ +namespace KafkaFlow.Retry.Durable.Polling +{ + using System.Collections.Generic; + using Dawn; + using KafkaFlow.Retry.Durable.Definitions.Polling; + using KafkaFlow.Retry.Durable.Encoders; + using KafkaFlow.Retry.Durable.Polling.Jobs; + using KafkaFlow.Retry.Durable.Repository; + using KafkaFlow.Retry.Durable.Repository.Adapters; + using Quartz; + + internal class JobDataProvidersFactory : IJobDataProvidersFactory + { + private readonly IMessageHeadersAdapter messageHeadersAdapter; + private readonly PollingDefinitionsAggregator pollingDefinitionsAggregator; + private readonly IRetryDurableQueueRepository retryDurableQueueRepository; + private readonly ITriggerProvider triggerProvider; + private readonly IUtf8Encoder utf8Encoder; + + public JobDataProvidersFactory( + PollingDefinitionsAggregator pollingDefinitionsAggregator, + ITriggerProvider triggerProvider, + IRetryDurableQueueRepository retryDurableQueueRepository, + IMessageHeadersAdapter messageHeadersAdapter, + IUtf8Encoder utf8Encoder) + { + Guard.Argument(pollingDefinitionsAggregator, nameof(pollingDefinitionsAggregator)).NotNull(); + Guard.Argument(triggerProvider, nameof(triggerProvider)).NotNull(); + Guard.Argument(retryDurableQueueRepository).NotNull(); + Guard.Argument(messageHeadersAdapter).NotNull(); + Guard.Argument(utf8Encoder).NotNull(); + + this.pollingDefinitionsAggregator = pollingDefinitionsAggregator; + this.retryDurableQueueRepository = retryDurableQueueRepository; + this.messageHeadersAdapter = messageHeadersAdapter; + this.utf8Encoder = utf8Encoder; + this.triggerProvider = triggerProvider; + } + + public IEnumerable Create(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler) + { + var jobDataProviders = new List(2); + + if (this.TryGetPollingDefinition(PollingJobType.RetryDurable, out var retryDurablePollingDefinition)) + { + jobDataProviders.Add( + new RetryDurableJobDataProvider( + retryDurablePollingDefinition, + this.GetTrigger(retryDurablePollingDefinition), + this.pollingDefinitionsAggregator.SchedulerId, + this.retryDurableQueueRepository, + logHandler, + this.messageHeadersAdapter, + this.utf8Encoder, + retryDurableMessageProducer + ) + ); + } + + if (this.TryGetPollingDefinition(PollingJobType.Cleanup, out var cleanupPollingDefinition)) + { + jobDataProviders.Add( + new CleanupJobDataProvider( + cleanupPollingDefinition, + this.GetTrigger(cleanupPollingDefinition), + this.pollingDefinitionsAggregator.SchedulerId, + this.retryDurableQueueRepository, + logHandler + ) + ); + } + + return jobDataProviders; + } + + private ITrigger GetTrigger(PollingDefinition pollingDefinition) + { + return this.triggerProvider.GetPollingTrigger(this.pollingDefinitionsAggregator.SchedulerId, pollingDefinition); + } + + private bool TryGetPollingDefinition(PollingJobType pollingJobType, out TPollingDefinition pollingDefinition) where TPollingDefinition : PollingDefinition + { + pollingDefinition = default; + + var pollingDefinitions = this.pollingDefinitionsAggregator.PollingDefinitions; + + var pollingDefinitionFound = pollingDefinitions.TryGetValue(pollingJobType, out var pollingDefinitionResult); + + if (pollingDefinitionFound) + { + Guard.Argument(pollingDefinitionResult, nameof(pollingDefinitionResult)).NotNull().Compatible(); + pollingDefinition = pollingDefinitionResult as TPollingDefinition; + } + + return pollingDefinitionFound; + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/JobDetailProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/JobDetailProvider.cs deleted file mode 100644 index 12c034ad..00000000 --- a/src/KafkaFlow.Retry/Durable/Polling/JobDetailProvider.cs +++ /dev/null @@ -1,63 +0,0 @@ -namespace KafkaFlow.Retry.Durable.Polling -{ - using Dawn; - using KafkaFlow.Retry.Durable.Definitions; - using KafkaFlow.Retry.Durable.Encoders; - using KafkaFlow.Retry.Durable.Repository; - using KafkaFlow.Retry.Durable.Repository.Adapters; - using Quartz; - - internal class JobDetailProvider : IJobDetailProvider - { - private readonly ILogHandler logHandler; - private readonly IMessageAdapter messageAdapter; - private readonly IMessageHeadersAdapter messageHeadersAdapter; - private readonly IMessageProducer retryDurableMessageProducer; - private readonly RetryDurablePollingDefinition retryDurablePollingDefinition; - private readonly IRetryDurableQueueRepository retryDurableQueueRepository; - private readonly IUtf8Encoder utf8Encoder; - - public JobDetailProvider( - IRetryDurableQueueRepository retryDurableQueueRepository, - ILogHandler logHandler, - IMessageHeadersAdapter messageHeadersAdapter, - IMessageAdapter messageAdapter, - IUtf8Encoder utf8Encoder, - IMessageProducer retryDurableMessageProducer, - RetryDurablePollingDefinition retryDurablePollingDefinition) - { - Guard.Argument(retryDurableQueueRepository).NotNull(); - Guard.Argument(logHandler).NotNull(); - Guard.Argument(messageHeadersAdapter).NotNull(); - Guard.Argument(messageAdapter).NotNull(); - Guard.Argument(utf8Encoder).NotNull(); - Guard.Argument(retryDurableMessageProducer).NotNull(); - Guard.Argument(retryDurablePollingDefinition).NotNull(); - - this.retryDurableQueueRepository = retryDurableQueueRepository; - this.logHandler = logHandler; - this.messageHeadersAdapter = messageHeadersAdapter; - this.messageAdapter = messageAdapter; - this.utf8Encoder = utf8Encoder; - this.retryDurableMessageProducer = retryDurableMessageProducer; - this.retryDurablePollingDefinition = retryDurablePollingDefinition; - } - - public IJobDetail GetQueuePollingJobDetail() - { - var dataMap = new JobDataMap(); - dataMap.Add(QueuePollingJobConstants.RetryDurableQueueRepository, this.retryDurableQueueRepository); - dataMap.Add(QueuePollingJobConstants.RetryDurableMessageProducer, this.retryDurableMessageProducer); - dataMap.Add(QueuePollingJobConstants.RetryDurablePollingDefinition, this.retryDurablePollingDefinition); - dataMap.Add(QueuePollingJobConstants.LogHandler, this.logHandler); - dataMap.Add(QueuePollingJobConstants.MessageHeadersAdapter, this.messageHeadersAdapter); - dataMap.Add(QueuePollingJobConstants.MessageAdapter, this.messageAdapter); - dataMap.Add(QueuePollingJobConstants.Utf8Encoder, this.utf8Encoder); - - return JobBuilder - .Create() - .SetJobData(dataMap) - .Build(); - } - } -} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupJobDataProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupJobDataProvider.cs new file mode 100644 index 00000000..29a78b0d --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupJobDataProvider.cs @@ -0,0 +1,55 @@ +namespace KafkaFlow.Retry.Durable.Polling.Jobs +{ + using Dawn; + using KafkaFlow.Retry.Durable.Definitions.Polling; + using KafkaFlow.Retry.Durable.Repository; + using Quartz; + + internal class CleanupJobDataProvider : IJobDataProvider + { + private readonly CleanupPollingDefinition cleanupPollingDefinition; + private readonly ILogHandler logHandler; + private readonly IRetryDurableQueueRepository retryDurableQueueRepository; + private readonly string schedulerId; + private readonly ITrigger trigger; + + public CleanupJobDataProvider( + CleanupPollingDefinition cleanupPollingDefinition, + ITrigger trigger, + string schedulerId, + IRetryDurableQueueRepository retryDurableQueueRepository, + ILogHandler logHandler) + { + Guard.Argument(cleanupPollingDefinition, nameof(cleanupPollingDefinition)).NotNull(); + Guard.Argument(trigger, nameof(trigger)).NotNull(); + Guard.Argument(schedulerId, nameof(schedulerId)).NotNull().NotEmpty(); + Guard.Argument(retryDurableQueueRepository, nameof(retryDurableQueueRepository)).NotNull(); + Guard.Argument(logHandler, nameof(logHandler)).NotNull(); + + this.cleanupPollingDefinition = cleanupPollingDefinition; + this.trigger = trigger; + this.schedulerId = schedulerId; + this.retryDurableQueueRepository = retryDurableQueueRepository; + this.logHandler = logHandler; + } + + public PollingDefinition PollingDefinition => this.cleanupPollingDefinition; + + public ITrigger Trigger => this.trigger; + + public IJobDetail GetPollingJobDetail() + { + var dataMap = new JobDataMap(); + + dataMap.Add(PollingJobConstants.CleanupPollingDefinition, this.cleanupPollingDefinition); + dataMap.Add(PollingJobConstants.SchedulerId, this.schedulerId); + dataMap.Add(PollingJobConstants.RetryDurableQueueRepository, this.retryDurableQueueRepository); + dataMap.Add(PollingJobConstants.LogHandler, this.logHandler); + + return JobBuilder + .Create() + .SetJobData(dataMap) + .Build(); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs new file mode 100644 index 00000000..b8af08aa --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/CleanupPollingJob.cs @@ -0,0 +1,60 @@ +namespace KafkaFlow.Retry.Durable.Polling.Jobs +{ + using System; + using System.Threading.Tasks; + using KafkaFlow.Retry.Durable.Definitions.Polling; + using KafkaFlow.Retry.Durable.Polling.Extensions; + using KafkaFlow.Retry.Durable.Repository; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; + using KafkaFlow.Retry.Durable.Repository.Model; + using Quartz; + + [DisallowConcurrentExecutionAttribute] + internal class CleanupPollingJob : IJob + { + public async Task Execute(IJobExecutionContext context) + { + var jobDataMap = context.JobDetail.JobDataMap; + + var cleanupPollingDefinition = jobDataMap.GetValidValue(PollingJobConstants.CleanupPollingDefinition, nameof(CleanupPollingJob)); + var schedulerId = jobDataMap.GetValidStringValue(PollingJobConstants.SchedulerId, nameof(CleanupPollingJob)); + var retryDurableQueueRepository = jobDataMap.GetValidValue(PollingJobConstants.RetryDurableQueueRepository, nameof(CleanupPollingJob)); + var logHandler = jobDataMap.GetValidValue(PollingJobConstants.LogHandler, nameof(CleanupPollingJob)); + + try + { + logHandler.Info( + $"{nameof(CleanupPollingJob)} starts execution", + new + { + Name = context.Trigger.Key.Name + } + ); + + var maxLastExecutionDateToBeKept = DateTime.UtcNow.AddDays(-1 * cleanupPollingDefinition.TimeToLiveInDays); + + var deleteQueuesInput = new DeleteQueuesInput( + schedulerId, + RetryQueueStatus.Done, + maxLastExecutionDateToBeKept, + cleanupPollingDefinition.RowsPerRequest); + + var deleteQueuesResult = await retryDurableQueueRepository.DeleteQueuesAsync(deleteQueuesInput).ConfigureAwait(false); + + var logObj = new + { + TimeToLiveInDays = cleanupPollingDefinition.TimeToLiveInDays, + MaxLastExecutionDateToBeKept = maxLastExecutionDateToBeKept, + MaxQueuesThatCanBeDeleted = cleanupPollingDefinition.RowsPerRequest, + TotalQueuesDeleted = deleteQueuesResult.TotalQueuesDeleted + }; + + logHandler.Info($"{nameof(CleanupPollingJob)} executed successfully.", logObj); + } + catch (Exception ex) + { + logHandler.Error($"Exception on {nameof(CleanupPollingJob)} execution", ex, null); + } + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/QueuePollingJobConstants.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs similarity index 66% rename from src/KafkaFlow.Retry/Durable/Polling/QueuePollingJobConstants.cs rename to src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs index ce5c22b3..717a634c 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/QueuePollingJobConstants.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/PollingJobConstants.cs @@ -1,13 +1,14 @@ -namespace KafkaFlow.Retry.Durable.Polling +namespace KafkaFlow.Retry.Durable.Polling.Jobs { - internal static class QueuePollingJobConstants + internal static class PollingJobConstants { + public const string CleanupPollingDefinition = "CleanupPollingDefinition"; public const string LogHandler = "LogHandler"; - public const string MessageAdapter = "MessageAdapter"; public const string MessageHeadersAdapter = "MessageHeadersAdapter"; public const string RetryDurableMessageProducer = "RetryDurableProducer"; public const string RetryDurablePollingDefinition = "RetryDurablePollingDefinition"; public const string RetryDurableQueueRepository = "RetryDurableQueueRepository"; + public const string SchedulerId = "SchedulerId"; public const string Utf8Encoder = "Utf8Encoder"; } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableJobDataProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableJobDataProvider.cs new file mode 100644 index 00000000..6c2a7841 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurableJobDataProvider.cs @@ -0,0 +1,72 @@ +namespace KafkaFlow.Retry.Durable.Polling.Jobs +{ + using Dawn; + using KafkaFlow.Retry.Durable.Definitions.Polling; + using KafkaFlow.Retry.Durable.Encoders; + using KafkaFlow.Retry.Durable.Repository; + using KafkaFlow.Retry.Durable.Repository.Adapters; + using Quartz; + + internal class RetryDurableJobDataProvider : IJobDataProvider + { + private readonly ILogHandler logHandler; + private readonly IMessageHeadersAdapter messageHeadersAdapter; + private readonly IMessageProducer retryDurableMessageProducer; + private readonly RetryDurablePollingDefinition retryDurablePollingDefinition; + private readonly IRetryDurableQueueRepository retryDurableQueueRepository; + private readonly string schedulerId; + private readonly ITrigger trigger; + private readonly IUtf8Encoder utf8Encoder; + + public RetryDurableJobDataProvider( + RetryDurablePollingDefinition retryDurablePollingDefinition, + ITrigger trigger, + string schedulerId, + IRetryDurableQueueRepository retryDurableQueueRepository, + ILogHandler logHandler, + IMessageHeadersAdapter messageHeadersAdapter, + IUtf8Encoder utf8Encoder, + IMessageProducer retryDurableMessageProducer) + { + Guard.Argument(retryDurablePollingDefinition, nameof(retryDurablePollingDefinition)).NotNull(); + Guard.Argument(trigger, nameof(trigger)).NotNull(); + Guard.Argument(schedulerId, nameof(schedulerId)).NotNull().NotEmpty(); + Guard.Argument(retryDurableQueueRepository, nameof(retryDurableQueueRepository)).NotNull(); + Guard.Argument(logHandler, nameof(logHandler)).NotNull(); + Guard.Argument(messageHeadersAdapter, nameof(messageHeadersAdapter)).NotNull(); + Guard.Argument(utf8Encoder, nameof(utf8Encoder)).NotNull(); + Guard.Argument(retryDurableMessageProducer, nameof(retryDurableMessageProducer)).NotNull(); + + this.retryDurablePollingDefinition = retryDurablePollingDefinition; + this.trigger = trigger; + this.schedulerId = schedulerId; + this.retryDurableQueueRepository = retryDurableQueueRepository; + this.logHandler = logHandler; + this.messageHeadersAdapter = messageHeadersAdapter; + this.utf8Encoder = utf8Encoder; + this.retryDurableMessageProducer = retryDurableMessageProducer; + } + + public PollingDefinition PollingDefinition => this.retryDurablePollingDefinition; + + public ITrigger Trigger => this.trigger; + + public IJobDetail GetPollingJobDetail() + { + var dataMap = new JobDataMap(); + + dataMap.Add(PollingJobConstants.RetryDurablePollingDefinition, this.retryDurablePollingDefinition); + dataMap.Add(PollingJobConstants.SchedulerId, this.schedulerId); + dataMap.Add(PollingJobConstants.RetryDurableQueueRepository, this.retryDurableQueueRepository); + dataMap.Add(PollingJobConstants.LogHandler, this.logHandler); + dataMap.Add(PollingJobConstants.MessageHeadersAdapter, this.messageHeadersAdapter); + dataMap.Add(PollingJobConstants.Utf8Encoder, this.utf8Encoder); + dataMap.Add(PollingJobConstants.RetryDurableMessageProducer, this.retryDurableMessageProducer); + + return JobBuilder + .Create() + .SetJobData(dataMap) + .Build(); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/QueuePollingJob.cs b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs similarity index 66% rename from src/KafkaFlow.Retry/Durable/Polling/QueuePollingJob.cs rename to src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs index e2dede32..2b0f26e4 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/QueuePollingJob.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/Jobs/RetryDurablePollingJob.cs @@ -1,11 +1,12 @@ -namespace KafkaFlow.Retry.Durable.Polling +namespace KafkaFlow.Retry.Durable.Polling.Jobs { using System; using System.Linq; using System.Threading.Tasks; using Dawn; - using KafkaFlow.Retry.Durable.Definitions; + using KafkaFlow.Retry.Durable.Definitions.Polling; using KafkaFlow.Retry.Durable.Encoders; + using KafkaFlow.Retry.Durable.Polling.Extensions; using KafkaFlow.Retry.Durable.Repository; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; @@ -13,8 +14,8 @@ using KafkaFlow.Retry.Durable.Repository.Model; using Quartz; - [Quartz.DisallowConcurrentExecutionAttribute()] - internal class QueuePollingJob : IJob + [DisallowConcurrentExecutionAttribute] + internal class RetryDurablePollingJob : IJob { private TimeSpan expirationInterval = TimeSpan.Zero; @@ -22,47 +23,18 @@ public async Task Execute(IJobExecutionContext context) { var jobDataMap = context.JobDetail.JobDataMap; - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.RetryDurableQueueRepository), QueuePollingJobConstants.RetryDurableQueueRepository) - .True("Argument RetryDurableQueueRepository wasn't found and is required for this job"); - - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.RetryDurableMessageProducer), QueuePollingJobConstants.RetryDurableMessageProducer) - .True("Argument RetryDurableProducer wasn't found and is required for this job"); - - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.RetryDurablePollingDefinition), QueuePollingJobConstants.RetryDurablePollingDefinition) - .True("Argument RetryDurablePollingDefinition wasn't found and is required for this job"); - - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.LogHandler), QueuePollingJobConstants.LogHandler) - .True("Argument LogHandler wasn't found and is required for this job"); - - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.MessageHeadersAdapter), QueuePollingJobConstants.MessageHeadersAdapter) - .True("Argument MessageHeadersAdapter wasn't found and is required for this job"); - - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.MessageAdapter), QueuePollingJobConstants.MessageAdapter) - .True("Argument MessageAdapter wasn't found and is required for this job"); - - Guard.Argument(jobDataMap.ContainsKey(QueuePollingJobConstants.Utf8Encoder), QueuePollingJobConstants.Utf8Encoder) - .True("Argument Utf8Encoder wasn't found and is required for this job"); - - var retryDurableQueueRepository = jobDataMap[QueuePollingJobConstants.RetryDurableQueueRepository] as IRetryDurableQueueRepository; - var retryDurableProducer = jobDataMap[QueuePollingJobConstants.RetryDurableMessageProducer] as IMessageProducer; - var retryDurablePollingDefinition = jobDataMap[QueuePollingJobConstants.RetryDurablePollingDefinition] as RetryDurablePollingDefinition; - var logHandler = jobDataMap[QueuePollingJobConstants.LogHandler] as ILogHandler; - var messageHeadersAdapter = jobDataMap[QueuePollingJobConstants.MessageHeadersAdapter] as IMessageHeadersAdapter; - var messageAdapter = jobDataMap[QueuePollingJobConstants.MessageAdapter] as IMessageAdapter; - var utf8Encoder = jobDataMap[QueuePollingJobConstants.Utf8Encoder] as IUtf8Encoder; - - Guard.Argument(retryDurableQueueRepository).NotNull(); - Guard.Argument(retryDurableProducer).NotNull(); - Guard.Argument(retryDurablePollingDefinition).NotNull(); - Guard.Argument(logHandler).NotNull(); - Guard.Argument(messageHeadersAdapter).NotNull(); - Guard.Argument(messageAdapter).NotNull(); - Guard.Argument(utf8Encoder).NotNull(); + var retryDurablePollingDefinition = jobDataMap.GetValidValue(PollingJobConstants.RetryDurablePollingDefinition, nameof(RetryDurablePollingJob)); + var schedulerId = jobDataMap.GetValidStringValue(PollingJobConstants.SchedulerId, nameof(RetryDurablePollingJob)); + var retryDurableQueueRepository = jobDataMap.GetValidValue(PollingJobConstants.RetryDurableQueueRepository, nameof(RetryDurablePollingJob)); + var logHandler = jobDataMap.GetValidValue(PollingJobConstants.LogHandler, nameof(RetryDurablePollingJob)); + var messageHeadersAdapter = jobDataMap.GetValidValue(PollingJobConstants.MessageHeadersAdapter, nameof(RetryDurablePollingJob)); + var utf8Encoder = jobDataMap.GetValidValue(PollingJobConstants.Utf8Encoder, nameof(RetryDurablePollingJob)); + var retryDurableProducer = jobDataMap.GetValidValue(PollingJobConstants.RetryDurableMessageProducer, nameof(RetryDurablePollingJob)); try { logHandler.Info( - "PollingJob starts execution", + $"{nameof(RetryDurablePollingJob)} starts execution", new { Name = context.Trigger.Key.Name @@ -81,7 +53,7 @@ public async Task Execute(IJobExecutionContext context) ) ) { - SearchGroupKey = retryDurablePollingDefinition.Id + SearchGroupKey = schedulerId }; var activeQueues = await retryDurableQueueRepository @@ -89,7 +61,7 @@ public async Task Execute(IJobExecutionContext context) .ConfigureAwait(false); logHandler.Verbose( - "PollingJob number of active queues", + $"{nameof(RetryDurablePollingJob)} number of active queues", new { activeQueues @@ -101,7 +73,7 @@ public async Task Execute(IJobExecutionContext context) if (!queue.Items.Any()) { logHandler.Verbose( - "PollingJob queue with no items", + $"{nameof(RetryDurablePollingJob)} queue with no items", new { QueueId = queue.Id, @@ -117,7 +89,7 @@ public async Task Execute(IJobExecutionContext context) if (!this.IsAbleToBeProduced(item, retryDurablePollingDefinition)) { logHandler.Verbose( - "PollingJob queue item is not able to be produced", + $"{nameof(RetryDurablePollingJob)} queue item is not able to be produced", new { QueueId = queue.Id, @@ -148,7 +120,7 @@ await retryDurableProducer ).ConfigureAwait(false); logHandler.Verbose( - "PollingJob queue item produced", + $"{nameof(RetryDurablePollingJob)} queue item produced", new { QueueId = queue.Id, @@ -162,7 +134,7 @@ await retryDurableProducer catch (Exception ex) { logHandler.Error( - "Exception on queue PollingJob execution producing to retry topic", + $"Exception on queue {nameof(RetryDurablePollingJob)} execution producing to retry topic", ex, new { @@ -184,11 +156,11 @@ await retryDurableQueueRepository } catch (RetryDurableException rdex) { - logHandler.Error("RetryDurableException on queue PollingJob execution", rdex, null); + logHandler.Error($"RetryDurableException on queue {nameof(RetryDurablePollingJob)} execution", rdex, null); } catch (Exception ex) { - logHandler.Error("Exception on queue PollingJob execution", ex, null); + logHandler.Error($"Exception on queue {nameof(RetryDurablePollingJob)} execution", ex, null); } } diff --git a/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs b/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs index d7d87340..3510a48b 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs @@ -1,39 +1,35 @@ namespace KafkaFlow.Retry.Durable.Polling { using System; + using System.Collections.Generic; using System.Collections.Specialized; using System.Threading; + using System.Threading.Tasks; using Dawn; - using KafkaFlow.Retry.Durable.Definitions; using Quartz; using Quartz.Impl; internal class QueueTracker { private static readonly object internalLock = new object(); - private readonly IJobDetail job; + private readonly IEnumerable jobDataProviders; private readonly ILogHandler logHandler; - private readonly RetryDurablePollingDefinition retryDurablePollingDefinition; - private readonly ITrigger trigger; + private readonly string schedulerId; private IScheduler scheduler; public QueueTracker( - ILogHandler logHandler, - RetryDurablePollingDefinition retryDurablePollingDefinition, - IJobDetailProvider jobDetailProvider, - ITriggerProvider triggerProvider + string schedulerId, + IEnumerable jobDataProviders, + ILogHandler logHandler ) { + Guard.Argument(schedulerId, nameof(schedulerId)).NotNull().NotEmpty(); + Guard.Argument(jobDataProviders).NotNull().NotEmpty(); Guard.Argument(logHandler).NotNull(); - Guard.Argument(retryDurablePollingDefinition).NotNull(); - Guard.Argument(jobDetailProvider).NotNull(); - Guard.Argument(triggerProvider).NotNull(); + this.schedulerId = schedulerId; + this.jobDataProviders = jobDataProviders; this.logHandler = logHandler; - this.retryDurablePollingDefinition = retryDurablePollingDefinition; - - this.job = jobDetailProvider.GetQueuePollingJobDetail(); - this.trigger = triggerProvider.GetQueuePollingTrigger(); } private bool IsSchedulerActive @@ -41,90 +37,123 @@ private bool IsSchedulerActive && this.scheduler.IsStarted && !this.scheduler.IsShutdown; - internal void ScheduleJob(CancellationToken cancellationToken = default) + internal async Task ScheduleJobsAsync(CancellationToken cancellationToken = default) { try { - Guard.Argument(this.scheduler).Null(s => "Scheduler was already started. Please call this method just once."); + await this.StartSchedulerAsync(cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + this.logHandler.Error("PollingJob ERROR starting scheduler", ex, new { SchedulerId = this.schedulerId }); + return; + } - lock (internalLock) + foreach (var jobDataProvider in this.jobDataProviders) + { + if (!jobDataProvider.PollingDefinition.Enabled) { - StdSchedulerFactory fact = new StdSchedulerFactory(); - fact.Initialize(new NameValueCollection { { "quartz.scheduler.instanceName", this.retryDurablePollingDefinition.Id } }); - this.scheduler = fact.GetScheduler(cancellationToken).GetAwaiter().GetResult(); - - this.logHandler.Info( - "PollingJob Scheduler Acquired", + this.logHandler.Warning( + "PollingJob Scheduler not enabled", new { - PollingId = this.retryDurablePollingDefinition.Id, - CronExpression = this.retryDurablePollingDefinition.CronExpression + SchedulerId = this.schedulerId, + PollingJobType = jobDataProvider.PollingDefinition.PollingJobType.ToString(), + CronExpression = jobDataProvider.PollingDefinition.CronExpression }); + + continue; } - if (!this.IsSchedulerActive) - { - this.scheduler - .Start(cancellationToken) - .GetAwaiter() - .GetResult(); + await this.ScheduleJobAsync(jobDataProvider, cancellationToken).ConfigureAwait(false); + } + } - this.logHandler.Info( - "PollingJob Scheduler Started", - new - { - PollingId = this.retryDurablePollingDefinition.Id, - CronExpression = this.retryDurablePollingDefinition.CronExpression - }); + internal async Task UnscheduleJobsAsync(CancellationToken cancellationToken = default) + { + foreach (var jobDataProvider in this.jobDataProviders) + { + if (!jobDataProvider.PollingDefinition.Enabled) + { + continue; } - var scheduledJob = this.scheduler - .ScheduleJob(this.job, this.trigger, cancellationToken) - .GetAwaiter() - .GetResult(); + var trigger = jobDataProvider.Trigger; + + this.logHandler.Info( + "PollingJob unscheduler started", + new + { + SchedulerId = this.schedulerId, + PollingJobType = jobDataProvider.PollingDefinition.PollingJobType.ToString(), + TriggerKey = trigger.Key.ToString() + }); + + var unscheduledJob = await this.scheduler.UnscheduleJob(trigger.Key).ConfigureAwait(false); + + this.logHandler.Info("PollingJob unscheduler finished", + new + { + SchedulerId = this.schedulerId, + PollingJobType = jobDataProvider.PollingDefinition.PollingJobType.ToString(), + TriggerKey = trigger.Key.ToString(), + UnscheduledJob = unscheduledJob.ToString() + }); + } + } + + private async Task ScheduleJobAsync(IJobDataProvider jobDataProvider, CancellationToken cancellationToken) + { + try + { + var job = jobDataProvider.GetPollingJobDetail(); + var trigger = jobDataProvider.Trigger; + + var scheduledJob = await this.scheduler.ScheduleJob(job, trigger, cancellationToken).ConfigureAwait(false); this.logHandler.Info( - "PollingJob Scheduler Scheduled", + "PollingJob Scheduler scheduled", new { - PollingId = this.retryDurablePollingDefinition.Id, - CronExpression = this.retryDurablePollingDefinition.CronExpression, + SchedulerId = this.schedulerId, + PollingJobType = jobDataProvider.PollingDefinition.PollingJobType.ToString(), + CronExpression = jobDataProvider.PollingDefinition.CronExpression, ScheduleJob = scheduledJob.ToString() }); } catch (Exception ex) { this.logHandler.Error( - "PollingJob Scheduler Error", + "PollingJob Scheduler ERROR scheduling", ex, new { - PollingId = this.retryDurablePollingDefinition.Id, - CronExpression = this.retryDurablePollingDefinition.CronExpression + SchedulerId = this.schedulerId, + PollingJobType = jobDataProvider.PollingDefinition.PollingJobType.ToString(), + CronExpression = jobDataProvider.PollingDefinition.CronExpression }); } } - internal void UnscheduleJob(CancellationToken cancellationToken = default) + private async Task StartSchedulerAsync(CancellationToken cancellationToken) { - this.logHandler.Info( - "PollingJob Unscheduler Started", - new - { - TriggerKey = this.trigger.Key.ToString() - }); + lock (internalLock) + { + Guard.Argument(this.scheduler).Null(s => "Scheduler was already started. Please call this method just once."); - var unscheduledJob = this.scheduler - .UnscheduleJob(this.trigger.Key) - .GetAwaiter() - .GetResult(); + StdSchedulerFactory fact = new StdSchedulerFactory(); + fact.Initialize(new NameValueCollection { { "quartz.scheduler.instanceName", this.schedulerId } }); + this.scheduler = fact.GetScheduler(cancellationToken).GetAwaiter().GetResult(); - this.logHandler.Info("PollingJob Unscheduler Finished", - new - { - UnscheduledJob = unscheduledJob, - TriggerKey = this.trigger.Key.ToString() - }); + this.logHandler.Info("PollingJob Scheduler acquired", new { SchedulerId = this.schedulerId }); + } + + if (!this.IsSchedulerActive) + { + await this.scheduler.Start(cancellationToken).ConfigureAwait(false); + + this.logHandler.Info("PollingJob Scheduler started", new { SchedulerId = this.schedulerId }); + } } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinator.cs b/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinator.cs index 40e6132e..b28cd3b3 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinator.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinator.cs @@ -1,7 +1,7 @@ namespace KafkaFlow.Retry.Durable.Polling { + using System.Threading.Tasks; using Dawn; - using KafkaFlow.Retry.Durable.Definitions; internal class QueueTrackerCoordinator : IQueueTrackerCoordinator { @@ -15,30 +15,19 @@ public QueueTrackerCoordinator(IQueueTrackerFactory queueTrackerFactory) this.queueTrackerFactory = queueTrackerFactory; } - public void ScheduleJob( - RetryDurablePollingDefinition retryDurablePollingDefinition, - IMessageProducer retryDurableMessageProducer, - ILogHandler logHandler) + public async Task ScheduleJobsAsync(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler) { - if (!retryDurablePollingDefinition.Enabled) - { - return; - } - this.queueTracker = this.queueTrackerFactory - .Create( - retryDurablePollingDefinition, - retryDurableMessageProducer, - logHandler); + .Create(retryDurableMessageProducer, logHandler); - this.queueTracker.ScheduleJob(); + await this.queueTracker.ScheduleJobsAsync().ConfigureAwait(false); } - public void UnscheduleJob() + public async Task UnscheduleJobsAsync() { if (this.queueTracker is object) { - this.queueTracker.UnscheduleJob(); + await this.queueTracker.UnscheduleJobsAsync().ConfigureAwait(false); } } } diff --git a/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactory.cs b/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactory.cs index cb8ee07f..9c6af186 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactory.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/QueueTrackerFactory.cs @@ -1,53 +1,37 @@ namespace KafkaFlow.Retry.Durable.Polling { + using System.Collections.Generic; using Dawn; - using KafkaFlow.Retry.Durable.Definitions; - using KafkaFlow.Retry.Durable.Encoders; - using KafkaFlow.Retry.Durable.Repository; - using KafkaFlow.Retry.Durable.Repository.Adapters; internal class QueueTrackerFactory : IQueueTrackerFactory { - private readonly IMessageAdapter messageAdapter; - private readonly IMessageHeadersAdapter messageHeadersAdapter; - private readonly IRetryDurableQueueRepository retryDurableQueueRepository; - private readonly IUtf8Encoder utf8Encoder; + private readonly IJobDataProvidersFactory jobDataProvidersFactory; + private readonly string schedulerId; + private IEnumerable jobDataProviders; public QueueTrackerFactory( - IRetryDurableQueueRepository retryDurableQueueRepository, - IMessageHeadersAdapter messageHeadersAdapter, - IMessageAdapter messageAdapter, - IUtf8Encoder utf8Encoder + string schedulerId, + IJobDataProvidersFactory jobDataProvidersFactory ) { - Guard.Argument(retryDurableQueueRepository).NotNull(); - Guard.Argument(messageHeadersAdapter).NotNull(); - Guard.Argument(messageAdapter).NotNull(); - Guard.Argument(utf8Encoder).NotNull(); + Guard.Argument(schedulerId, nameof(schedulerId)).NotNull().NotEmpty(); + Guard.Argument(jobDataProvidersFactory, nameof(jobDataProvidersFactory)).NotNull(); - this.retryDurableQueueRepository = retryDurableQueueRepository; - this.messageHeadersAdapter = messageHeadersAdapter; - this.messageAdapter = messageAdapter; - this.utf8Encoder = utf8Encoder; + this.schedulerId = schedulerId; + this.jobDataProvidersFactory = jobDataProvidersFactory; } - public QueueTracker Create( - RetryDurablePollingDefinition retryDurablePollingDefinition, - IMessageProducer retryDurableMessageProducer, - ILogHandler logHandler) + public QueueTracker Create(IMessageProducer retryDurableMessageProducer, ILogHandler logHandler) { + if (this.jobDataProviders is null) + { + this.jobDataProviders = this.jobDataProvidersFactory.Create(retryDurableMessageProducer, logHandler); + } + return new QueueTracker( - logHandler, - retryDurablePollingDefinition, - new JobDetailProvider( - this.retryDurableQueueRepository, - logHandler, - this.messageHeadersAdapter, - this.messageAdapter, - this.utf8Encoder, - retryDurableMessageProducer, - retryDurablePollingDefinition), - new TriggerProvider(retryDurablePollingDefinition)); + this.schedulerId, + this.jobDataProviders, + logHandler); } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs index 7f062cf2..2b8081a8 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs @@ -1,29 +1,38 @@ namespace KafkaFlow.Retry.Durable.Polling { - using Dawn; - using KafkaFlow.Retry.Durable.Definitions; + using System.Collections.Generic; + using KafkaFlow.Retry.Durable.Definitions.Polling; using Quartz; internal class TriggerProvider : ITriggerProvider { - private readonly RetryDurablePollingDefinition retryDurablePollingDefinition; + private readonly IDictionary triggers; - public TriggerProvider(RetryDurablePollingDefinition retryDurablePollingDefinition) + public TriggerProvider() { - Guard.Argument(retryDurablePollingDefinition).NotNull(); - - this.retryDurablePollingDefinition = retryDurablePollingDefinition; + this.triggers = new Dictionary(); } - public ITrigger GetQueuePollingTrigger() + public ITrigger GetPollingTrigger(string schedulerId, PollingDefinition pollingDefinition) { - return TriggerBuilder - .Create() - .WithIdentity($"pollingJob_{this.retryDurablePollingDefinition.Id}", "queueTrackerGroup") - .WithCronSchedule(this.retryDurablePollingDefinition.CronExpression) - .StartNow() - .WithPriority(1) - .Build(); + var triggerId = $"pollingJobTrigger_{schedulerId}_{pollingDefinition.PollingJobType}"; + + if (this.triggers.TryGetValue(triggerId, out var triggerAlreadyCreated)) + { + return triggerAlreadyCreated; + } + + var trigger = TriggerBuilder + .Create() + .WithIdentity(triggerId, "queueTrackerGroup") + .WithCronSchedule(pollingDefinition.CronExpression) + .StartNow() + .WithPriority(1) + .Build(); + + this.triggers.Add(triggerId, trigger); + + return trigger; } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesInput.cs b/src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesInput.cs new file mode 100644 index 00000000..83f92e4e --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesInput.cs @@ -0,0 +1,34 @@ +namespace KafkaFlow.Retry.Durable.Repository.Actions.Delete +{ + using System; + using Dawn; + using KafkaFlow.Retry.Durable.Repository.Model; + + public class DeleteQueuesInput + { + public DeleteQueuesInput( + string searchGroupKey, + RetryQueueStatus retryQueueStatus, + DateTime maxLastExecutionDateToBeKept, + int maxRowsToDelete) + { + Guard.Argument(searchGroupKey, nameof(searchGroupKey)).NotNull().NotEmpty(); + Guard.Argument(retryQueueStatus, nameof(retryQueueStatus)).NotDefault(); + Guard.Argument(maxLastExecutionDateToBeKept, nameof(maxLastExecutionDateToBeKept)).NotDefault(); + Guard.Argument(maxRowsToDelete, nameof(maxRowsToDelete)).Positive(); + + this.SearchGroupKey = searchGroupKey; + this.RetryQueueStatus = retryQueueStatus; + this.MaxLastExecutionDateToBeKept = maxLastExecutionDateToBeKept; + this.MaxRowsToDelete = maxRowsToDelete; + } + + public DateTime MaxLastExecutionDateToBeKept { get; } + + public int MaxRowsToDelete { get; } + + public RetryQueueStatus RetryQueueStatus { get; } + + public string SearchGroupKey { get; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesResult.cs b/src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesResult.cs new file mode 100644 index 00000000..aa79ac50 --- /dev/null +++ b/src/KafkaFlow.Retry/Durable/Repository/Actions/Delete/DeleteQueuesResult.cs @@ -0,0 +1,16 @@ +namespace KafkaFlow.Retry.Durable.Repository.Actions.Delete +{ + using Dawn; + + public class DeleteQueuesResult + { + public DeleteQueuesResult(int totalQueuesDeleted) + { + Guard.Argument(totalQueuesDeleted, nameof(totalQueuesDeleted)).NotNegative(); + + this.TotalQueuesDeleted = totalQueuesDeleted; + } + + public int TotalQueuesDeleted { get; } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs index 8eb81062..46518ed4 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepository.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using KafkaFlow.Retry.Durable.Repository.Actions.Create; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Model; @@ -15,6 +16,8 @@ internal interface IRetryDurableQueueRepository Task CheckQueuePendingItemsAsync(QueuePendingItemsInput queuePendingItemsInput); + Task DeleteQueuesAsync(DeleteQueuesInput deleteQueuesInput); + Task> GetRetryQueuesAsync(GetQueuesInput getQueuesInput); Task SaveToQueueAsync(IMessageContext context, string description); diff --git a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs index 221433ed..a22fbfb5 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/IRetryDurableQueueRepositoryProvider.cs @@ -2,6 +2,7 @@ { using System.Threading.Tasks; using KafkaFlow.Retry.Durable.Repository.Actions.Create; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; @@ -13,6 +14,8 @@ public interface IRetryDurableQueueRepositoryProvider Task CheckQueuePendingItemsAsync(QueuePendingItemsInput input); + Task DeleteQueuesAsync(DeleteQueuesInput input); + Task GetQueuesAsync(GetQueuesInput input); Task SaveToQueueAsync(SaveToQueueInput input); diff --git a/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs index 4c064b96..daf5762a 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/NullRetryDurableQueueRepository.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Threading.Tasks; using KafkaFlow.Retry.Durable.Repository.Actions.Create; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Model; @@ -12,7 +13,7 @@ [ExcludeFromCodeCoverage] internal class NullRetryDurableQueueRepository : IRetryDurableQueueRepository { - public readonly static IRetryDurableQueueRepository Instance = new NullRetryDurableQueueRepository(); + public static readonly IRetryDurableQueueRepository Instance = new NullRetryDurableQueueRepository(); public Task AddIfQueueExistsAsync(IMessageContext context) => Task.FromResult(new AddIfQueueExistsResult(AddIfQueueExistsResultStatus.NoPendingMembers)); @@ -23,6 +24,9 @@ public Task CheckQueueNewestItemsAsync(QueueNewestItemsI public Task CheckQueuePendingItemsAsync(QueuePendingItemsInput queuePendingItemsInput) => Task.FromResult(new QueuePendingItemsResult(QueuePendingItemsResultStatus.NoPendingItems)); + public Task DeleteQueuesAsync(DeleteQueuesInput deleteQueuesInput) + => Task.FromResult(new DeleteQueuesResult(0)); + public Task> GetRetryQueuesAsync(GetQueuesInput getQueuesInput) => Task.FromResult(Enumerable.Empty()); diff --git a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs index 337ca79f..708f71b2 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs @@ -7,9 +7,10 @@ using Dawn; using KafkaFlow.Retry.Durable; using KafkaFlow.Retry.Durable.Common; - using KafkaFlow.Retry.Durable.Definitions; + using KafkaFlow.Retry.Durable.Definitions.Polling; using KafkaFlow.Retry.Durable.Encoders; using KafkaFlow.Retry.Durable.Repository.Actions.Create; + using KafkaFlow.Retry.Durable.Repository.Actions.Delete; using KafkaFlow.Retry.Durable.Repository.Actions.Read; using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Adapters; @@ -22,7 +23,7 @@ internal class RetryDurableQueueRepository : IRetryDurableQueueRepository private const int MaxAttempts = 6; private readonly IMessageAdapter messageAdapter; private readonly IMessageHeadersAdapter messageHeadersAdapter; - private readonly RetryDurablePollingDefinition retryDurablePollingDefinition; + private readonly PollingDefinitionsAggregator pollingDefinitionsAggregator; private readonly IRetryDurableQueueRepositoryProvider retryDurableRepositoryProvider; private readonly IEnumerable updateItemHandlers; private readonly IUtf8Encoder utf8Encoder; @@ -33,7 +34,7 @@ public RetryDurableQueueRepository( IMessageHeadersAdapter messageHeadersAdapter, IMessageAdapter messageAdapter, IUtf8Encoder utf8Encoder, - RetryDurablePollingDefinition retryDurablePollingDefinition) + PollingDefinitionsAggregator pollingDefinitionsAggregator) { Guard.Argument(retryDurableRepositoryProvider).NotNull("Retry durable requires a repository to be defined"); Guard.Argument(updateItemHandlers).NotNull("At least an update item handler should be defined"); @@ -41,14 +42,14 @@ public RetryDurableQueueRepository( Guard.Argument(messageHeadersAdapter).NotNull(); Guard.Argument(messageAdapter).NotNull(); Guard.Argument(utf8Encoder).NotNull(); - Guard.Argument(retryDurablePollingDefinition).NotNull(); + Guard.Argument(pollingDefinitionsAggregator).NotNull(); this.retryDurableRepositoryProvider = retryDurableRepositoryProvider; this.updateItemHandlers = updateItemHandlers; this.messageHeadersAdapter = messageHeadersAdapter; this.messageAdapter = messageAdapter; this.utf8Encoder = utf8Encoder; - this.retryDurablePollingDefinition = retryDurablePollingDefinition; + this.pollingDefinitionsAggregator = pollingDefinitionsAggregator; } public async Task AddIfQueueExistsAsync(IMessageContext context) @@ -71,8 +72,8 @@ public async Task AddIfQueueExistsAsync(IMessageContext context.ConsumerContext.MessageTimestamp, this.messageHeadersAdapter.AdaptMessageHeadersToRepository(context.Headers) ), - this.retryDurablePollingDefinition.Id, - $"{this.retryDurablePollingDefinition.Id}-{this.utf8Encoder.Decode((byte[])context.Message.Key)}", + this.pollingDefinitionsAggregator.SchedulerId, + $"{this.pollingDefinitionsAggregator.SchedulerId}-{this.utf8Encoder.Decode((byte[])context.Message.Key)}", RetryQueueStatus.Active, RetryQueueItemStatus.Waiting, SeverityLevel.Unknown, @@ -130,6 +131,11 @@ public async Task CheckQueuePendingItemsAsync(QueuePend } } + public async Task DeleteQueuesAsync(DeleteQueuesInput deleteQueuesInput) + { + return await this.retryDurableRepositoryProvider.DeleteQueuesAsync(deleteQueuesInput).ConfigureAwait(false); + } + public async Task> GetRetryQueuesAsync(GetQueuesInput getQueuesInput) { try @@ -171,8 +177,8 @@ public async Task SaveToQueueAsync(IMessageContext context, s context.ConsumerContext.MessageTimestamp, this.messageHeadersAdapter.AdaptMessageHeadersToRepository(context.Headers) ), - this.retryDurablePollingDefinition.Id, - $"{this.retryDurablePollingDefinition.Id}-{this.utf8Encoder.Decode((byte[])context.Message.Key)}", + this.pollingDefinitionsAggregator.SchedulerId, + $"{this.pollingDefinitionsAggregator.SchedulerId}-{this.utf8Encoder.Decode((byte[])context.Message.Key)}", RetryQueueStatus.Active, RetryQueueItemStatus.Waiting, SeverityLevel.Unknown,