首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >带有多个同时大容量合并语句的EF Core 5事务死锁

带有多个同时大容量合并语句的EF Core 5事务死锁
EN

Stack Overflow用户
提问于 2021-08-09 14:35:35
回答 1查看 809关注 0票数 2

我有一个应用程序,它运行多个线程,使用EF 5将数据插入Server 2017数据库表。

使用EF 5插入域模型实体的C#代码如下:

代码语言:javascript
复制
using (var ctx = this.dbContextFactory.CreateDbContext())
{
    //ctx.Database.AutoTransactionsEnabled = false;
    foreach (var rootEntity in request.RootEntities)
    {
        ctx.ChangeTracker.TrackGraph(rootEntity, node =>
        {
            if ((request.EntityTypes != null && request.EntityTypes.Contains(node.Entry.Entity.GetType()))
                || rootEntity == node.Entry.Entity)
            {
                if (node.Entry.IsKeySet)
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Modified;
                else
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Added;
            }
        });
    }
    await ctx.SaveChangesAsync(cancellationToken);
}

每个线程负责实例化自己的DbContext实例,因此使用dbContextFactory。

为INSERT (合并)生成的一些示例SQL如下:

代码语言:javascript
复制
SET NOCOUNT ON;
DECLARE @inserted0 TABLE ([OrderId] bigint, [_Position] [int]);
MERGE [dbo].[Orders] USING (
VALUES (@p0, 0),
(@p1, 1),
(@p2, 2),
...
(@43, 41)) AS i ([SomeColumn],  _Position) ON 1=0
WHEN NOT MATCHED THEN
INSERT ([SomeColumn])
VALUES (i.[SomeColumn])
OUTPUT INSERTED.[OrderId], i._Position
INTO @inserted0;

SELECT [t].[OrderId] FROM [dbo].[Orders] t
INNER JOIN @inserted0 i ON ([t].[OrderId] = [i].[OrderId])
ORDER BY [i].[_Position];

由于这些线程经常同时运行,因此我得到以下SQL异常:

代码语言:javascript
复制
Transaction (Process ID 99) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

EF隐式地将隔离级别设置为读取提交。

使用SQL导致事务死锁的原因如下:

我所关切的是:

  1. Frustratingly,由EF生成的SQL包括两个语句:合并,然后是SELECT。我不理解SELECT的用途,因为主键的标识可以从@inserted0 0表变量中获得。给定this answer,单独的合并语句就足以使这个.

原子化。

我相信正是这个选择导致了事务死锁。

  1. 我试图通过使用READ提交快照来避免与主键查找的冲突来解决这个问题,但是我仍然得到了相同的错误,尽管这个隔离级别应该可以避免锁定和使用行版本控制。

我试图解决这个问题:

解决此问题的唯一方法是显式防止由EF Core启动事务,因此代码如下:

代码语言:javascript
复制
ctx.Database.AutoTransactionsEnabled = false;

我已经对此进行了多次测试,而且还没有收到事务死锁。考虑到逻辑只是插入新的记录,我相信这是可以做到的。

有人对解决这个问题有什么建议吗?

耽误您时间,实在对不起。

EN

回答 1

Stack Overflow用户

发布于 2021-10-05 06:33:39

对于多线程上的INSERT (MERGE)语句,我们也遇到了同样的问题。我们不想为所有事务启用EnableRetryOnFailure()选项,所以我们编写了下面的DbContent扩展方法。

