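I have an application that runs multiple threads, each inserting data into SQL Server 2017 database tables using EF Core 5.
The C# code that inserts the domain model entities with EF Core 5 is as follows: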
using (var ctx = this.dbContextFactory.CreateDbContext())
{
    //ctx.Database.AutoTransactionsEnabled = false;
    foreach (var rootEntity in request.RootEntities)
    {
        ctx.ChangeTracker.TrackGraph(rootEntity, node =>
        {
            // Only track the root entity and the explicitly requested entity types.
            if ((request.EntityTypes != null && request.EntityTypes.Contains(node.Entry.Entity.GetType()))
                || rootEntity == node.Entry.Entity)
            {
                // A set key means the row already exists (update); otherwise insert.
                if (node.Entry.IsKeySet)
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Modified;
                else
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Added;
            }
        });
    }
    await ctx.SaveChangesAsync(cancellationToken);
}

Each thread is responsible for instantiating its own DbContext instance, which is why dbContextFactory is used.
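For readers unfamiliar with the factory pattern mentioned above, here is a minimal registration sketch. It assumes the factory is EF Core 5's built-in IDbContextFactory<TContext>, which the question does not confirm; OrdersDbContext, DataAccessRegistration and AddOrdersData are hypothetical names used only for illustration.

```csharp
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical context type; the question does not show the real one.
public class OrdersDbContext : DbContext
{
    public OrdersDbContext(DbContextOptions<OrdersDbContext> options) : base(options) { }
}

public static class DataAccessRegistration
{
    // Hypothetical helper that registers IDbContextFactory<OrdersDbContext>.
    // Each worker thread can then call CreateDbContext() to get its own
    // short-lived context instead of sharing one (DbContext is not thread-safe).
    public static IServiceCollection AddOrdersData(
        this IServiceCollection services, string connectionString)
    {
        return services.AddDbContextFactory<OrdersDbContext>(
            options => options.UseSqlServer(connectionString));
    }
}
```

A separate context per thread avoids concurrent use of one DbContext, but it does not by itself prevent deadlocks inside the database, since every context still opens its own connection and transaction.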
Some sample SQL generated for the INSERT (MERGE) is as follows:
SET NOCOUNT ON;
DECLARE @inserted0 TABLE ([OrderId] bigint, [_Position] [int]);
MERGE [dbo].[Orders] USING (
    VALUES (@p0, 0),
           (@p1, 1),
           (@p2, 2),
           ...
           (@p43, 41)) AS i ([SomeColumn], _Position) ON 1=0
WHEN NOT MATCHED THEN
    INSERT ([SomeColumn])
    VALUES (i.[SomeColumn])
    OUTPUT INSERTED.[OrderId], i._Position
    INTO @inserted0;
SELECT [t].[OrderId] FROM [dbo].[Orders] t
INNER JOIN @inserted0 i ON ([t].[OrderId] = [i].[OrderId])
ORDER BY [i].[_Position];

Because these threads frequently run at the same time, I get the following SQL exception:
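Transaction (Process ID 99) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

EF implicitly sets the isolation level to READ COMMITTED.
According to SQL Server, the transaction deadlock is caused by the following:
[image not included]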
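My concern is atomicity. By default EF Core wraps the save in a transaction so that it is atomic, and I believe it is this choice that causes the transaction deadlock.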
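What I tried in order to solve the problem:
The only way I found to resolve this was to explicitly prevent EF Core from starting a transaction, hence the following code:
ctx.Database.AutoTransactionsEnabled = false;

I have tested this many times and have not received a transaction deadlock since. Given that the logic only inserts new records, I believe this is acceptable.
Does anyone have any advice on how to fix this problem?
Thanks for your time.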
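Answered 2021-10-05 06:33:39
We ran into the same problem with INSERT (MERGE) statements on multiple threads. We did not want to enable the EnableRetryOnFailure() option for all transactions, so we wrote the following DbContext extension method.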
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Storage;
using Serilog; // Log.Logger below is assumed to be Serilog's static logger

public static async Task<TResult> SaveWithRetryAsync<TResult>(this DbContext context,
    Func<Task<TResult>> bulkInsertOperation,
    Func<TResult, Task<bool>> verifyBulkOperationSucceeded,
    IsolationLevel isolationLevel = IsolationLevel.Unspecified,
    int retryLimit = 6,
    int maxRetryDelayInSeconds = 30)
{
    var existingTransaction = context.Database.CurrentTransaction?.GetDbTransaction();
    if (existingTransaction != null)
        throw new InvalidOperationException($"Cannot run {nameof(SaveWithRetryAsync)} inside a transaction");
    if (context.ChangeTracker.HasChanges())
    {
        throw new InvalidOperationException(
            "DbContext should be saved before running this action to revert only the changes of this action in case of a concurrency conflict.");
    }
    const int sqlErrorNrOnDuplicatePrimaryKey = 2627;
    const int sqlErrorNrOnSnapshotIsolation = 3960;
    const int sqlErrorDeadlock = 1205;
    int[] sqlErrorsToRetry = { sqlErrorNrOnDuplicatePrimaryKey, sqlErrorNrOnSnapshotIsolation, sqlErrorDeadlock };
    var retryState = new SaveWithRetryState<TResult>(bulkInsertOperation);
    // Use EF Core's connection resiliency feature for retrying (see https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency).
    // Usually the IExecutionStrategy is configured via DbContextOptionsBuilder.UseSqlServer(..., options => options.EnableRetryOnFailure()).
    // In ASP.NET, the DbContext is configured in Startup.cs and we don't want this retry behaviour everywhere for each db operation.
    var executionStrategyDependencies = context.Database.GetService<ExecutionStrategyDependencies>();
    var retryStrategy = new CustomSqlServerRetryingExecutionStrategy(executionStrategyDependencies, retryLimit, TimeSpan.FromSeconds(maxRetryDelayInSeconds), sqlErrorsToRetry);
    try
    {
        var result = await retryStrategy.ExecuteInTransactionAsync(
            retryState,
            async (state, cancelToken) =>
            {
                try
                {
                    var r = await state.Action();
                    // Pass false so changes are only accepted once the whole transaction has succeeded.
                    await context.SaveChangesAsync(false, cancelToken);
                    if (state.FirstException != null)
                    {
                        Log.Logger.Warning(
                            $"Action passed to {nameof(SaveWithRetryAsync)} failed {state.NumberOfRetries} times " +
                            $"(retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\nFirst exception was: {state.FirstException}");
                    }
                    state.Result = r;
                    return r;
                }
                catch (Exception ex)
                {
                    // RevertChanges() is a helper extension that is not shown in this answer.
                    context.RevertChanges();
                    state.NumberOfRetries++;
                    state.FirstException ??= ex;
                    state.LastException = ex;
                    throw;
                }
            },
            (state, cancelToken) => verifyBulkOperationSucceeded(state.Result),
            // GetSupportedIsolationLevel() is a helper extension that is not shown in this answer.
            context.GetSupportedIsolationLevel(isolationLevel));
        context.ChangeTracker.AcceptAllChanges();
        return result;
    }
    catch (Exception ex)
    {
        throw new InvalidOperationException(
            $"DB Transaction in {nameof(SaveWithRetryAsync)} failed. " +
            $"Tried {retryState.NumberOfRetries} times (retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\n" +
            $"First exception was: {retryState.FirstException}.\nLast exception was: {retryState.LastException}",
            ex);
    }
}

It uses the following CustomSqlServerRetryingExecutionStrategy:
public class CustomSqlServerRetryingExecutionStrategy : SqlServerRetryingExecutionStrategy
{
    public CustomSqlServerRetryingExecutionStrategy(ExecutionStrategyDependencies executionStrategyDependencies, int retryLimit, TimeSpan maxRetryDelay, int[] sqlErrorsToRetry)
        : base(executionStrategyDependencies, retryLimit, maxRetryDelay, sqlErrorsToRetry)
    {
    }

    protected override bool ShouldRetryOn(Exception exception)
    {
        // SqlServerRetryingExecutionStrategy does not check the base exception (possibly a bug in EF Core),
        // so also test the innermost exception, where the SqlException usually sits.
        return base.ShouldRetryOn(exception) || base.ShouldRetryOn(exception.GetBaseException());
    }
}

Helper class that holds the current (retry) state:
private class SaveWithRetryState<T>
{
    public SaveWithRetryState(Func<Task<T>> action)
    {
        Action = action;
    }

    public Exception FirstException { get; set; }
    public Exception LastException { get; set; }
    public int NumberOfRetries { get; set; }
    public Func<Task<T>> Action { get; }
    public T Result { get; set; }
}

The extension method can now be used as follows. The code will try to add the bulk up to 5 times.
await _context.SaveWithRetryAsync(
    // method to insert the bulk
    async () =>
    {
        var listOfAddedItems = new List<string>();
        foreach (var item in bulkImport)
        {
            listOfAddedItems.Add(item.Guid);
            await _context.Import.AddAsync(item);
        }
        return listOfAddedItems;
    },
    // method to check if the bulk insert was successful
    listOfAddedItems =>
    {
        if (listOfAddedItems == null)
            return Task.FromResult(true);
        return _context.Import.AsNoTracking().AnyAsync(x => x.Guid == listOfAddedItems.First());
    },
    IsolationLevel.Snapshot,
    5,    // max retry attempts
    100); // max retry delay in seconds

For background on why this happens, see the following discussion: https://github.com/dotnet/efcore/issues/21899
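The extension method above calls context.RevertChanges(), which the answer does not show. The following is only a guess at what such a helper might look like, written as a minimal sketch: it assumes the goal is to undo all pending changes in the change tracker so that the retried action starts from a clean state. The class name DbContextRevertExtensions is hypothetical. The answer also calls context.GetSupportedIsolationLevel(...), which is likewise not shown and is not reconstructed here.

```csharp
using System.Linq;
using Microsoft.EntityFrameworkCore;

public static class DbContextRevertExtensions
{
    // Hypothetical stand-in for the RevertChanges() helper used by SaveWithRetryAsync;
    // the original implementation is not part of the answer.
    public static void RevertChanges(this DbContext context)
    {
        // Copy the entries first, because changing State modifies the tracker
        // while it is being enumerated.
        foreach (var entry in context.ChangeTracker.Entries().ToList())
        {
            switch (entry.State)
            {
                case EntityState.Added:
                    // Stop tracking entities that were never saved,
                    // so the retried action can add them again.
                    entry.State = EntityState.Detached;
                    break;
                case EntityState.Modified:
                case EntityState.Deleted:
                    // Restore the originally loaded values, then mark the entity as unchanged.
                    entry.CurrentValues.SetValues(entry.OriginalValues);
                    entry.State = EntityState.Unchanged;
                    break;
            }
        }
    }
}
```

Detaching added entities matters here because the bulk insert action is executed again on every retry and would otherwise try to track the same items twice.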
https://stackoverflow.com/questions/68713938