如何处理WCF的MSMQ绑定中的消息失败
我已经创建了WCF服务,并且正在使用netMsmqBinding绑定。
这是一个简单的服务,将Dto传递给我的服务方法,并且不期望响应。该消息被放置在MSMQ中,并且一旦被拾取就插入到数据库中。
确保没有数据丢失的最佳方法是什么?
我尝试了以下2种方法:
- 引发异常这会将消息置于死信队列中,以供人工仔细阅读。服务开始时我可以处理
- 在绑定上设置receiveRetryCount =" 3" 3次尝试之后-瞬间发生,这似乎使消息排在队列中,但是服务出现故障。重新启动我的服务会重复此过程。
理想情况下,我想执行以下操作:
尝试处理消息
- 如果失败,请等待5分钟以获取该消息,然后重试。
- 如果该过程失败3次,则将邮件移到一个死信队列中。
- 重新启动该服务会将所有消息从死信队列中推回该队列中,以便可以对其进行处理。
我能做到吗?如果可以,怎么办?
我们能否指出我关于如何在给定场景中最佳利用WCF和MSMQ的任何优秀文章。
任何帮助将非常感激。谢谢!
一些其他信息
我在Windows XP和Windows Server 2003上使用MSMQ 3.0。
不幸的是,我无法使用针对MSMQ 4.0和Vista / 2008的内置的毒害消息支持。
解决方案
回答
我认为使用MSMQ(仅在Vista上可用)可以做到这一点:
<bindings> <netMsmqBinding> <binding name="PosionMessageHandling" receiveRetryCount="3" retryCycleDelay="00:05:00" maxRetryCycles="3" receiveErrorHandling="Move" /> </netMsmqBinding> </bindings>
在第一次呼叫失败后,WCF将立即重试ReceiveRetryCount次。批处理失败后,消息将被移动
到重试队列。延迟RetryCycleDelay分钟后,消息从重试队列移至端点队列,然后重试该批处理。这将重复
MaxRetryCycle时间。如果所有失败,则根据可以移动的receiveErrorHandling处理消息。
(到中毒队列),拒绝,掉落或者故障
顺便说一下,有关WCF和MSMQ的好文章是Juval Lowy撰写的Progammig WCF书的第9章
回答
不幸的是,我只能使用Windows XP和Windows Server 2003,因此这不是我的选择。 (我将在发布问题后发现这个解决方案并意识到我无法使用它时再次澄清我的问题)
我发现一种解决方案是设置一个自定义处理程序,该处理程序会将我的消息移动到另一个队列或者中毒队列并重新启动我的服务。
在我看来,这太疯狂了。想象一下,我的Sql Server关闭了重新启动服务的频率。
因此,我最终要做的是允许线路故障并将消息保留在队列中。
我还将致命消息记录到我的系统记录服务中,这已经发生了。
解决问题后,我将重新启动服务,所有消息将再次开始处理。
我意识到重新处理此消息或者任何其他消息都会失败,因此为什么需要将此消息和其他消息移动到另一个队列。我最好停止我的服务,然后在一切正常运行时重新启动它。
aogan,我们对MSMQ 4.0有了完美的答案,但不幸的是我不满意
回答
如果我们使用的是SQL-Server,则应使用分布式事务,因为MSMQ和SQL-Server都支持它。发生的情况是将数据库写操作包装在TransactionScope块中,仅在成功时才调用scope.Complete()。如果失败,则当WCF方法返回时,该消息将被放回到队列中以再次尝试。这是我使用的代码的精简版本:
[OperationBehavior(TransactionScopeRequired=true, TransactionAutoComplete=true)] public void InsertRecord(RecordType record) { try { using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required)) { SqlConnection InsertConnection = new SqlConnection(ConnectionString); InsertConnection.Open(); // Insert statements go here InsertConnection.Close(); // Vote to commit the transaction if there were no failures scope.Complete(); } } catch (Exception ex) { logger.WarnException(string.Format("Distributed transaction failure for {0}", Transaction.Current.TransactionInformation.DistributedIdentifier.ToString()), ex); } }
我通过对大量但已知数量的记录进行排队来测试这一点,让WCF启动大量线程以同时处理其中的许多记录(一次到达16个线程--16消息离开队列),然后在操作过程中终止进程。重新启动程序后,将从队列中读取消息并再次进行处理,就好像什么都没发生一样,并且在测试结束时,数据库是一致的,并且没有丢失的记录。
分布式事务管理器具有环境状态,当我们创建新的TransactionScope实例时,它会自动在方法调用范围内搜索当前事务-该方法应该由WCF在将消息从队列中弹出时已经由WCF创建。并调用了方法。
回答
SDK中有一个示例,可能对我们有用。基本上,它所做的是将IErrorHandler实现添加到服务,该实现将在WCF声明消息为"中毒"(即,当所有已配置的重试用尽后)时捕获错误。该示例所做的是将消息移至另一个队列,然后重新启动与该消息关联的ServiceHost(因为在发现有毒消息时它将出现故障)。
这不是一个非常漂亮的示例,但它可能会很有用。但是,有两个限制:
1如果我们有与服务关联的多个端点(即通过多个队列公开),则无法知道有害消息到达的队列。如果只有一个队列,这将不是问题。我还没有看到任何官方的解决方法,但是我已经尝试了一种可能的替代方法,我已经在这里进行了记录:http://winterdom.com/weblog/2008/05/27/NetMSMQAndPoisonMessages.aspx
2将问题消息移到另一个队列后,这将由我们负责,因此,一旦超时完成(或者将新服务添加到该队列以进行处理),就由我们将其移回到处理队列。
老实说,无论哪种情况,我们都在这里看一些WCF本身无法涵盖的"手动"工作。
我最近一直在一个不同的项目中工作,我需要明确控制重试的频率,而当前的解决方案是创建一组重试队列,并根据以下情况在重试队列和主处理队列之间手动移动消息一组计时器和一些试探法,仅使用原始的System.Messaging内容来处理MSMQ队列。这样做似乎很不错,但是如果我们这样做的话,可能会遇到一些麻烦。