代码语言:javascript
复制
  public static async Task<TResult> SaveWithRetryAsync<TResult>(this DbContext context,
                                                                  Func<Task<TResult>> bulkInsertOperation,
                                                                  Func<TResult, Task<bool>> verifyBulkOperationSucceeded,
                                                                  IsolationLevel isolationLevel = IsolationLevel.Unspecified,
                                                                  int retryLimit = 6,
                                                                  int maxRetryDelayInSeconds = 30)
    {
        var existingTransaction = context.Database.CurrentTransaction?.GetDbTransaction();
        if (existingTransaction != null)
            throw new InvalidOperationException($"Cannot run {nameof(SaveWithRetryAsync)} inside a transaction");

        if (context.ChangeTracker.HasChanges())
        {
            throw new InvalidOperationException(
                "DbContext should be saved before running this action to revert only the changes of this action in case of a concurrency conflict.");
        }

        const int sqlErrorNrOnDuplicatePrimaryKey = 2627;
        const int sqlErrorNrOnSnapshotIsolation = 3960;
        const int sqlErrorDeadlock = 1205;
        int[] sqlErrorsToRetry = { sqlErrorNrOnDuplicatePrimaryKey, sqlErrorNrOnSnapshotIsolation, sqlErrorDeadlock };

        var retryState = new SaveWithRetryState<TResult>(bulkInsertOperation);

        // Use EF Cores connection resiliency feature for retrying (see https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency)
        // Usually the IExecutionStrategy is configured DbContextOptionsBuilder.UseSqlServer(..., options.EnableRetryOnFailure()).
        // In ASP.NET, the DbContext is configured in Startup.cs and we don't want this retry behaviour everywhere for each db operation.
        var executionStrategyDependencies = context.Database.GetService<ExecutionStrategyDependencies>();
        var retryStrategy = new CustomSqlServerRetryingExecutionStrategy(executionStrategyDependencies, retryLimit, TimeSpan.FromSeconds(maxRetryDelayInSeconds), sqlErrorsToRetry);

        try
        {
            var result = await retryStrategy.ExecuteInTransactionAsync(
                retryState,
                async (state, cancelToken) =>
                {
                    try
                    {
                        var r = await state.Action();

                        await context.SaveChangesAsync(false, cancelToken);

                        if (state.FirstException != null)
                        {
                            Log.Logger.Warning(
                                $"Action passed to {nameof(SaveWithRetryAsync)} failed {state.NumberOfRetries} times " +
                                $"(retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\nFirst exception was: {state.FirstException}");
                        }

                        state.Result = r;
                        return r;
                    }
                    catch (Exception ex)
                    {
                        context.RevertChanges();
                        state.NumberOfRetries++;
                        state.FirstException ??= ex;
                        state.LastException = ex;

                        throw;
                    }
                },
                (state, cancelToken) => verifyBulkOperationSucceeded(retryState.Result),
                context.GetSupportedIsolationLevel(isolationLevel));

            context.ChangeTracker.AcceptAllChanges();
            return result;
        }
        catch (Exception ex)
        {
            throw new InvalidOperationException(
                $"DB Transaction in {nameof(SaveWithRetryAsync)} failed. " +
                $"Tried {retryState.NumberOfRetries} times (retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\n" +
                $"First exception was: {retryState.FirstException}.\nLast exception was: {retryState.LastException}",
                ex);
        }
    }

使用以下CustomSqlServerRetryingExecutionStrategy

代码语言:javascript
复制
 public class CustomSqlServerRetryingExecutionStrategy : SqlServerRetryingExecutionStrategy
{
    public CustomSqlServerRetryingExecutionStrategy(ExecutionStrategyDependencies executionStrategyDependencies, int retryLimit, TimeSpan fromSeconds, int[] sqlErrorsToRetry)
        : base(executionStrategyDependencies, retryLimit, fromSeconds, sqlErrorsToRetry)
    {
    }

    protected override bool ShouldRetryOn(Exception exception)
    {
        //SqlServerRetryingExecutionStrategy does not check the base exception, maybe a bug in EF core ?!
        return base.ShouldRetryOn(exception) || base.ShouldRetryOn(exception.GetBaseException());
    }
}

保存当前(重试)状态的Helper类:

代码语言:javascript
复制
private class SaveWithRetryState<T>
    {
        public SaveWithRetryState(Func<Task<T>> action)
        {
            Action = action;
        }

        public Exception FirstException { get; set; }
        public Exception LastException { get; set; }
        public int NumberOfRetries { get; set; }
        public Func<Task<T>> Action { get; }
        public T Result { get; set; }
    }

现在,扩展方法可以如下所示。代码将尝试多次添加大容量(5)。

代码语言:javascript
复制
 await _context.SaveWithRetryAsync(
            // method to insert the bulk
            async () =>
            {
                var listOfAddedItems = new List<string>();
                foreach (var item in bulkImport)
                {
                    listOfAddedItems.Add(item.Guid);
                    await context.Import.AddAsync(item);
                }

                return listOfAddedItems;
            },
            // method to check if the bulk insert was successful
            listOfAddedItems =>
            {

                if (listOfAddedItems == null)
                    return Task.FromResult(true);
                return _context.Import.AsNoTracking().AnyAsync(x => x.Guid == listOfAddedItems.First());
            },
            IsolationLevel.Snapshot,
            5, // max retry attempts
            100); // max retry time

关于为什么会发生这种情况的背景信息,请看下面的讨论:https://github.com/dotnet/efcore/issues/21899

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68713938

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档