Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 5 additions & 26 deletions EfCore.BulkOperations.API/Repositories/ProductRepository.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using System.Data;
using System.Data.Common;
using EfCore.BulkOperations.API.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;

namespace EfCore.BulkOperations.API.Repositories;

Expand Down Expand Up @@ -57,14 +54,9 @@ public async Task<int> BulkMergeProducts(List<Product> products)

public async Task SyncDataThenCommit(List<Product> list1, List<Product> list2)
{
IDbContextTransaction? transaction = null;
DbConnection? connection = null;
try
{
connection = dbContext.Database.GetDbConnection();
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
transaction = await dbContext.Database.BeginTransactionAsync();
var dbTransaction = transaction.GetDbTransaction();
var dbTransaction = await dbContext.BeginTransactionAsync();

await dbContext.BulkInsertAsync(
list1,
Expand All @@ -76,29 +68,20 @@ await dbContext.BulkInsertAsync(
dbTransaction);
await dbContext.BulkInsertAsync(list2, null, dbTransaction);

await transaction.CommitAsync();
await dbContext.CommitAsync();
}
catch (Exception)
{
if (transaction is not null) await transaction.RollbackAsync();
await dbContext.RollbackAsync();
throw;
}
finally
{
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
}
}

public async Task SyncDataThenRollback(Product item1, List<Product> list2, List<Product> list3)
{
IDbContextTransaction? transaction = null;
DbConnection? connection = null;
try
{
connection = dbContext.Database.GetDbConnection();
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
transaction = await dbContext.Database.BeginTransactionAsync();
var dbTransaction = transaction.GetDbTransaction();
var dbTransaction = await dbContext.BeginTransactionAsync();

await dbContext.Products.AddAsync(item1);
await dbContext.SaveChangesAsync();
Expand All @@ -109,12 +92,8 @@ public async Task SyncDataThenRollback(Product item1, List<Product> list2, List<
}
catch (Exception)
{
if (transaction is not null) await transaction.RollbackAsync();
await dbContext.RollbackAsync();
throw;
}
finally
{
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
}
}
}
26 changes: 14 additions & 12 deletions EfCore.BulkOperations/BulkCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,7 @@ internal static IEnumerable<BatchData> GenerateMergeBatches<T>(DbContext dbConte
var colIndex = 0;
columns.ToList().ForEach(column =>
{
var value = type.GetProperty(column.RefName)?.GetValue(row);
if (column.ValueConverter is not null)
value = column.ValueConverter.ConvertToProvider(value);

var paramName = $"{Prefix}{rowIndex}_{colIndex}".ToString();
list.Add(new SqlParameter(paramName, value));
var paramName = ProcessParameter(type, column, row, rowIndex, colIndex, list);
sql.Append($"{paramName} AS `{column.Name}`, ");
colIndex++;
});
Expand Down Expand Up @@ -357,12 +352,7 @@ internal static IEnumerable<BatchData> GenerateMergeBatches<T>(DbContext dbConte
var colIndex = 0;
columns.ToList().ForEach(column =>
{
var value = type.GetProperty(column.RefName)?.GetValue(row);
if (column.ValueConverter is not null)
value = column.ValueConverter.ConvertToProvider(value);

var paramName = $"{Prefix}{rowIndex}_{colIndex}".ToString();
list.Add(new SqlParameter(paramName, value));
var paramName = ProcessParameter(type, column, row, rowIndex, colIndex, list);
sql.Append($"{paramName}, ");
colIndex++;
});
Expand All @@ -373,4 +363,16 @@ internal static IEnumerable<BatchData> GenerateMergeBatches<T>(DbContext dbConte
sql.Remove(sql.Length - 2, 1);
return new TempTable(sql, parameters);
}

private static string ProcessParameter<T>(Type type, ColumnInfo column, T row, int rowIndex, int colIndex,
List<SqlParameter> list)
{
var value = type.GetProperty(column.RefName)?.GetValue(row);
if (column.ValueConverter is not null)
value = column.ValueConverter.ConvertToProvider(value);

var paramName = $"{Prefix}{rowIndex}_{colIndex}".ToString();
list.Add(new SqlParameter(paramName, value));
return paramName;
}
}
21 changes: 21 additions & 0 deletions EfCore.BulkOperations/DbContextExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Data.Common;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;

namespace EfCore.BulkOperations;

Expand Down Expand Up @@ -83,4 +84,24 @@ public static async Task<int> BulkMergeAsync<T>(this DbContext dbContext,
{
return await EfCoreBulkUtils.BulkMergeAsync(dbContext, items, optionFactory, transaction, cancellationToken);
}

public static async Task<DbTransaction> BeginTransactionAsync(this DbContext dbContext,
CancellationToken cancellationToken = default)
{
var transaction = await dbContext.Database.BeginTransactionAsync(cancellationToken);
var dbTransaction = transaction.GetDbTransaction();
return dbTransaction;
}

public static async Task CommitAsync(this DbContext dbContext, CancellationToken cancellationToken = default)
{
if (dbContext.Database.CurrentTransaction != null)
await dbContext.Database.CurrentTransaction.CommitAsync(cancellationToken);
}

public static async Task RollbackAsync(this DbContext dbContext, CancellationToken cancellationToken = default)
{
if (dbContext.Database.CurrentTransaction != null)
await dbContext.Database.CurrentTransaction.RollbackAsync(cancellationToken);
}
}
20 changes: 6 additions & 14 deletions EfCore.BulkOperations/Docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,21 @@ EfCore.BulkOperations utilizes local transactions within bulk processes. If you
can pass an existing transaction into the bulk process.

```js
IDbContextTransaction? transaction = null;
DbConnection? connection = null;
try
{
connection = dbContext.Database.GetDbConnection();
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
transaction = await dbContext.Database.BeginTransactionAsync();
var dbTransaction = transaction.GetDbTransaction();
var dbTransaction = dbContext.BeginTransactionAsync();

await dbContext.Products.AddAsync(item1);
await dbContext.Products.AddAsync (item1);
await dbContext.SaveChangesAsync();
await dbContext.BulkInsertAsync(list2, null, dbTransaction);
await dbContext.BulkInsertAsync(list3, null, dbTransaction);

throw new DbUpdateException("Internal Server Error");
throw new DbUpdateException("Some error occurs");
await dbTransaction.CommitAsync();
}
catch (Exception ex)
catch (Exception)
{
if (transaction is not null) await transaction.RollbackAsync();
await dbContext.RollbackAsync();
throw;
}
finally
{
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
}
```
2 changes: 1 addition & 1 deletion EfCore.BulkOperations/EfCore.BulkOperations.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<RepositoryUrl>https://github.com/hongjs/EfCore.BulkOperations</RepositoryUrl>
<RepositoryType>github</RepositoryType>
<PackageTags>BulkInsert,BulkUpdate,BulkDelete,BulkMerge</PackageTags>
<Version>1.4.1</Version>
<Version>1.5.0</Version>
<PackageReadmeFile>README.md</PackageReadmeFile>
<PackageProjectUrl>https://github.com/hongjs/EfCore.BulkOperations</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/hongjs/EfCore.BulkOperations?tab=MIT-1-ov-file</PackageLicenseUrl>
Expand Down
2 changes: 1 addition & 1 deletion EfCore.BulkOperations/EfCoreBulkUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private static async Task<int> ExecuteBatchDataAsync(
CancellationToken? cancellationToken = null)
{
await using var command = connection.CreateCommand();
if (command.Connection is null) throw new ArgumentException("Command.Connection is null");
if (command.Connection is null) throw new ArgumentNullException(nameof(connection));
if (dbTransaction is not null) command.Transaction = dbTransaction;
command.CommandText = batch.Sql.ToString();
if (commandTimeout is not null) command.CommandTimeout = commandTimeout.Value;
Expand Down
16 changes: 4 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,21 @@ EfCore.BulkOperations utilizes local transactions within bulk processes. If you
can pass an existing transaction into the bulk process.

```js
IDbContextTransaction? transaction = null;
DbConnection? connection = null;
try
{
connection = dbContext.Database.GetDbConnection();
if (connection.State != ConnectionState.Open) await connection.OpenAsync();
transaction = await dbContext.Database.BeginTransactionAsync();
var dbTransaction = transaction.GetDbTransaction();
var dbTransaction = dbContext.BeginTransactionAsync();

await dbContext.Products.AddAsync (item1);
await dbContext.SaveChangesAsync();
await dbContext.BulkInsertAsync(list2, null, dbTransaction);
await dbContext.BulkInsertAsync(list3, null, dbTransaction);

throw new DbUpdateException("Internal Server Error");
throw new DbUpdateException("Some error occurs");
await dbTransaction.CommitAsync();
}
catch (Exception)
{
if (transaction is not null) await transaction.RollbackAsync();
await dbContext.RollbackAsync();
throw;
}
finally
{
if (connection is { State: ConnectionState.Open }) await connection.CloseAsync();
}
```