Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change revoke and assign behavior to turn on/off the trigger #90

Merged
merged 6 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
using System;
using FluentAssertions;
using global::KafkaFlow.Retry.Durable.Definitions;
using global::KafkaFlow.Retry.Durable.Encoders;
using global::KafkaFlow.Retry.Durable.Polling;
using global::KafkaFlow.Retry.Durable.Repository;
using global::KafkaFlow.Retry.Durable.Repository.Adapters;
using Moq;
using Quartz;
using Xunit;

public class QueueTrackerCoordinatorTests
Expand All @@ -34,19 +32,16 @@ public void QueueTrackerCoordinator_Initialize_Success()
queueTrackerFactory
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
.Setup(d => d.Create(It.IsAny<RetryDurablePollingDefinition>(), It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()))
.Returns(new QueueTracker(
Mock.Of<IRetryDurableQueueRepository>(),
Mock.Of<ILogHandler>(),
Mock.Of<IMessageHeadersAdapter>(),
Mock.Of<IMessageAdapter>(),
Mock.Of<IUtf8Encoder>(),
Mock.Of<IMessageProducer>(),
retryDurablePollingDefinition
retryDurablePollingDefinition,
Mock.Of<IJobDetailProvider>(),
Mock.Of<ITriggerProvider>()
));

var coordinator = new QueueTrackerCoordinator(queueTrackerFactory.Object);

