multithreading 多线程实体框架:连接未关闭。连接的当前状态是连接
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/12521695/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Multithreading Entity Framework: The connection was not closed. The connection's current state is connecting
提问by Brandon
So I have a windows service process that performs a workflow process. The back end uses Repository and UnitofWork Pattern and Unity on top of Entity Framework with the entities class generated from the edmx. I won't go into a whole lot of detail as its not necessary but basically there are 5 steps that the workflow goes through. A particular process might be at any stage at any point in time (in order of course). Step one just generates data for step two, which validates the data via a long running process to another server. Then step there generates a pdf with that data. For each stage we spawn a timer, however it is configurable to allow more than one timer to be spawned for each stage. Therein lays the problem. When I add a processor to a particular stage, it the following error randomly:
所以我有一个执行工作流过程的 Windows 服务过程。后端在实体框架之上使用 Repository 和 UnitofWork Pattern 以及 Unity,其中实体类是从 edmx 生成的。我不会详细介绍,因为它不是必需的,但基本上工作流程有 5 个步骤。一个特定的过程可能处于任何时间点的任何阶段(当然是按顺序)。第一步只是为第二步生成数据,它通过一个长时间运行的进程向另一台服务器验证数据。然后在那里生成带有该数据的pdf。对于每个阶段,我们都会生成一个计时器,但是可以配置为允许为每个阶段生成多个计时器。这就是问题所在。当我将处理器添加到特定阶段时,它会随机出现以下错误:
The connection was not closed. The connection's current state is connecting.
连接没有关闭。连接的当前状态是连接。
Reading up on this it seems obvious that this is happening because the context is trying to access the same entity from two threads. But this is where it is kind of throwing me for a loop. All of the information I can find on this states that we should be using a instance context per thread. Which as far as I can tell I am doing (see the code below). I am not using singleton pattern or statics or anything so I am not really sure why this is happening or how to avoid it. I have posted the relevant bits of my code below for your review.
阅读此内容似乎很明显这是因为上下文试图从两个线程访问同一实体。但这就是让我陷入困境的地方。我能找到的所有信息都表明我们应该为每个线程使用一个实例上下文。据我所知,我正在做(见下面的代码)。我没有使用单例模式或静态或任何东西,所以我不确定为什么会发生这种情况或如何避免它。我已经在下面发布了我的代码的相关部分供您查看。
The base repository:
基础存储库:
public class BaseRepository
{
/// <summary>
/// Initializes a repository and registers with a <see cref="IUnitOfWork"/>
/// </summary>
/// <param name="unitOfWork"></param>
public BaseRepository(IUnitOfWork unitOfWork)
{
if (unitOfWork == null) throw new ArgumentException("unitofWork");
UnitOfWork = unitOfWork;
}
/// <summary>
/// Returns a <see cref="DbSet"/> of entities.
/// </summary>
/// <typeparam name="TEntity">Entity type the dbset needs to return.</typeparam>
/// <returns></returns>
protected virtual DbSet<TEntity> GetDbSet<TEntity>() where TEntity : class
{
return Context.Set<TEntity>();
}
/// <summary>
/// Sets the state of an entity.
/// </summary>
/// <param name="entity">object to set state.</param>
/// <param name="entityState"><see cref="EntityState"/></param>
protected virtual void SetEntityState(object entity, EntityState entityState)
{
Context.Entry(entity).State = entityState;
}
/// <summary>
/// Unit of work controlling this repository.
/// </summary>
protected IUnitOfWork UnitOfWork { get; set; }
/// <summary>
///
/// </summary>
/// <param name="entity"></param>
protected virtual void Attach(object entity)
{
if (Context.Entry(entity).State == EntityState.Detached)
Context.Entry(entity).State = EntityState.Modified;
}
protected virtual void Detach(object entity)
{
Context.Entry(entity).State = EntityState.Detached;
}
/// <summary>
/// Provides access to the ef context we are working with
/// </summary>
internal StatementAutoEntities Context
{
get
{
return (StatementAutoEntities)UnitOfWork;
}
}
}
StatementAutoEntities is the autogenerated EF class.
StatementAutoEntities 是自动生成的 EF 类。
The repository implementation:
存储库实现:
public class ProcessingQueueRepository : BaseRepository, IProcessingQueueRepository
{
/// <summary>
/// Creates a new repository and associated with a <see cref="IUnitOfWork"/>
/// </summary>
/// <param name="unitOfWork"></param>
public ProcessingQueueRepository(IUnitOfWork unitOfWork) : base(unitOfWork)
{
}
/// <summary>
/// Create a new <see cref="ProcessingQueue"/> entry in database
/// </summary>
/// <param name="Queue">
/// <see cref="ProcessingQueue"/>
/// </param>
public void Create(ProcessingQueue Queue)
{
GetDbSet<ProcessingQueue>().Add(Queue);
UnitOfWork.SaveChanges();
}
/// <summary>
/// Updates a <see cref="ProcessingQueue"/> entry in database
/// </summary>
/// <param name="queue">
/// <see cref="ProcessingQueue"/>
/// </param>
public void Update(ProcessingQueue queue)
{
//Attach(queue);
UnitOfWork.SaveChanges();
}
/// <summary>
/// Delete a <see cref="ProcessingQueue"/> entry in database
/// </summary>
/// <param name="Queue">
/// <see cref="ProcessingQueue"/>
/// </param>
public void Delete(ProcessingQueue Queue)
{
GetDbSet<ProcessingQueue>().Remove(Queue);
UnitOfWork.SaveChanges();
}
/// <summary>
/// Gets a <see cref="ProcessingQueue"/> by its unique Id
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public ProcessingQueue GetById(int id)
{
return (from e in Context.ProcessingQueue_SelectById(id) select e).FirstOrDefault();
}
/// <summary>
/// Gets a list of <see cref="ProcessingQueue"/> entries by status
/// </summary>
/// <param name="status"></param>
/// <returns></returns>
public IList<ProcessingQueue> GetByStatus(int status)
{
return (from e in Context.ProcessingQueue_SelectByStatus(status) select e).ToList();
}
/// <summary>
/// Gets a list of all <see cref="ProcessingQueue"/> entries
/// </summary>
/// <returns></returns>
public IList<ProcessingQueue> GetAll()
{
return (from e in Context.ProcessingQueue_Select() select e).ToList();
}
/// <summary>
/// Gets the next pending item id in the queue for a specific work
/// </summary>
/// <param name="serverId">Unique id of the server that will process the item in the queue</param>
/// <param name="workTypeId">type of <see cref="WorkType"/> we are looking for</param>
/// <param name="operationId">if defined only operations of the type indicated are considered.</param>
/// <returns>Next pending item in the queue for the work type or null if no pending work is found</returns>
public int GetNextPendingItemId(int serverId, int workTypeId, int? operationId)
{
var id = Context.ProcessingQueue_GetNextPending(serverId, workTypeId, operationId).SingleOrDefault();
return id.HasValue ? id.Value : -1;
}
/// <summary>
/// Returns a list of <see cref="ProcessingQueueStatus_dto"/>s objects with all
/// active entries in the queue
/// </summary>
/// <returns></returns>
public IList<ProcessingQueueStatus_dto> GetActiveStatusEntries()
{
return (from e in Context.ProcessingQueueStatus_Select() select e).ToList();
}
/// <summary>
/// Bumps an entry to the front of the queue
/// </summary>
/// <param name="processingQueueId"></param>
public void Bump(int processingQueueId)
{
Context.ProcessingQueue_Bump(processingQueueId);
}
}
We use Unity for dependency injection, some calling code for example:
我们使用Unity进行依赖注入,一些调用代码例如:
#region Members
private readonly IProcessingQueueRepository _queueRepository;
#endregion
#region Constructors
/// <summary>Initializes ProcessingQueue services with repositories</summary>
/// <param name="queueRepository"><see cref="IProcessingQueueRepository"/></param>
public ProcessingQueueService(IProcessingQueueRepository queueRepository)
{
Check.Require(queueRepository != null, "processingQueueRepository is required");
_queueRepository = queueRepository;
}
#endregion
The code in the windows service that kicks off the timers is as follows:
windows服务中启动定时器的代码如下:
_staWorkTypeConfigLock.EnterReadLock();
foreach (var timer in from operation in (from o in _staWorkTypeConfig.WorkOperations where o.UseQueueForExecution && o.AssignedProcessors > 0 select o)
let interval = operation.SpawnInternval < 30 ? 30 : operation.SpawnInternval
select new StaTimer
{
Interval = _runImmediate ? 5000 : interval*1000,
Operation = (ProcessingQueue.RequestedOperation) operation.OperationId
})
{
timer.Elapsed += ApxQueueProcessingOnElapsedInterval;
timer.Enabled = true;
Logger.DebugFormat("Queue processing for operations of type {0} will execute every {1} seconds", timer.Operation, timer.Interval/1000);
}
_staWorkTypeConfigLock.ExitReadLock();
StaTimer is just a wrapper on timer adding operation type. ApxQueueProcessingOnElapsedInterval then bascially just assigns work to the process based on the operation.
StaTimer 只是计时器添加操作类型的包装器。ApxQueueProcessingOnElapsedInterval 然后基本上只是根据操作将工作分配给进程。
I will also add a bit of the ApxQueueProcessingOnElapsedInterval code where we are spawning tasks.
我还将在我们生成任务的地方添加一些 ApxQueueProcessingOnElapsedInterval 代码。
_staTasksLock.EnterWriteLock();
for (var x = 0; x < tasksNeeded; x++)
{
var t = new Task(obj => ProcessStaQueue((QueueProcessConfig) obj),
CreateQueueProcessConfig(true, operation), _cancellationToken);
_staTasks.Add(new Tuple<ProcessingQueue.RequestedOperation, DateTime, Task>(operation, DateTime.Now,t));
t.Start();
Thread.Sleep(300); //so there are less conflicts fighting for jobs in the queue table
}
_staTasksLock.ExitWriteLock();
回答by Ladislav Mrnka
Looks like your service, repository and context are supposed to live for the whole life time of your application but that is incorrect. You can have multiple timers triggered at the same time. That means multiple threads will use your service in parallel and they will execute the code of your service in their thread = context is shared among multiple threads => exception because context is not thread safe.
看起来您的服务、存储库和上下文应该在您的应用程序的整个生命周期内都存在,但这是不正确的。您可以同时触发多个计时器。这意味着多个线程将并行使用您的服务,并且它们将在其线程中执行您的服务代码 = 上下文在多个线程之间共享 => 异常,因为上下文不是线程安全的。
The only option is to use a new context instance for each operation you want to execute. You can for example change your classes to accept context factory instead of context and get a new context for each operation.
唯一的选择是为您要执行的每个操作使用一个新的上下文实例。例如,您可以更改您的类以接受上下文工厂而不是上下文,并为每个操作获取一个新的上下文。
回答by hofnarwillie
In case this helps anyone:
如果这对任何人有帮助:
In my case, I did ensure that the non-thread-safe DbContext
had a TransientLifetime
(using Ninject), but it was still causing concurrency issues! Turns out that in some of my custom ActionFilters
I used Dependency Injection to get access to the DbContext
in the constructor, but ActionFilters
have a lifetime that keeps them instantiated over multiple requests, so the context didn't get recreated.
就我而言,我确实确保非线程安全DbContext
有一个TransientLifetime
(使用 Ninject),但它仍然导致并发问题!事实证明,在我的一些自定义中,ActionFilters
我使用了依赖注入来访问DbContext
构造函数中的 ,但是ActionFilters
有一个生命周期可以让它们在多个请求中实例化,因此没有重新创建上下文。
I fixed it by manually resolving the dependency in the OnActionExecuting
method instead of in the constructor so that it is a fresh instance every time.
我通过在OnActionExecuting
方法中而不是在构造函数中手动解析依赖来修复它,以便它每次都是一个新实例。
回答by vaindil
In my case I was getting this problem because I forgot the await
keyword before one of my DAL function calls. Putting await
there resolved it.
就我而言,我遇到了这个问题,因为我await
在 DAL 函数调用之一之前忘记了关键字。放在await
那里解决了它。