Skip to content

Commit 9721547

Browse files
author
Mnatsakan Margaryan
committed
patial init
1 parent 47c717c commit 9721547

15 files changed

+297
-11
lines changed

PandaNuGet.sln renamed to MassTransit.PostgresOutbox.sln

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
3-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PandaNuGet", "src\PandaNuGet\PandaNuGet.csproj", "{25001943-A870-4E17-A9B9-0D190CEC819B}"
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.9.34723.18
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MassTransit.PostgresOutbox", "src\PandaNuGet\MassTransit.PostgresOutbox.csproj", "{25001943-A870-4E17-A9B9-0D190CEC819B}"
47
EndProject
5-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PandaNuGet.Tests", "test\PandaNuGet.Tests\PandaNuGet.Tests.csproj", "{0305E58F-1C47-454C-B10B-A223F2561A85}"
8+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PandaNuGet.Tests", "test\PandaNuGet.Tests\PandaNuGet.Tests.csproj", "{0305E58F-1C47-454C-B10B-A223F2561A85}"
69
EndProject
7-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PandaNuGet.Demo", "test\PandaNuGet.Demo\PandaNuGet.Demo.csproj", "{8A6AA36D-1CEF-4018-9C9D-7D029F3EAECE}"
10+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PandaNuGet.Demo", "test\PandaNuGet.Demo\PandaNuGet.Demo.csproj", "{8A6AA36D-1CEF-4018-9C9D-7D029F3EAECE}"
811
EndProject
912
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{F8A6DCFE-8924-49A4-B3E9-2034593F54E5}"
1013
EndProject
1114
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{FEE159A2-74A0-4469-9B93-52987CA1A3CA}"
1215
EndProject
1316
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{8FF54200-8B03-4828-AAAE-297E30E2A00A}"
1417
ProjectSection(SolutionItems) = preProject
15-
.github\workflows\main.yml = .github\workflows\main.yml
1618
.gitignore = .gitignore
17-
Readme.md = Readme.md
1819
global.json = global.json
20+
.github\workflows\main.yml = .github\workflows\main.yml
21+
Readme.md = Readme.md
1922
EndProjectSection
2023
EndProject
2124
Global
@@ -37,9 +40,15 @@ Global
3740
{8A6AA36D-1CEF-4018-9C9D-7D029F3EAECE}.Release|Any CPU.ActiveCfg = Release|Any CPU
3841
{8A6AA36D-1CEF-4018-9C9D-7D029F3EAECE}.Release|Any CPU.Build.0 = Release|Any CPU
3942
EndGlobalSection
43+
GlobalSection(SolutionProperties) = preSolution
44+
HideSolutionNode = FALSE
45+
EndGlobalSection
4046
GlobalSection(NestedProjects) = preSolution
4147
{25001943-A870-4E17-A9B9-0D190CEC819B} = {F8A6DCFE-8924-49A4-B3E9-2034593F54E5}
42-
{8A6AA36D-1CEF-4018-9C9D-7D029F3EAECE} = {FEE159A2-74A0-4469-9B93-52987CA1A3CA}
4348
{0305E58F-1C47-454C-B10B-A223F2561A85} = {FEE159A2-74A0-4469-9B93-52987CA1A3CA}
49+
{8A6AA36D-1CEF-4018-9C9D-7D029F3EAECE} = {FEE159A2-74A0-4469-9B93-52987CA1A3CA}
50+
EndGlobalSection
51+
GlobalSection(ExtensibilityGlobals) = postSolution
52+
SolutionGuid = {32F4406B-A6AC-4ACA-BBC6-30D53665B7F7}
4453
EndGlobalSection
4554
EndGlobal
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using MassTransit.PostgresOutbox.Entities;
2+
using Microsoft.EntityFrameworkCore;
3+
4+
namespace MassTransit.PostgresOutbox.Abstractions
5+
{
6+
public interface IInboxDbContext
7+
{
8+
public DbSet<InboxMessage> InboxMessages { get; set; }
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using MassTransit.PostgresOutbox.Entities;
2+
using Microsoft.EntityFrameworkCore;
3+
4+
namespace FinHub.Mock1.Box.Abstractions
5+
{
6+
public interface IOutboxDbContext
7+
{
8+
public DbSet<OutboxMessage> OutboxMessages { get; set; }
9+
}
10+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using EFCore.PostgresExtensions.Enums;
2+
using EFCore.PostgresExtensions.Extensions;
3+
using FinHub.Mock1.Box.Entities;
4+
using FinHub.Mock1.Box.Enums;
5+
using FinHub.Mock1.Box.Implementations;
6+
using MassTransit;
7+
using MassTransit.PostgresOutbox.Abstractions;
8+
using Microsoft.EntityFrameworkCore;
9+
using Microsoft.Extensions.DependencyInjection;
10+
using Microsoft.Extensions.Logging;
11+
12+
namespace FinHub.Mock1.Box.Abstractions
13+
{
14+
public abstract class InboxConsumer<TMessage, TDbContext> : IConsumer<TMessage>
15+
where TMessage : class
16+
where TDbContext : DbContext, IInboxDbContext
17+
{
18+
private readonly string _consumerId;
19+
private readonly IServiceScopeFactory _serviceScopeFactory;
20+
21+
protected InboxConsumer(IServiceScopeFactory serviceScopeFactory)
22+
{
23+
_consumerId = GetType().ToString();
24+
_serviceScopeFactory = serviceScopeFactory;
25+
}
26+
27+
public async Task Consume(ConsumeContext<TMessage> context)
28+
{
29+
using var scope = _serviceScopeFactory.CreateScope();
30+
var messageId = context.Headers.Get<Guid>(Constants.OutboxMessageId);
31+
32+
var dbContext = scope.ServiceProvider.GetRequiredService<TDbContext>();
33+
var logger = scope.ServiceProvider.GetRequiredService<ILogger<InboxConsumer<TMessage, TDbContext>>>();
34+
35+
var exists = await dbContext.InboxMessages.AnyAsync(x => x.MessageId == messageId && x.ConsumerId == _consumerId);
36+
37+
if (!exists)
38+
{
39+
dbContext.InboxMessages.Add(new InboxMessage
40+
{
41+
MessageId = messageId!.Value,
42+
CreatedAt = DateTime.UtcNow,
43+
State = MessageState.New,
44+
ConsumerId = _consumerId,
45+
});
46+
47+
await dbContext.SaveChangesAsync();
48+
}
49+
50+
using var transactionScope = await dbContext.Database.BeginTransactionAsync(System.Data.IsolationLevel.ReadCommitted);
51+
52+
var inboxMessage = await dbContext.InboxMessages
53+
.Where(x => x.MessageId == messageId)
54+
.Where(x => x.ConsumerId == _consumerId)
55+
.Where(x => x.State == MessageState.New)
56+
.ForUpdate(LockBehavior.SkipLocked)
57+
.FirstOrDefaultAsync();
58+
59+
if (inboxMessage == null)
60+
{
61+
return;
62+
}
63+
64+
try
65+
{
66+
await Consume(context.Message);
67+
inboxMessage.State = MessageState.Done;
68+
}
69+
catch (Exception ex)
70+
{
71+
logger.LogError(ex, "Exception thrown while consuming message");
72+
throw;
73+
}
74+
finally
75+
{
76+
inboxMessage!.UpdatedAt = DateTime.UtcNow;
77+
await dbContext.SaveChangesAsync();
78+
await transactionScope.CommitAsync();
79+
}
80+
}
81+
82+
public abstract Task Consume(TMessage message);
83+
}
84+
}

src/PandaNuGet/Constants.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace MassTransit.PostgresOutbox
2+
{
3+
public class Constants
4+
{
5+
public const string OutboxMessageId = "OutboxMessageId";
6+
}
7+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using MassTransit.PostgresOutbox.Enums;
2+
3+
namespace MassTransit.PostgresOutbox.Entities
4+
{
5+
public class InboxMessage
6+
{
7+
public required Guid MessageId { get; set; }
8+
public required string ConsumerId { get; set; }
9+
public MessageState State { get; set; } = MessageState.New;
10+
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
11+
public DateTime? UpdatedAt { get; set; }
12+
}
13+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using MassTransit.PostgresOutbox.Enums;
2+
3+
namespace MassTransit.PostgresOutbox.Entities
4+
{
5+
public class OutboxMessage
6+
{
7+
public required Guid Id { get; set; }
8+
public MessageState State { get; set; } = MessageState.New;
9+
public required string Payload { get; set; }
10+
public required string Type { get; set; }
11+
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
12+
public DateTime? UpdatedAt { get; set; }
13+
14+
}
15+
}

src/PandaNuGet/Enums/MessageState.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace MassTransit.PostgresOutbox.Enums
2+
{
3+
public enum MessageState
4+
{
5+
New = 1,
6+
Done = 2,
7+
}
8+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using MassTransit.PostgresOutbox.Entities;
2+
using Microsoft.EntityFrameworkCore;
3+
4+
namespace MassTransit.PostgresOutbox.Extensions
5+
{
6+
public static class ModelBuilderExtensions
7+
{
8+
public static ModelBuilder ConfigureOutboxMessageEntity(this ModelBuilder modelBuilder)
9+
{
10+
var entity = modelBuilder.Entity<OutboxMessage>();
11+
12+
entity.HasKey(x => x.Id);
13+
entity.Property(x => x.Id).ValueGeneratedNever();
14+
15+
return modelBuilder;
16+
}
17+
18+
public static ModelBuilder ConfigureInboxMessageEntity(this ModelBuilder modelBuilder)
19+
{
20+
var entity = modelBuilder.Entity<InboxMessage>();
21+
22+
entity.HasKey(x => new { x.MessageId, x.ConsumerId });
23+
24+
return modelBuilder;
25+
}
26+
27+
public static ModelBuilder ConfigureInboxOutboxEntities(this ModelBuilder modelBuilder)
28+
{
29+
modelBuilder.ConfigureOutboxMessageEntity();
30+
modelBuilder.ConfigureInboxMessageEntity();
31+
32+
return modelBuilder;
33+
}
34+
}
35+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using FinHub.Mock1.Box.Abstractions;
2+
using MassTransit.PostgresOutbox.Entities;
3+
using Newtonsoft.Json;
4+
5+
namespace MassTransit.PostgresOutbox.Extensions
6+
{
7+
public static class OutboxDbContextExtensions
8+
{
9+
public static Guid AddToOutbox<T>(this IOutboxDbContext dbContext, T message)
10+
{
11+
var entity = new OutboxMessage
12+
{
13+
Id = Guid.NewGuid(),
14+
CreatedAt = DateTime.UtcNow,
15+
State = Enums.MessageState.New,
16+
UpdatedAt = null,
17+
Payload = JsonConvert.SerializeObject(message),
18+
Type = typeof(T).AssemblyQualifiedName!
19+
};
20+
21+
dbContext.OutboxMessages.Add(entity);
22+
23+
return entity.Id;
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)