From f8fe911f4933d9439ac6289fbcb0e1cdbb7aea44 Mon Sep 17 00:00:00 2001 From: Fernando Marins <43406405+fernando-a-marins@users.noreply.github.com> Date: Wed, 12 Jul 2023 17:41:30 +0100 Subject: [PATCH] fix: Fix/129/quartz jobs miss fired problem (#131) --- .github/workflows/build.yml | 4 +- .../Fixtures/BootstrapperHostFixture.cs | 2 +- .../KafkaFlow.Retry.IntegrationTests.csproj | 1 + .../PollingTests/JobDataProviderSurrogate.cs | 46 ++++ .../PollingTests/JobSurrogate.cs | 17 ++ .../QueueTrackerCoordinatorTests.cs | 171 +++++++++++++ .../Properties/AssemblyMetadata.cs | 4 +- .../RetryDurableTests.cs | 2 +- .../Polling/QueueTrackerCoordinatorTests.cs | 4 +- .../Durable/Polling/QueueTrackerTests.cs | 234 ------------------ .../Durable/Polling/TriggerProvider.cs | 4 +- .../Properties/AssemblyInfo.cs | 3 +- 12 files changed, 248 insertions(+), 244 deletions(-) create mode 100644 src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobDataProviderSurrogate.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/PollingTests/QueueTrackerCoordinatorTests.cs delete mode 100644 src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5111c23d..e6784451 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -71,8 +71,8 @@ jobs: - name: Start Kafka uses: 280780363/kafka-action@v1.0 with: - kafka version: "latest" # Optional, kafka version - zookeeper version: "latest" # Optional, zookeeper version + kafka version: "3.4.0-debian-11-r15" # Optional, kafka version + zookeeper version: "3.8.1-debian-11-r18" # Optional, zookeeper version kafka port: 9092 # Optional, kafka port. Connect using localhost:9092 zookeeper port: 2181 # Optional, zookeeper port auto create topic: "true" # Optional, auto create kafka topic diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs index d2f0e68f..59a6cb50 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs @@ -122,4 +122,4 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic services.AddSingleton(); } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj b/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj index 921e611d..62db33e5 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj +++ b/src/KafkaFlow.Retry.IntegrationTests/KafkaFlow.Retry.IntegrationTests.csproj @@ -13,6 +13,7 @@ + diff --git a/src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobDataProviderSurrogate.cs b/src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobDataProviderSurrogate.cs new file mode 100644 index 00000000..e73e45d3 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobDataProviderSurrogate.cs @@ -0,0 +1,46 @@ +namespace KafkaFlow.Retry.IntegrationTests.PollingTests +{ + using System.Collections.Generic; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; + using global::KafkaFlow.Retry.Durable.Polling; + using Quartz; + + internal class JobDataProviderSurrogate : IJobDataProvider + { + public JobDataProviderSurrogate(string schedulerId, PollingDefinition pollingDefinition, ITrigger trigger, List jobExecutionContexts) + { + this.PollingDefinition = pollingDefinition; + + this.Trigger = trigger; + this.TriggerName = this.GetTriggerName(schedulerId); + + this.JobExecutionContexts = jobExecutionContexts; + this.JobDetail = this.CreateJobDetail(); + } + + public IJobDetail JobDetail { get; } + + public List JobExecutionContexts { get; } + + public PollingDefinition PollingDefinition { get; } + + public ITrigger Trigger { get; } + + public string TriggerName { get; } + + private IJobDetail CreateJobDetail() + { + var dataMap = new JobDataMap { { "JobExecution", this.JobExecutionContexts } }; + + return JobBuilder + .Create() + .SetJobData(dataMap) + .Build(); + } + + private string GetTriggerName(string schedulerId) + { + return $"pollingJobTrigger_{schedulerId}_{this.PollingDefinition.PollingJobType}"; + } + } +} diff --git a/src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs b/src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs new file mode 100644 index 00000000..195045c2 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs @@ -0,0 +1,17 @@ +namespace KafkaFlow.Retry.IntegrationTests.PollingTests +{ + using System.Collections.Generic; + using System.Threading.Tasks; + using Quartz; + + internal class JobSurrogate : IJob + { + public Task Execute(IJobExecutionContext context) + { + var jobExecutionContexts = context.JobDetail.JobDataMap["JobExecution"] as List; + jobExecutionContexts.Add(context); + + return Task.CompletedTask; + } + } +} diff --git a/src/KafkaFlow.Retry.IntegrationTests/PollingTests/QueueTrackerCoordinatorTests.cs b/src/KafkaFlow.Retry.IntegrationTests/PollingTests/QueueTrackerCoordinatorTests.cs new file mode 100644 index 00000000..a7fb9fa0 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/PollingTests/QueueTrackerCoordinatorTests.cs @@ -0,0 +1,171 @@ +namespace KafkaFlow.Retry.IntegrationTests.PollingTests +{ + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using FluentAssertions; + using global::KafkaFlow.Retry.Durable.Definitions.Polling; + using global::KafkaFlow.Retry.Durable.Polling; + using Moq; + using Quartz; + using Xunit; + + public class QueueTrackerCoordinatorTests + { + private readonly Mock mockJobDataProvidersFactory; + private readonly ITriggerProvider triggerProvider; + + public QueueTrackerCoordinatorTests() + { + this.triggerProvider = new TriggerProvider(); + + this.mockJobDataProvidersFactory = new Mock(); + } + + [Fact] + public async Task QueueTrackerCoordinator_ForceMisfireJob_SuccessWithCorrectScheduledFiredTimes() + { + // arrange + var schedulerId = "MisfiredJobsDoesNothing"; + var jobExecutionContexts = new List(); + + var waitForScheduleInSeconds = 5; + var jobActiveTimeInSeconds = 8; + var pollingInSeconds = 2; + + var cronExpression = $"*/{pollingInSeconds} * * ? * * *"; + + var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, cronExpression, jobExecutionContexts); + + this.mockJobDataProvidersFactory + .Setup(m => m.Create(It.IsAny(), It.IsAny())) + .Returns(new[] { retryDurableJobDataProvider }); + + var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId); + + // act + + Thread.Sleep(waitForScheduleInSeconds * 1000); + + await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of(), Mock.Of()); + + Thread.Sleep(jobActiveTimeInSeconds * 1000); + + await queueTrackerCoordinator.UnscheduleJobsAsync(); + + // assert + var scheduledFiredTimes = jobExecutionContexts + .Where(ctx => ctx.ScheduledFireTimeUtc.HasValue) + .Select(ctx => ctx.ScheduledFireTimeUtc.Value) + .OrderBy(x => x) + .ToList(); + + var currentScheduledFiredTime = scheduledFiredTimes.First(); + var otherScheduledFiredTimes = scheduledFiredTimes.Skip(1).ToList(); + + foreach (var scheduledFiredTime in otherScheduledFiredTimes) + { + currentScheduledFiredTime.AddSeconds(pollingInSeconds).Should().Be(scheduledFiredTime); + + currentScheduledFiredTime = scheduledFiredTime; + } + } + + [Fact] + public async Task QueueTrackerCoordinator_ScheduleAndUnscheduleDifferentJobs_Success() + { + // arrange + var schedulerId = "twoJobsSchedulerId"; + var jobExecutionContexts = new List(); + + var timePollingActiveInSeconds = 4; + + var retryDurableCronExpression = "*/2 * * ? * * *"; + var cleanupCronExpression = "*/4 * * ? * * *"; + + var retryDurableMinExpectedJobsFired = 2; + var retryDurableMaxExpectedJobsFired = 3; + var cleanupMinExpectedJobsFired = 1; + var cleanupMaxExpectedJobsFired = 2; + + var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, retryDurableCronExpression, jobExecutionContexts); + var cleanupJobDataProvider = this.CreateCleanupJobDataProvider(schedulerId, cleanupCronExpression, jobExecutionContexts); + + this.mockJobDataProvidersFactory + .Setup(m => m.Create(It.IsAny(), It.IsAny())) + .Returns(new[] { retryDurableJobDataProvider, cleanupJobDataProvider }); + + var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId); + + // act + await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of(), Mock.Of()); + + Thread.Sleep(timePollingActiveInSeconds * 1000); + + await queueTrackerCoordinator.UnscheduleJobsAsync(); + + // assert + jobExecutionContexts.Where(ctx => !ctx.PreviousFireTimeUtc.HasValue).Should().HaveCount(2); + + var retryDurableFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == retryDurableJobDataProvider.TriggerName); + var cleanupFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == cleanupJobDataProvider.TriggerName); + + retryDurableFiresContexts + .Should() + .HaveCountGreaterThanOrEqualTo(retryDurableMinExpectedJobsFired) + .And + .HaveCountLessThanOrEqualTo(retryDurableMaxExpectedJobsFired); + + retryDurableFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue); + + cleanupFiresContexts + .Should() + .HaveCountGreaterThanOrEqualTo(cleanupMinExpectedJobsFired) + .And + .HaveCountLessThanOrEqualTo(cleanupMaxExpectedJobsFired); + + cleanupFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue); + } + + private JobDataProviderSurrogate CreateCleanupJobDataProvider(string schedulerId, string cronExpression, List jobExecutionContexts) + { + var cleanupPollingDefinition = + new CleanupPollingDefinition( + enabled: true, + cronExpression: cronExpression, + timeToLiveInDays: 1, + rowsPerRequest: 10 + ); + + return this.CreateJobDataProvider(schedulerId, cleanupPollingDefinition, jobExecutionContexts); + } + + private JobDataProviderSurrogate CreateJobDataProvider(string schedulerId, PollingDefinition pollingDefinition, List jobExecutionContexts) + { + var trigger = this.triggerProvider.GetPollingTrigger(schedulerId, pollingDefinition); + + return new JobDataProviderSurrogate(schedulerId, pollingDefinition, trigger, jobExecutionContexts); + } + + private IQueueTrackerCoordinator CreateQueueTrackerCoordinator(string schedulerId) + { + var queueTrackerFactory = new QueueTrackerFactory(schedulerId, this.mockJobDataProvidersFactory.Object); + + return new QueueTrackerCoordinator(queueTrackerFactory); + } + + private JobDataProviderSurrogate CreateRetryDurableJobDataProvider(string schedulerId, string cronExpression, List jobExecutionContexts) + { + var retryDurablePollingDefinition = + new RetryDurablePollingDefinition( + enabled: true, + cronExpression: cronExpression, + fetchSize: 100, + expirationIntervalFactor: 1 + ); + + return this.CreateJobDataProvider(schedulerId, retryDurablePollingDefinition, jobExecutionContexts); + } + } +} diff --git a/src/KafkaFlow.Retry.IntegrationTests/Properties/AssemblyMetadata.cs b/src/KafkaFlow.Retry.IntegrationTests/Properties/AssemblyMetadata.cs index c2a6239a..07d754eb 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Properties/AssemblyMetadata.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Properties/AssemblyMetadata.cs @@ -1,5 +1,7 @@ using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using Xunit; [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] -[assembly: ExcludeFromCodeCoverage] \ No newline at end of file +[assembly: ExcludeFromCodeCoverage] +[assembly: CollectionBehavior(DisableTestParallelization = true)] diff --git a/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs index f36b88ea..c35102c7 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs @@ -141,4 +141,4 @@ internal async Task RetryDurableTest( } } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs index 15a2e123..e5764272 100644 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerCoordinatorTests.cs @@ -14,7 +14,7 @@ public class QueueTrackerCoordinatorTests { private readonly Mock mockJobDataProvider; private readonly Mock mockQueueTrackerFactory; - private readonly QueueTrackerCoordinator queueTrackerCoordinator; + private readonly IQueueTrackerCoordinator queueTrackerCoordinator; public QueueTrackerCoordinatorTests() { @@ -90,4 +90,4 @@ public async Task QueueTrackerCoordinator_UnscheduleJobs_Success() this.mockJobDataProvider.Verify(m => m.Trigger, Times.Exactly(2)); } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs b/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs deleted file mode 100644 index b4e77d38..00000000 --- a/src/KafkaFlow.Retry.UnitTests/KafkaFlow.Retry/Durable/Polling/QueueTrackerTests.cs +++ /dev/null @@ -1,234 +0,0 @@ -namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading.Tasks; - using FluentAssertions; - using global::KafkaFlow.Retry.Durable.Definitions.Polling; - using global::KafkaFlow.Retry.Durable.Polling; - using Moq; - using Quartz; - using Xunit; - - public class MockIJob : IJob - { - public Task Execute(IJobExecutionContext context) - { - var jobExecutionContexts = context.JobDetail.JobDataMap["JobExecution"] as List; - jobExecutionContexts.Add(context); - - return Task.CompletedTask; - } - } - - public class QueueTrackerTests - { - [Fact] - public async Task QueueTracker_ScheduleAndUnscheduleDifferentJobs_Success() - { - // arrange - var schedulerId = "twoJobsSchedulerId"; - - var retryDurablePollingDefinition = - new RetryDurablePollingDefinition( - enabled: true, - cronExpression: "*/2 * * ? * * *", - fetchSize: 100, - expirationIntervalFactor: 1 - ); - - var cleanupPollingDefinition = - new CleanupPollingDefinition( - enabled: true, - cronExpression: "*/4 * * ? * * *", - timeToLiveInDays: 1, - rowsPerRequest: 10 - ); - - var retryDurableTriggerKeyName = $"Trigger_{schedulerId}_{retryDurablePollingDefinition.PollingJobType}"; - var cleanupTriggerKeyName = $"Trigger_{schedulerId}_{cleanupPollingDefinition.PollingJobType}"; - - var jobExecutionContexts = new List(); - - var mockRetryDurableJobDataProvider = new Mock(); - this.SetupMockJobDataProvider( - mockRetryDurableJobDataProvider, - retryDurablePollingDefinition, - retryDurableTriggerKeyName, - jobExecutionContexts); - - var mockCleanupJobDataProvider = new Mock(); - this.SetupMockJobDataProvider( - mockCleanupJobDataProvider, - cleanupPollingDefinition, - cleanupTriggerKeyName, - jobExecutionContexts); - - var queueTracker = new QueueTracker( - schedulerId, - new[] { mockRetryDurableJobDataProvider.Object, mockCleanupJobDataProvider.Object }, - Mock.Of() - ); - - // act - await queueTracker.ScheduleJobsAsync(); - - await WaitForSeconds(5); - - await queueTracker.UnscheduleJobsAsync(); - - // assert - jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null).Count().Should().Be(2); - jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == retryDurableTriggerKeyName).Count().Should().Be(1); - jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == cleanupTriggerKeyName).Count().Should().Be(1); - } - - [Fact] - public async Task QueueTracker_ScheduleAndUnscheduleRetryDurableJobs_Success() - { - // arrange - var schedulerId = "pollingId"; - - var retryDurablePollingDefinition = - new RetryDurablePollingDefinition( - enabled: true, - cronExpression: "*/5 * * ? * * *", - fetchSize: 100, - expirationIntervalFactor: 1 - ); - - var jobExecutionContexts = new List(); - var dataMap = new JobDataMap { { "JobExecution", jobExecutionContexts } }; - - var mockIJobDataProvider = new Mock(); - mockIJobDataProvider - .Setup(x => x.JobDetail) - .Returns( - JobBuilder - .Create() - .SetJobData(dataMap) - .Build() - ); - - mockIJobDataProvider - .SetupGet(m => m.PollingDefinition) - .Returns(retryDurablePollingDefinition); - - mockIJobDataProvider - .SetupGet(m => m.Trigger) - .Returns( - TriggerBuilder - .Create() - .WithIdentity("Trigger1", "queueTrackerGroup") - .WithCronSchedule("*/5 * * ? * * *") - .StartNow() - .WithPriority(1) - .Build()); - - var mockILogHandler = new Mock(); - mockILogHandler.Setup(x => x.Info(It.IsAny(), It.IsAny())); - mockILogHandler.Setup(x => x.Error(It.IsAny(), It.IsAny(), It.IsAny())); - - var queueTracker1 = new QueueTracker( - schedulerId, - new[] { mockIJobDataProvider.Object }, - mockILogHandler.Object - ); - - // act - await queueTracker1.ScheduleJobsAsync(); - - await WaitForSeconds(6).ConfigureAwait(false); - - await queueTracker1.UnscheduleJobsAsync(); - - await WaitForSeconds(15).ConfigureAwait(false); - - mockIJobDataProvider - .SetupGet(m => m.Trigger) - .Returns( - TriggerBuilder - .Create() - .WithIdentity("Trigger2", "queueTrackerGroup") - .WithCronSchedule("*/5 * * ? * * *") - .StartNow() - .WithPriority(1) - .Build()); - - var queueTracker2 = new QueueTracker( - schedulerId, - new[] { mockIJobDataProvider.Object }, - mockILogHandler.Object - ); - - await queueTracker2.ScheduleJobsAsync(); - - await WaitForSeconds(6).ConfigureAwait(false); - - await queueTracker2.UnscheduleJobsAsync(); - - await WaitForSeconds(15).ConfigureAwait(false); - - jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null).Count().Should().Be(2); - jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == "Trigger1").Count().Should().Be(1); - jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == "Trigger2").Count().Should().Be(1); - - var timeBetweenJobExecutionsWhileJobWasUnscheduled = - jobExecutionContexts - .Where(x => x.Trigger.Key.Name == "Trigger2") - .OrderBy(x => x.FireTimeUtc) - .First() - .FireTimeUtc - - - jobExecutionContexts - .Where(x => x.Trigger.Key.Name == "Trigger1") - .OrderBy(x => x.FireTimeUtc) - .Last() - .FireTimeUtc; - - timeBetweenJobExecutionsWhileJobWasUnscheduled.Should().BeGreaterThan(TimeSpan.FromSeconds(15)); - } - - private static async Task WaitForSeconds(int seconds) - { - for (int i = 0; i < (seconds * 10); i++) - { - await Task.Delay(100).ConfigureAwait(false); - } - } - - private void SetupMockJobDataProvider( - Mock mockJobDataProvider, - PollingDefinition pollingDefinition, - string triggerKeyName, - List jobExecutionContexts) - { - var dataMap = new JobDataMap { { "JobExecution", jobExecutionContexts } }; - - mockJobDataProvider - .Setup(x => x.JobDetail) - .Returns( - JobBuilder - .Create() - .SetJobData(dataMap) - .Build() - ); - - mockJobDataProvider - .SetupGet(m => m.PollingDefinition) - .Returns(pollingDefinition); - - mockJobDataProvider - .SetupGet(m => m.Trigger) - .Returns( - TriggerBuilder - .Create() - .WithIdentity(triggerKeyName, "queueTrackerGroupTest") - .WithCronSchedule(pollingDefinition.CronExpression) - .StartNow() - .WithPriority(1) - .Build()); - } - } -} \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs b/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs index e16a5fe8..eb44bab5 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/TriggerProvider.cs @@ -9,9 +9,9 @@ public ITrigger GetPollingTrigger(string schedulerId, PollingDefinition pollingD => TriggerBuilder .Create() .WithIdentity($"pollingJobTrigger_{schedulerId}_{pollingDefinition.PollingJobType}", "queueTrackerGroup") - .WithCronSchedule(pollingDefinition.CronExpression) + .WithCronSchedule(pollingDefinition.CronExpression, cronBuilder => cronBuilder.WithMisfireHandlingInstructionDoNothing()) .StartNow() .WithPriority(1) .Build(); } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Retry/Properties/AssemblyInfo.cs b/src/KafkaFlow.Retry/Properties/AssemblyInfo.cs index 1ef990ef..04c08639 100644 --- a/src/KafkaFlow.Retry/Properties/AssemblyInfo.cs +++ b/src/KafkaFlow.Retry/Properties/AssemblyInfo.cs @@ -1,4 +1,5 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] -[assembly: InternalsVisibleTo("KafkaFlow.Retry.UnitTests")] \ No newline at end of file +[assembly: InternalsVisibleTo("KafkaFlow.Retry.UnitTests")] +[assembly: InternalsVisibleTo("KafkaFlow.Retry.IntegrationTests")]