// Act
coordinator.Initialize(retryDurablePollingDefinition,
coordinator.ScheduleJob(retryDurablePollingDefinition,
mockIMessageProducer.Object,
mockILogHandler.Object);

Expand All @@ -58,26 +53,38 @@ public void QueueTrackerCoordinator_Initialize_Success()
public void QueueTrackerCoordinator_Shutdown_Success()
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
{
// Arrange
var mockIJobDetailProvider = new Mock<IJobDetailProvider>();
mockIJobDetailProvider
.Setup(x => x.GetQueuePollingJobDetail())
.Returns(Mock.Of<IJobDetail>());

var mockITrigger = new Mock<ITrigger>();
mockITrigger
.SetupGet(x => x.Key)
.Returns(new TriggerKey(String.Empty));

var mockITriggerProvider = new Mock<ITriggerProvider>();
mockITriggerProvider
.Setup(x => x.GetQueuePollingTrigger())
.Returns(mockITrigger.Object);

queueTrackerFactory
.Setup(d => d.Create(It.IsAny<RetryDurablePollingDefinition>(), It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()))
.Returns(new QueueTracker(
Mock.Of<IRetryDurableQueueRepository>(),
Mock.Of<ILogHandler>(),
Mock.Of<IMessageHeadersAdapter>(),
Mock.Of<IMessageAdapter>(),
Mock.Of<IUtf8Encoder>(),
Mock.Of<IMessageProducer>(),
retryDurablePollingDefinition
retryDurablePollingDefinition,
mockIJobDetailProvider.Object,
mockITriggerProvider.Object
));

var coordinator = new QueueTrackerCoordinator(queueTrackerFactory.Object);

// Act
coordinator.Initialize(
coordinator.ScheduleJob(
retryDurablePollingDefinition,
mockIMessageProducer.Object,
mockILogHandler.Object);
coordinator.Shutdown();
coordinator.UnscheduleJob();

//Assert
queueTrackerFactory.Verify(d => d.Create(It.IsAny<RetryDurablePollingDefinition>(), It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()), Times.Once);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
namespace KafkaFlow.Retry.UnitTests.KafkaFlow.Retry.Durable.Polling
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using global::KafkaFlow.Retry.Durable.Definitions;
using global::KafkaFlow.Retry.Durable.Polling;
using Moq;
using Quartz;
using Xunit;

public class MockIJob : IJob
{
public Task Execute(IJobExecutionContext context)
{
var jobExecutionContexts = context.JobDetail.JobDataMap["JobExecution"] as List<IJobExecutionContext>;
jobExecutionContexts.Add(context);

return Task.CompletedTask;
}
}

public class QueueTrackerTests
{
[Fact]
public async Task QueueTracker_Create_Success()
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
{
// arrange
var mockITriggerProvider = new Mock<ITriggerProvider>();
mockITriggerProvider
.Setup(x => x.GetQueuePollingTrigger())
.Returns(() =>
TriggerBuilder
.Create()
.WithIdentity("Trigger", "queueTrackerGroup")
.WithCronSchedule("*/5 * * ? * * *")
.StartNow()
.WithPriority(1)
.Build()
);

IList<IJobExecutionContext> jobExecutionContexts = new List<IJobExecutionContext>();
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
JobDataMap dataMap = new JobDataMap();
dataMap.Add("JobExecution", jobExecutionContexts);
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
var mockIJobDetailProvider = new Mock<IJobDetailProvider>();
mockIJobDetailProvider
.Setup(x => x.GetQueuePollingJobDetail())
.Returns(
JobBuilder
.Create<MockIJob>()
.SetJobData(dataMap)
.Build()
);

var mockILogHandler = new Mock<ILogHandler>();
mockILogHandler.Setup(x => x.Info(It.IsAny<string>(), It.IsAny<object>()));
mockILogHandler.Setup(x => x.Error(It.IsAny<string>(), It.IsAny<Exception>(), It.IsAny<object>()));
var retryDurablePollingDefinition =
new RetryDurablePollingDefinition(
enabled: true,
cronExpression: "*/5 * * ? * * *",
fetchSize: 100,
expirationIntervalFactor: 1,
id: "pollingId"
);

var queueTracker = new QueueTracker(
mockILogHandler.Object,
retryDurablePollingDefinition,
mockIJobDetailProvider.Object,
mockITriggerProvider.Object
);

// act
queueTracker.ScheduleJob();

await WaitForSeconds(6).ConfigureAwait(false);

queueTracker.UnscheduleJob();

await WaitForSeconds(15).ConfigureAwait(false);

var mockITriggerProvider1 = new Mock<ITriggerProvider>();
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
mockITriggerProvider1
.Setup(x => x.GetQueuePollingTrigger())
.Returns(() =>
TriggerBuilder
.Create()
.WithIdentity("Trigger1", "queueTrackerGroup")
.WithCronSchedule("*/5 * * ? * * *")
.StartNow()
.WithPriority(1)
.Build()
);

var queueTracker1 = new QueueTracker(
mockILogHandler.Object,
retryDurablePollingDefinition,
mockIJobDetailProvider.Object,
mockITriggerProvider1.Object
);

queueTracker1.ScheduleJob();

await WaitForSeconds(6).ConfigureAwait(false);

queueTracker1.UnscheduleJob();

await WaitForSeconds(15).ConfigureAwait(false);

jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null).Count().Should().Be(2);
jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == "Trigger").Count().Should().Be(1);
jobExecutionContexts.Where(x => x.PreviousFireTimeUtc is null && x.Trigger.Key.Name == "Trigger1").Count().Should().Be(1);
(
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved
jobExecutionContexts
.Where(x => x.Trigger.Key.Name == "Trigger1")
.OrderBy(x => x.FireTimeUtc)
.First()
.FireTimeUtc
-
jobExecutionContexts
.Where(x => x.Trigger.Key.Name == "Trigger")
.OrderBy(x => x.FireTimeUtc)
.Last()
.FireTimeUtc
).Should().BeGreaterThan(TimeSpan.FromSeconds(15));
}

private static async Task WaitForSeconds(int seconds)
{
for (int i = 0; i < (seconds * 10); i++)
{
await Task.Delay(100).ConfigureAwait(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,43 @@ RetryDurablePollingDefinition retryDurablePollingDefinition
.WithPartitionsAssignedHandler(
(resolver, partitionsAssignedHandler) =>
{
var log = resolver.Resolve<ILogHandler>();
log.Info(
"Partitions Assigned",
new
{
PartitionsAssigned = partitionsAssignedHandler
});

if (partitionsAssignedHandler is object
&& partitionsAssignedHandler.Any(tp => tp.Partition == DefaultPartitionElection))
{
log.Info(
"Default Partition Assigned",
new
{
DefaultPartitionElection
});

queueTrackerCoordinator
.Initialize(
.ScheduleJob(
retryDurablePollingDefinition,
resolver.Resolve<IProducerAccessor>().GetProducer(producerName),
resolver.Resolve<ILogHandler>());
log);
}
})
.WithPartitionsRevokedHandler(
(resolver, partitionsRevokedHandler) =>
{
queueTrackerCoordinator.Shutdown();
var log = resolver.Resolve<ILogHandler>();
log.Info(
"Partitions Revoked",
new
{
PartitionsRevoked = partitionsRevokedHandler
});

queueTrackerCoordinator.UnscheduleJob();
})
.AddMiddlewares(
middlewares => middlewares
Expand Down
9 changes: 9 additions & 0 deletions src/KafkaFlow.Retry/Durable/Polling/IJobDetailProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace KafkaFlow.Retry.Durable.Polling
{
using Quartz;

internal interface IJobDetailProvider
{
IJobDetail GetQueuePollingJobDetail();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

internal interface IQueueTrackerCoordinator
{
void Initialize(
void ScheduleJob(
RetryDurablePollingDefinition retryDurablePollingDefinition,
IMessageProducer retryDurableMessageProducer,
ILogHandler logHandler);

void Shutdown();
void UnscheduleJob();
}
}
9 changes: 9 additions & 0 deletions src/KafkaFlow.Retry/Durable/Polling/ITriggerProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace KafkaFlow.Retry.Durable.Polling
{
using Quartz;

internal interface ITriggerProvider
{
ITrigger GetQueuePollingTrigger();
}
}
63 changes: 63 additions & 0 deletions src/KafkaFlow.Retry/Durable/Polling/JobDetailProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Dawn;
using KafkaFlow.Retry.Durable.Definitions;
using KafkaFlow.Retry.Durable.Encoders;
using KafkaFlow.Retry.Durable.Repository;
using KafkaFlow.Retry.Durable.Repository.Adapters;
using Quartz;
martinhonovais marked this conversation as resolved.
Show resolved Hide resolved

namespace KafkaFlow.Retry.Durable.Polling
{
internal class JobDetailProvider : IJobDetailProvider
{
private readonly ILogHandler logHandler;
private readonly IMessageAdapter messageAdapter;
private readonly IMessageHeadersAdapter messageHeadersAdapter;
private readonly IMessageProducer retryDurableMessageProducer;
private readonly RetryDurablePollingDefinition retryDurablePollingDefinition;
private readonly IRetryDurableQueueRepository retryDurableQueueRepository;
private readonly IUtf8Encoder utf8Encoder;

public JobDetailProvider(
IRetryDurableQueueRepository retryDurableQueueRepository,
ILogHandler logHandler,
IMessageHeadersAdapter messageHeadersAdapter,
IMessageAdapter messageAdapter,
IUtf8Encoder utf8Encoder,
IMessageProducer retryDurableMessageProducer,
RetryDurablePollingDefinition retryDurablePollingDefinition)
{
Guard.Argument(retryDurableQueueRepository).NotNull();
Guard.Argument(logHandler).NotNull();
Guard.Argument(messageHeadersAdapter).NotNull();
Guard.Argument(messageAdapter).NotNull();
Guard.Argument(utf8Encoder).NotNull();
Guard.Argument(retryDurableMessageProducer).NotNull();
Guard.Argument(retryDurablePollingDefinition).NotNull();

this.retryDurableQueueRepository = retryDurableQueueRepository;
this.logHandler = logHandler;
this.messageHeadersAdapter = messageHeadersAdapter;
this.messageAdapter = messageAdapter;
this.utf8Encoder = utf8Encoder;
this.retryDurableMessageProducer = retryDurableMessageProducer;
this.retryDurablePollingDefinition = retryDurablePollingDefinition;
}

public IJobDetail GetQueuePollingJobDetail()
{
JobDataMap dataMap = new JobDataMap();
dataMap.Add(QueuePollingJobConstants.RetryDurableQueueRepository, this.retryDurableQueueRepository);
dataMap.Add(QueuePollingJobConstants.RetryDurableMessageProducer, this.retryDurableMessageProducer);
dataMap.Add(QueuePollingJobConstants.RetryDurablePollingDefinition, this.retryDurablePollingDefinition);
dataMap.Add(QueuePollingJobConstants.LogHandler, this.logHandler);
dataMap.Add(QueuePollingJobConstants.MessageHeadersAdapter, this.messageHeadersAdapter);
dataMap.Add(QueuePollingJobConstants.MessageAdapter, this.messageAdapter);
dataMap.Add(QueuePollingJobConstants.Utf8Encoder, this.utf8Encoder);

return JobBuilder
.Create<QueuePollingJob>()
.SetJobData(dataMap)
.Build();
}
}
}
Loading