Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/guide/durability/efcore/sagas.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ public class OrdersDbContext : DbContext
modelBuilder.Entity<Order>(map =>
{
map.ToTable("orders", "sample");
map.HasKey(x => x.Id);
map.Property(x => x.OrderStatus)
.HasConversion(v => v.ToString(), v => Enum.Parse<OrderStatus>(v));

// enable optimistic concurrency
map.Property(x => x.Version)
.IsConcurrencyToken();
});
}
}
Expand Down
111 changes: 111 additions & 0 deletions src/Persistence/EfCoreTests/Optimistic_concurrency_with_ef_core.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using EfCoreTests.MultiTenancy;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using SharedPersistenceModels.Items;
using Shouldly;
using Weasel.Core;
using Weasel.SqlServer;
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.Attributes;
using Wolverine.EntityFrameworkCore;
using Wolverine.Runtime.Handlers;
using Wolverine.SqlServer;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace EfCoreTests;

[Collection("sqlserver")]
public class Optimistic_concurrency_with_ef_core
{
private readonly ITestOutputHelper _output;

public Optimistic_concurrency_with_ef_core(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task detect_concurrency_exception_as_SagaConcurrencyException()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opt =>
{
opt.Services.AddDbContextWithWolverineIntegration<OptConcurrencyDbContext>(o =>
{
o.UseSqlServer(Servers.SqlServerConnectionString);
});

opt.Services.AddScoped<IOrderRepository, OrderRepository>();

opt.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString);
opt.UseEntityFrameworkCoreTransactions();
opt.Policies.UseDurableLocalQueues();
opt.Policies.AutoApplyTransactions();
}).StartAsync();

var table = new Table("ConcurrencyTestSagas");
table.AddColumn<Guid>("id").AsPrimaryKey();
table.AddColumn<string>("value");
table.AddColumn<int>("version");
await using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();

var migration = await SchemaMigration.DetermineAsync(conn, table);
await new SqlServerMigrator().ApplyAllAsync(conn, migration, AutoCreate.All);

using var scope = host.Services.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<OptConcurrencyDbContext>();
await dbContext.Database.EnsureCreatedAsync();

await conn.CloseAsync();

await dbContext.ConcurrencyTestSagas.AddAsync(new()
{
Id = Guid.NewGuid(),
Value = "initial value",
Version = 0,
});
await dbContext.SaveChangesAsync();

Should.ThrowAsync<SagaConcurrencyException>(() => host.InvokeMessageAndWaitAsync(new UpdateConcurrencyTestSaga(Guid.NewGuid(), "updated value")));
}
}

public class OptConcurrencyDbContext : DbContext
{
public OptConcurrencyDbContext(DbContextOptions<OptConcurrencyDbContext> options) : base(options)
{
}

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<ConcurrencyTestSaga>()
.Property("Version")
.IsConcurrencyToken();
}

public DbSet<ConcurrencyTestSaga> ConcurrencyTestSagas { get; set; }
}

public record UpdateConcurrencyTestSaga(Guid Id, string NewValue);

public class ConcurrencyTestSaga : Saga
{
public Guid Id { get; set; }
public string Value { get; set; }
public void Handle(UpdateConcurrencyTestSaga order, OptConcurrencyDbContext ctx)
{
// Fake 999 updates of the saga while this event is being handled
ctx.ConcurrencyTestSagas.Entry(this).Property("Version").OriginalValue = 999;

Value = order.NewValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,13 @@ public Frame CommitUnitOfWorkFrame(Variable saga, IServiceContainer container)
call.CommentText = "Committing any pending entity changes to the database";
call.ReturnVariable!.OverrideName(call.ReturnVariable.Usage + "1");

return call;
return new WrapSagaConcurrencyException(saga, call);
}

public Frame DetermineUpdateFrame(Variable saga, IServiceContainer container)
{
return new CommentFrame("No explicit update necessary with EF Core");
var dbContextType = DetermineDbContextType(saga.VariableType, container);
return new IncrementSagaVersionIfNecessary(dbContextType, saga);
}

public Frame DetermineDeleteFrame(Variable sagaId, Variable saga, IServiceContainer container)
Expand Down Expand Up @@ -342,6 +343,73 @@ public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
}
}

public class IncrementSagaVersionIfNecessary : SyncFrame
{
private readonly Type _dbContextType;
private readonly Variable _saga;
private Variable? _context;

public IncrementSagaVersionIfNecessary(Type dbContextType, Variable saga)
{
_dbContextType = dbContextType;
_saga = saga;
}

public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
{
yield return _saga;

_context = chain.FindVariable(_dbContextType);
yield return _context;
}

public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteLine("");
writer.WriteComment("If the saga state changed, then increment it's version to support optimistic concurrency");
writer.WriteLine($"if ({_context!.Usage}.Entry({_saga.Usage}.Type == EntityState.Modified) {{ {_saga.Usage}.Version += 1; }}");

Next?.GenerateCode(method, writer);
}
}

public class WrapSagaConcurrencyException : SyncFrame
{
private readonly Variable _saga;
private readonly Frame _frame;

public WrapSagaConcurrencyException(Variable saga, Frame frame)
{
_saga = saga;
_frame = frame;
}

public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
{
foreach (var variable in _frame.FindVariables(chain)) yield return variable;
}

public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteLine("BLOCK:try");
_frame.GenerateCode(method, writer);
writer.FinishBlock();

writer.WriteLine("BLOCK:catch (DbUpdateConcurrencyException error)");
writer.WriteComment("Only intercepts concurrency error on the saga itself");

writer.WriteLine($"BLOCK:if (error.Entries.Any(e => e.Entity == ${_saga.Usage})");
writer.WriteLine($"throw new SagaConcurrencyException($\"Saga of type {_saga.VariableType.FullNameInCode()} and id {{ {SagaChain.SagaIdVariableName} }} cannot be updated because of optimistic concurrency violations\");");
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using SagaChain.SagaIdVariableName seems like an oversight but I don't have access to the "Variable sagaId" from neither CommitUnitOfWorkFrame nor DetermineUpdateFrame.

writer.FinishBlock();

writer.WriteComment("Rethrow any other exception");
writer.WriteLine("throw;");
writer.FinishBlock();

Next?.GenerateCode(method, writer);
}
}

public class CommitDbContextTransactionIfNecessary : SyncFrame
{
private Variable? _envelopeTransaction;
Expand Down
Loading