带有取消令牌的 NetworkStream.ReadAsync 永远不会取消
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/12421989/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
NetworkStream.ReadAsync with a cancellation token never cancels
提问by Softlion
Here the proof.
Any idea what is wrong in this code ?
这里的证明。
知道这段代码有什么问题吗?
[TestMethod]
public void TestTest()
{
var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 };
tcp.Connect(IPAddress.Parse("176.31.100.115"), 25);
bool ok = Read(tcp.GetStream()).Wait(30000);
Assert.IsTrue(ok);
}
async Task Read(NetworkStream stream)
{
using (var cancellationTokenSource = new CancellationTokenSource(5000))
{
int receivedCount;
try
{
var buffer = new byte[1000];
receivedCount = await stream.ReadAsync(buffer, 0, 1000, cancellationTokenSource.Token);
}
catch (TimeoutException e)
{
receivedCount = -1;
}
}
}
采纳答案by Softlion
I finally found a workaround. Combine the async call with a delay task (Task.Delay) using Task.WaitAny. When the delay elapses before the io task, close the stream. This will force the task to stop. You should handle the async exception on the io task correctly. And you should add a continuation task for both the delayed task and the io task.
我终于找到了解决方法。使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 结合起来。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务上的异步异常。并且您应该为延迟任务和 io 任务添加一个延续任务。
It also work with tcp connections. Closing the connection in another thread (you could consider it is the delay task thread) forces all async tasks using/waiting for this connection to stop.
它也适用于 tcp 连接。在另一个线程中关闭连接(您可以认为它是延迟任务线程)会强制所有使用/等待此连接的异步任务停止。
--EDIT--
- 编辑 -
Another cleaner solution suggested by @vtortola: use the cancellation token to register a call to stream.Close:
@vtortola 建议的另一种更简洁的解决方案:使用取消令牌注册对 stream.Close 的调用:
async ValueTask Read(NetworkStream stream, TimeSpan timeout = default)
{
if(timeout == default(TimeSpan))
timeout = TimeSpan.FromSeconds(5);
using var cts = new CancellationTokenSource(timeout); //C# 8 syntax
using(cts.Token.Register(() => stream.Close()))
{
int receivedCount;
try
{
var buffer = new byte[30000];
receivedCount = await stream.ReadAsync(buffer, 0, 30000, tcs.Token).ConfigureAwait(false);
}
catch (TimeoutException)
{
receivedCount = -1;
}
}
}
回答by usr
Cancellation is cooperative. NetworkStream.ReadAsyncmust cooperate to be able to be cancelled. It is kind of hard for it to do that because that would potentially leave the stream in an undefined state. What bytes have already been read from the Windows TCP stack and what haven't? IO is not easily cancellable.
取消是合作的。NetworkStream.ReadAsync必须配合才能取消。这样做有点困难,因为这可能会使流处于未定义状态。哪些字节已从 Windows TCP 堆栈中读取,哪些尚未读取?IO 不容易取消。
Reflector shows that NetworkStreamdoes not override ReadAsync. This means that it will get the default behavior of Stream.ReadAsyncwhich just throws the token away. There is no generic way Stream operations can be cancelled so the BCL Streamclass does not even try (it cannot try - there is no way to do this).
反射器显示NetworkStream不会覆盖ReadAsync. 这意味着它将获得Stream.ReadAsync将令牌扔掉的默认行为。没有通用的方式可以取消 Stream 操作,因此 BCLStream类甚至不尝试(它不能尝试 - 没有办法做到这一点)。
You should set a timeout on the Socket.
您应该在Socket.
回答by Anssssss
Per the description in Softlion's answer:
根据 Softlion 的回答中的描述:
Combine the async call with a delay task (Task.Delay) using Task.WaitAny. When the delay elapses before the io task, close the stream. This will force the task to stop. You should handle the async exception on the io task correctly. And you should add a continuation task for both the dealy task and the io task.
使用 Task.WaitAny 将异步调用与延迟任务 (Task.Delay) 结合起来。当延迟在 io 任务之前过去时,关闭流。这将强制任务停止。您应该正确处理 io 任务上的异步异常。并且您应该为 dealy 任务和 io 任务添加一个延续任务。
I've made some code that gives you the async read with timeout:
我已经制作了一些代码,可以为您提供超时的异步读取:
using System;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace ConsoleApplication2013
{
class Program
{
/// <summary>
/// Does an async read on the supplied NetworkStream and will timeout after the specified milliseconds.
/// </summary>
/// <param name="ns">NetworkStream object on which to do the ReadAsync</param>
/// <param name="s">Socket associated with ns (needed to close to abort the ReadAsync task if the timeout occurs)</param>
/// <param name="timeoutMillis">number of milliseconds to wait for the read to complete before timing out</param>
/// <param name="buffer"> The buffer to write the data into</param>
/// <param name="offset">The byte offset in buffer at which to begin writing data from the stream</param>
/// <param name="amountToRead">The maximum number of bytes to read</param>
/// <returns>
/// a Tuple where Item1 is true if the ReadAsync completed, and false if the timeout occurred,
/// and Item2 is set to the amount of data that was read when Item1 is true
/// </returns>
public static async Task<Tuple<bool, int>> ReadWithTimeoutAsync(NetworkStream ns, Socket s, int timeoutMillis, byte[] buffer, int offset, int amountToRead)
{
Task<int> readTask = ns.ReadAsync(buffer, offset, amountToRead);
Task timeoutTask = Task.Delay(timeoutMillis);
int amountRead = 0;
bool result = await Task.Factory.ContinueWhenAny<bool>(new Task[] { readTask, timeoutTask }, (completedTask) =>
{
if (completedTask == timeoutTask) //the timeout task was the first to complete
{
//close the socket (unless you set ownsSocket parameter to true in the NetworkStream constructor, closing the network stream alone was not enough to cause the readTask to get an exception)
s.Close();
return false; //indicate that a timeout occurred
}
else //the readTask completed
{
amountRead = readTask.Result;
return true;
}
});
return new Tuple<bool, int>(result, amountRead);
}
#region sample usage
static void Main(string[] args)
{
Program p = new Program();
Task.WaitAll(p.RunAsync());
}
public async Task RunAsync()
{
Socket s = new Socket(SocketType.Stream, ProtocolType.Tcp);
Console.WriteLine("Connecting...");
s.Connect("127.0.0.1", 7894); //for a simple server to test the timeout, run "ncat -l 127.0.0.1 7894"
Console.WriteLine("Connected!");
NetworkStream ns = new NetworkStream(s);
byte[] buffer = new byte[1024];
Task<Tuple<bool, int>> readWithTimeoutTask = Program.ReadWithTimeoutAsync(ns, s, 3000, buffer, 0, 1024);
Console.WriteLine("Read task created");
Tuple<bool, int> result = await readWithTimeoutTask;
Console.WriteLine("readWithTimeoutTask is complete!");
Console.WriteLine("Read succeeded without timeout? " + result.Item1 + "; Amount read=" + result.Item2);
}
#endregion
}
}
回答by Stephen Cleary
There are a few problems there that pop out:
有一些问题会弹出:
CancellationTokenthrowsOperationCanceledException, notTimeoutException(cancellation is not always due to timeout).ReceiveTimeoutdoesn't apply, since you're doing an asynchronous read. Even if it did, you'd have a race condition betweenIOExceptionandOperationCanceledException.- Since you're synchronously connecting the socket, you'll want a high timeout on this test (IIRC, the default connection timeout is ~90 seconds, but can be changed as Windows monitors the network speeds).
The correct way to test asynchronous code is with an asynchronous test:
[TestMethod] public async Task TestTest() { var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 }; tcp.Connect(IPAddress.Parse("176.31.100.115"), 25); await Read(tcp.GetStream()); }
CancellationTokenthrowsOperationCanceledException, notTimeoutException(取消并不总是由于超时)。ReceiveTimeout不适用,因为您正在进行异步读取。即使是这样,您也会在IOException和之间存在竞争条件OperationCanceledException。- 由于您正在同步连接套接字,因此您需要在此测试中设置高超时时间(IIRC,默认连接超时时间约为 90 秒,但可以在 Windows 监控网络速度时进行更改)。
测试异步代码的正确方法是使用异步测试:
[TestMethod] public async Task TestTest() { var tcp = new TcpClient() { ReceiveTimeout = 5000, SendTimeout = 20000 }; tcp.Connect(IPAddress.Parse("176.31.100.115"), 25); await Read(tcp.GetStream()); }
回答by ZakiMa
Providing more context on three different approaches. My service monitors other web applications availability. So, it needs to establish lots of connections to various web sites. Some of them crash/return errors/become unresponsive.
提供有关三种不同方法的更多背景信息。我的服务监视其他 Web 应用程序的可用性。因此,它需要与各种网站建立大量连接。其中一些崩溃/返回错误/变得无响应。
Axis Y - number of hung tests (sessions). Drops to 0 caused by deployments/restarts.
Y 轴 - 挂起测试(会话)的数量。由于部署/重启而下降到 0。
I. (Jan 25th) After revamping a service, the initial implementation used ReadAsync with a cancellation token. This resulted in lots of tests hanging (running requests against those web sites showed that servers indeed sometimes didn't return content).
I.(1 月 25 日)改进服务后,初始实现使用带有取消令牌的 ReadAsync。这导致许多测试挂起(针对这些网站运行请求表明服务器有时确实没有返回内容)。
II. (Feb 17th) Deployed a change which guarded cancellation with Task.Delay. This completely fixed this issue.
二、(2 月 17 日)部署了一项更改,该更改通过 Task.Delay 保护取消。这完全解决了这个问题。
private async Task<int> StreamReadWithCancellationTokenAsync(Stream stream, byte[] buffer, int count, Task cancellationDelayTask)
{
if (cancellationDelayTask.IsCanceled)
{
throw new TaskCanceledException();
}
// Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
// operation is not guarded. As a result if remote server never responds and connection never closed
// it will lead to this operation hanging forever.
Task<int> readBytesTask = stream.ReadAsync(
buffer,
0,
count);
await Task.WhenAny(readBytesTask, cancellationDelayTask).ConfigureAwait(false);
// Check whether cancellation task is cancelled (or completed).
if (cancellationDelayTask.IsCanceled || cancellationDelayTask.IsCompleted)
{
throw new TaskCanceledException();
}
// Means that main task completed. We use Result directly.
// If the main task failed the following line will throw an exception and
// we'll catch it above.
int readBytes = readBytesTask.Result;
return readBytes;
}
III (March 3rd) Following this StackOverflow implemented closing a stream based on timeout:
III(3 月 3 日)继此 StackOverflow 实现基于超时关闭流:
using (timeoutToken.Register(() => stream.Close()))
{
// Stream.ReadAsync doesn't honor cancellation token. It only checks it at the beginning. The actual
// operation is not guarded. As a result if a remote server never responds and connection never closed
// it will lead to this operation hanging forever.
// ReSharper disable once MethodSupportsCancellation
readBytes = await targetStream.ReadAsync(
buffer,
0,
Math.Min(responseBodyLimitInBytes - totalReadBytes, buffer.Length)).ConfigureAwait(false);
}
This implementation brought hangs back (not to the same extent as the initial approach):
这个实现带来了阻碍(与初始方法的程度不同):
Reverted back to Task.Delay solution.
恢复到 Task.Delay 解决方案。


